@ConnectionManagement(configElementName="config", friendlyName="Configuration") public class ConnectorConfig extends Object
<spark-sql:config name="SparkSql__Configuration" appName="sparksql-demo" master="spark://192.168.1.1:7077" cassandraIp="192.168.1.2" cassandraPort="9042" addCassandraJar="true"> <spark-sql:advanced-spark-conf> <spark-sql:advanced-spark-conf key="spark.executor.cores">1</spark-sql:advanced-spark-conf> <spark-sql:advanced-spark-conf key="spark.cores.max">2</spark-sql:advanced-spark-conf> </spark-sql:advanced-spark-conf> <spark-sql:list-file-data-props> <spark-sql:list-file-data-prop type="CSV" alias="csv_file" uri="hdfs://192.168.1.3:5001/demo/file1.csv" addJar="true" /> <spark-sql:list-file-data-prop type="JSON" alias="json_file" uri="hdfs://192.168.1.3:5001/demo/file2.json" addJar="true" /> </spark-sql:list-file-data-props> <spark-sql:list-jdbc-data-props> <spark-sql:list-jdbc-data-prop driver="org.apache.derby.jdbc.ClientDriver" dbTable="" url="jdbc:derby://192.168.1.4:19000/MyDbTest" addJar="true"/> </spark-sql:list-jdbc-data-props> <spark-sql:list-cassandra-data-props> <spark-sql:list-cassandra-data-prop keyspace="demo" tableName=""/> </spark-sql:list-cassandra-data-props> </spark-sql:config>
Constructor and Description |
---|
ConnectorConfig() |
Modifier and Type | Method and Description |
---|---|
void |
addCsvAndAvroJars(org.apache.spark.SparkContext sc,
FileDataProperties fileDataProperties) |
protected void |
addDrivers() |
void |
addJarDrivers(org.apache.spark.SparkContext sc,
JdbcDataProperties jdbcDataProperties) |
void |
connect(String connectionKey,
String master,
String appName)
Connect to Apache Spark Cluster
|
protected void |
connectDataSources()
Connect all defined data sources to the sql context
|
void |
connectDataSources(CassandraDataProperties cassandraDataProperties)
Connect all defined data sources to the sql context
|
void |
connectDataSources(FileDataProperties fileDataProperties)
Connect all defined data sources to the sql context
|
void |
connectDataSources(JdbcDataProperties jdbcDataProperties)
Connect all defined data sources to the sql context
|
String |
connectionId()
login ID
|
protected boolean |
dataSourceAvro(FileDataProperties fileDataProperties) |
protected boolean |
dataSourceCsv(FileDataProperties fileDataProperties) |
protected void |
destroyRunner() |
void |
disconnect()
Disconnect
|
protected boolean |
empty(String field) |
UrlDelegatingCl |
ensureRunner() |
Boolean |
getAddCassandraJar() |
Map<String,String> |
getAdvancedSparkConf() |
String |
getCassandraIp() |
String |
getCassandraPort() |
String |
getHeartbeatInterval() |
List<CassandraDataProperties> |
getListCassandraDataProps() |
List<FileDataProperties> |
getListFileDataProps() |
List<JdbcDataProperties> |
getListJdbcDataProps() |
String |
getLocalIp() |
String |
getLocalPort() |
org.apache.spark.SparkContext |
getSparkContext() |
org.apache.spark.sql.SQLContext |
getSparkSqlContext() |
SparkSqlEnum.SqlContextType |
getSqlContextType() |
protected void |
innerConnect(String key,
String master,
String appName) |
boolean |
isConnected()
Test if Spark Context (driver) exists and is valid (connected if non
local)
|
void |
setAddCassandraJar(Boolean addCassandraJar) |
void |
setAdvancedSparkConf(Map<String,String> advancedSparkConf) |
void |
setCassandraIp(String cassandraIp) |
void |
setCassandraPort(String cassandraPort) |
void |
setHeartbeatInterval(String heartbeatInterval) |
void |
setListCassandraDataProps(List<CassandraDataProperties> listCassandraDataProps) |
void |
setListFileDataProps(List<FileDataProperties> listFileDataProps) |
void |
setListJdbcDataProps(List<JdbcDataProperties> listJdbcDataProps) |
void |
setLocalIp(String localIp) |
void |
setLocalPort(String localPort) |
void |
setSqlContextType(SparkSqlEnum.SqlContextType sqlContext) |
void |
testConnect(String key,
String master,
String appName)
Test Connect
|
public static final String SEPARATOR
public String getLocalIp()
public void setLocalIp(String localIp)
public String getLocalPort()
public void setLocalPort(String localPort)
public String getHeartbeatInterval()
public void setHeartbeatInterval(String heartbeatInterval)
public SparkSqlEnum.SqlContextType getSqlContextType()
public void setSqlContextType(SparkSqlEnum.SqlContextType sqlContext)
public List<FileDataProperties> getListFileDataProps()
public void setListFileDataProps(List<FileDataProperties> listFileDataProps)
public List<JdbcDataProperties> getListJdbcDataProps()
public void setListJdbcDataProps(List<JdbcDataProperties> listJdbcDataProps)
public String getCassandraIp()
public void setCassandraIp(String cassandraIp)
public String getCassandraPort()
public Boolean getAddCassandraJar()
public void setAddCassandraJar(Boolean addCassandraJar)
public void setCassandraPort(String cassandraPort)
public List<CassandraDataProperties> getListCassandraDataProps()
public void setListCassandraDataProps(List<CassandraDataProperties> listCassandraDataProps)
public org.apache.spark.sql.SQLContext getSparkSqlContext()
public org.apache.spark.SparkContext getSparkContext()
public UrlDelegatingCl ensureRunner()
protected void destroyRunner()
@Connect public void connect(@Optional @Password @ConnectionKey String connectionKey, String master, String appName) throws org.mule.api.ConnectionException
connectionKey
- Shared secret currently supported by Apache Spark for
authentication (Apache Spark 'spark.authenticate.secret'
option, 'spark.authenticate' will be set to 'true' if not
empty or null.).
master
- Spark Master URL (Apache Spark 'spark.master' option).
Next values are currently supported (see
Apache Spark documentation):
appName
- The name of your application (Apache Spark 'spark.app.name'
option). This will appear in the UI and in log data.
org.mule.api.ConnectionException
- if the connection does not occur.
@TestConnectivity public void testConnect(String key, String master, String appName) throws org.mule.api.ConnectionException
key
- Key to connect to server (if necessary)
master
- Spark connection
appName
- App Name
org.mule.api.ConnectionException
- if the connection fails.
protected void innerConnect(String key, String master, String appName) throws SQLException, org.mule.api.ConnectionException
SQLException
org.mule.api.ConnectionException
protected void addDrivers()
protected boolean empty(String field)
public void addCsvAndAvroJars(org.apache.spark.SparkContext sc, FileDataProperties fileDataProperties)
protected boolean dataSourceCsv(FileDataProperties fileDataProperties)
protected boolean dataSourceAvro(FileDataProperties fileDataProperties)
public void addJarDrivers(org.apache.spark.SparkContext sc, JdbcDataProperties jdbcDataProperties)
protected void connectDataSources() throws SQLException
SQLException
- An exception that provides information on a database access
error or other errors.
public void connectDataSources(FileDataProperties fileDataProperties)
fileDataProperties
- Hadoop HDFS file options
SparkSqlConnectorException
- If file options are not valid.
public void connectDataSources(JdbcDataProperties jdbcDataProperties) throws SQLException
jdbcDataProperties
- JDBC accessible database options
SparkSqlConnectorException
- If new JDBC data source options are not valid.
SQLException
- error thrown during JDBC configuration.
public void connectDataSources(CassandraDataProperties cassandraDataProperties)
cassandraDataProperties
- Cassandra database keyspace and table
SparkSqlConnectorException
- If schemas or table options are not valid.
@Disconnect public void disconnect()
@ValidateConnection public boolean isConnected()
@ConnectionIdentifier public String connectionId()
Copyright © 2010–2017. All rights reserved.