The Anypoint SparkSql connector enables Apache Spark SQL to be used in Mule by providing integration inside Mule flows. The main functionality of the Spark SQL connector is to allow the execution of Spark jobs that extract structured data using Spark SQL capabilities. Version 1.0.0 allows a user to submit a job (defined as a SQL query) to a Spark standalone cluster and retrieve the results as a collection of entities. A typical use case is defining a context with sources such as a JSON file in Hadoop, a Cassandra database and a PostgreSQL database, then executing a SQL query (a Spark SQL job) that joins and filters those data sources and produces, as result, a list of objects with the requested data.
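
As an illustration, the following minimal flow sketch (the flow name, column names and join condition are hypothetical; the table aliases match the configuration sample shown later) submits such a query with the custom-sql processor and logs the structured result:

    <flow name="sparksql-join-demo-flow">
        <!-- Submit a Spark SQL job that joins two of the configured data sources -->
        <spark-sql:custom-sql config-ref="SparkSql__Configuration"
            sql="select c.id, ch.amount from communications c join chargebacks ch on c.id = ch.comm_id"/>
        <!-- The payload now holds the collection of results returned by the job -->
        <logger message="#[payload]" level="INFO"/>
    </flow>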

Additional Info

Requires Mule Enterprise License: Yes

Requires Entitlement: Yes

Mule Version: 3.6.0 or higher

Configs


Configuration

<spark-sql:config>

Connection Management

The Spark SQL configuration contains the information required to connect to an Apache Spark cluster. The global configuration holds what is usually called the Apache Spark 'Driver' or 'Spark Context': the process that sends tasks to executors and listens to them during their lifetime.
See the Apache Spark configuration and Apache Spark security documentation.

XML Sample


    <spark-sql:config name="SparkSql__Configuration" appName="sparksql-demo" master="spark://192.168.1.1:7077" 
    	 cassandraIp="192.168.1.2" cassandraPort="9042" 
    	 addCassandraJar="true">
    	<spark-sql:advanced-spark-conf>
            <spark-sql:advanced-spark-conf key="spark.executor.cores">1</spark-sql:advanced-spark-conf>
            <spark-sql:advanced-spark-conf key="spark.cores.max">2</spark-sql:advanced-spark-conf>
        </spark-sql:advanced-spark-conf>
        <spark-sql:list-file-data-props>
            <spark-sql:list-file-data-prop type="CSV" alias="communications" uri="hdfs://192.168.1.3:5001/spark-demo/com.csv" addJar="true" />
            <spark-sql:list-file-data-prop type="JSON" alias="chargebacks" uri="hdfs://192.168.1.3:5001/spark-demo/char.json" addJar="true" />
        </spark-sql:list-file-data-props>
        <spark-sql:list-jdbc-data-props>
    		<spark-sql:list-jdbc-data-prop driver="org.apache.derby.jdbc.ClientDriver" dbTable="" 
    			url="jdbc:derby://192.168.1.4:19000/MyDbTest" addJar="true"/>
        </spark-sql:list-jdbc-data-props>
        <spark-sql:list-cassandra-data-props>
            <spark-sql:list-cassandra-data-prop keyspace="demo" tableName=""/>
        </spark-sql:list-cassandra-data-props>
    </spark-sql:config>

Attributes

Name Java Type Description Default Value Required

name

String

The name of this configuration. It can later be referenced by this name.

x 

connectionKey

String

Shared secret used by Apache Spark for authentication (Apache Spark 'spark.authenticate.secret' option; 'spark.authenticate' will be set to 'true' if this value is not empty or null).

 

master

String

Spark master URL (Apache Spark 'spark.master' option). The following values are currently supported (see the Apache Spark documentation):

  • local - Run Spark locally with one worker thread (i.e. no parallelism at all).
  • local[K] - Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
  • local[*] - Run Spark locally with as many worker threads as logical cores on your machine.
  • spark://HOST:PORT - Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.

x 

appName

String

The name of your application (Apache Spark 'spark.app.name' option). This will appear in the UI and in log data.

x 

localIp

String

Hostname or IP address for the driver to listen on. This is used for communicating with the executors and the standalone Master (Apache Spark 'spark.driver.host' option).

 

localPort

String

Port for the driver to listen on. This is used for communicating with the executors and the standalone Master (Apache Spark 'spark.driver.port' option).

 

heartbeatInterval

String

Interval between each executor's heartbeats to the driver. Heartbeats let the driver know that the executor is still alive and update it with metrics for in-progress tasks (Apache Spark 'spark.executor.heartbeatInterval' option).

10s

 

advancedSparkConf

Map<String,String>

A map that can be used to configure any Apache Spark option that cannot be set directly in this configuration (see Apache Spark configuration).

 

sqlContextType

SqlContextType

Customizes the entry point to Spark SQL functionality (see the Apache Spark SQL programming guide). Currently only SQL (SQLContext) is supported.

SQL

 

listFileDataProps

List<FileDataProperties>

A list of files (in Hadoop HDFS) to be included as data sources; they will be available as 'tables' to the Apache Spark SQL Context.

 

listJdbcDataProps

List<JdbcDataProperties>

A list of JDBC-managed database tables to be included as data sources; they will be available as 'tables' to the Apache Spark SQL Context.

 

cassandraIp

String

Cassandra database IP to connect to. Set this option only if you plan to use a Cassandra database as another data source.

 

cassandraPort

String

Cassandra database PORT to connect to. Set this option only if you plan to use a Cassandra database as another data source.

 

addCassandraJar

Boolean

Flag used to instruct the Spark Context to send the Cassandra driver to the Spark workers. This is usually required (true) if you plan to use a Cassandra database as another data source.

false

 

listCassandraDataProps

List<CassandraDataProperties>

The list of Cassandra keyspaces and table names to work with. Set this option only if you plan to use a Cassandra database as another data source. They will be included as data sources and will be available as 'tables' to the Apache Spark SQL Context.

 

Processors


Add jdbc data properties

<spark-sql:add-jdbc-data-properties>

Dynamically add new data sources from JDBC-accessible databases.

XML Sample

<spark-sql:add-jdbc-data-properties config-ref="SparkSql__Configuration" >
        <spark-sql:list-jdbc-data-props>
            <spark-sql:list-jdbc-data-prop driver="org.apache.derby.jdbc.ClientDriver" dbTable="derbyDb" url="jdbc:derby://xxx.xxx.xxx.xxx:1527/MyDbTest" addJar="true"/>
        </spark-sql:list-jdbc-data-props>
    </spark-sql:add-jdbc-data-properties>

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x 

listJdbcDataProps

List<JdbcDataProperties>

List of JDBC-accessible database options

x 


Add file data properties

<spark-sql:add-file-data-properties>

Dynamically add new data sources from Hadoop HDFS files.

XML Sample

<spark-sql:add-file-data-properties config-ref="SparkSql__Configuration" >
        <spark-sql:list-file-data-props>
            <spark-sql:list-file-data-prop type="JSON" uri="hdfs://xxx.xxx.xxx.xxx:19000/user/ex1.json" alias="json_table"/>
        </spark-sql:list-file-data-props>
    </spark-sql:add-file-data-properties>
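
A possible follow-up sketch (the flow name and table alias are hypothetical) registers the HDFS file at runtime and queries it in the same flow:

    <flow name="add-and-query-flow">
        <!-- Dynamically register a JSON file in HDFS under the alias 'json_table' -->
        <spark-sql:add-file-data-properties config-ref="SparkSql__Configuration">
            <spark-sql:list-file-data-props>
                <spark-sql:list-file-data-prop type="JSON" uri="hdfs://xxx.xxx.xxx.xxx:19000/user/ex1.json" alias="json_table"/>
            </spark-sql:list-file-data-props>
        </spark-sql:add-file-data-properties>
        <!-- Query the newly registered table through the Spark SQL Context -->
        <spark-sql:custom-sql config-ref="SparkSql__Configuration" sql="select * from json_table"/>
    </flow>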

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x 

listFileDataProps

List<FileDataProperties>

List of Hadoop HDFS file options

x 


Add cassandra data properties

<spark-sql:add-cassandra-data-properties>

Dynamically add new data sources from Cassandra database keyspaces and tables.

XML Sample

<spark-sql:add-cassandra-data-properties config-ref="SparkSql__Configuration2" >
        <spark-sql:list-cassandra-data-props>
            <spark-sql:list-cassandra-data-prop keyspace="keyspaceName" tableName="tableName"/>
        </spark-sql:list-cassandra-data-props>
    </spark-sql:add-cassandra-data-properties>

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x 

listCassandraDataProps

List<CassandraDataProperties>

List of Cassandra database keyspaces and tables

x 


Custom sql

<spark-sql:custom-sql>

Run an arbitrary SQL query through the Spark SQL Context.

XML Sample

<spark-sql:custom-sql config-ref="SparkSql__Configuration" sql="select * from json_table"/>

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x 

sql

String

SQL Query

x 

Returns

Return Java Type Description

Object

A List with the query results, or an empty List if the query returns no results.


Sql select

<spark-sql:sql-select>

SQL Select

XML Sample

<spark-sql:sql-select config-ref="SparkSql__Configuration" sqlSelect="select * from json_table"/>
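
A second sketch (the temporary table name and the where clause are hypothetical) uses the optional temporaryTableName attribute described below to register the query result as a new temporary table in the Spark SQL Context:

    <spark-sql:sql-select config-ref="SparkSql__Configuration"
        sqlSelect="select * from json_table where amount > 100"
        temporaryTableName="filtered_table"/>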

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x 

sqlSelect

String

SQL select query to run into the Apache Spark Cluster through the Spark SQL Context.

x 

temporaryTableName

String

If set, it will be the name of a new temporary table available at the Spark SQL Context with current query results.

 

streaming

Boolean

If set to true, the result will be an Iterator rather than a List. This is the way to avoid loading large amounts of data into memory.

 

Returns

Return Java Type Description

Object

The structured data found by the query. If streaming is set to true, the result is an Iterator instead of a List.


Drop temporary table

<spark-sql:drop-temporary-table>

Drop a temporary table present at the Spark SQL Context.

XML Sample

<spark-sql:drop-temporary-table config-ref="SparkSql__Configuration" temporaryTableName="avro_table" />

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x 

temporaryTableName

String

Temporary table name

x 


Disconnect

<spark-sql:disconnect>

Force disconnection of the global configuration. This way, the Spark Context / "driver" will be cleared and all related resources freed (Apache Spark executors, for example).
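
XML Sample

A minimal sketch; the processor only takes the config-ref attribute listed below:

    <spark-sql:disconnect config-ref="SparkSql__Configuration"/>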

Attributes

Name Java Type Description Default Value Required

config-ref

String

Specify which config to use

x