Hammad,

The recommended way to implement this logic would be to:

1. Create a SparkSession.
2. Create a StreamingContext using the SparkContext embedded in the
   SparkSession.
3. Use the single SparkSession instance for the SQL operations within the
   foreachRDD.

It's important to note that Spark operations can process the complete
dataset. In this case, there's no need to do a per-partition or per-element
operation (that would only be the case if we were using the database
driver's API and DB connections directly).

Reorganizing the code in the question a bit, we should have:

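SparkSession sparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("TransformerStreamPOC")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

// JavaStreamingContext expects a JavaSparkContext, so wrap the session's
// underlying SparkContext instead of creating a second context.
JavaStreamingContext jssc = new JavaStreamingContext(
            new JavaSparkContext(sparkSession.sparkContext()),
            Durations.seconds(60));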

// This dataset doesn't seem to depend on the received data, so we can
// load it once.
Dataset<Row> baselineData = sparkSession.read()
            .jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);

// create dstream

JavaDStream<???> dstream = ...

... operations on dstream ...

dstream.foreachRDD(rdd -> {

    // createDataset needs the underlying RDD plus an Encoder for the
    // element type
    Dataset<???> incomingData = sparkSession.createDataset(rdd.rdd(), ...);

    ... do something with the incoming dataset, e.g. join with the baseline ...

    Dataset<Row> joined = incomingData.join(baselineData, ...);

    ... do something with joined ...

});
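
To make the "load it once" point concrete without needing a cluster, here is
a minimal plain-Java sketch (no Spark involved). It only counts how often the
reference table would be read under each approach. The class name and
loadBaseline() are hypothetical stand-ins for the JDBC read above, not part of
any Spark API, and the sample events are made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class BaselineLoadSketch {
    // Counts simulated table reads; each one stands for a JDBC round trip.
    static final AtomicInteger loads = new AtomicInteger();

    // Hypothetical stand-in for sparkSession.read().jdbc(...).
    static Map<String, String> loadBaseline() {
        loads.incrementAndGet();
        Map<String, String> table = new HashMap<>();
        table.put("a", "alpha");
        table.put("b", "beta");
        return table;
    }

    // Pattern from the question: the table is re-read for every event.
    static int loadsPerEvent(List<List<String>> batches) {
        loads.set(0);
        for (List<String> batch : batches) {
            for (String event : batch) {
                loadBaseline().get(event);
            }
        }
        return loads.get();
    }

    // Pattern from this reply: read once up front, reuse for every batch.
    static int loadsOnce(List<List<String>> batches) {
        loads.set(0);
        Map<String, String> baseline = loadBaseline();
        for (List<String> batch : batches) {
            for (String event : batch) {
                baseline.get(event);
            }
        }
        return loads.get();
    }

    public static void main(String[] args) {
        // Three sample batches holding six events in total.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("a"),
                Arrays.asList("b", "b", "a"));
        System.out.println("reads when loading per event: " + loadsPerEvent(batches));
        System.out.println("reads when loading once:      " + loadsOnce(batches));
    }
}
```

With the three sample batches in main, the per-event variant performs six
simulated reads and the load-once variant performs one. In the real job the
gap grows with the event rate, since every extra event would cost another
full table read.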


kr, Gerard.

On Sun, Oct 1, 2017 at 7:55 PM, Hammad <ham...@flexilogix.com> wrote:

> Hello,
>
> *Background:*
>
> I have Spark Streaming context;
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("TransformerStreamPOC");
> conf.set("spark.driver.allowMultipleContexts", "true");   *<== this*
> JavaStreamingContext jssc = new JavaStreamingContext(conf, 
> Durations.seconds(60));
>
>
> that subscribes to certain kafka *topics*;
>
> JavaInputDStream<ConsumerRecord<String, String>> stream =
>         KafkaUtils.createDirectStream(
>                 jssc,
>                 LocationStrategies.PreferConsistent(),
>                 ConsumerStrategies.<String, String>Subscribe(*topics*, 
> kafkaParams)
>         );
>
> when messages arrive in queue, I recursively process them as follows (below 
> code section will repeat in Question statement)
>
> stream.foreachRDD(rdd -> {
> // process here - the code for the two scenarios below is inserted here
>
> });
>
>
> *Question starts here:*
>
> Since I need to apply SparkSQL to received events in Queue - I create 
> SparkSession with two scenarios;
>
> *1) Per partition one sparkSession (after 
> "spark.driver.allowMultipleContexts" set to true); so all events under this 
> partition are handled by same sparkSession*
>
> rdd.foreachPartition(partition -> {
>     SparkSession sparkSession = SparkSession
>             .builder()
>             .appName("Java Spark SQL basic example")
>             .config("spark.some.config.option", "some-value")
>             .getOrCreate();
>
>     while (partition.hasNext()) {
>       Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, 
> "table_name", connectionProperties);
>
>     }}
>
> *2) Per event under each session; so each event under each queue under each 
> stream has one sparkSession;*
>
> rdd.foreachPartition(partition -> {
>     while (partition.hasNext()) {
>         SparkSession sparkSession = SparkSession
>                 .builder()
>                 .appName("Java Spark SQL basic example")
>                 .config("spark.some.config.option", "some-value")
>                 .getOrCreate();
>
>         Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL,
>                 "table_name", connectionProperties);
>
>     }}
>
>
> Is it good practice to create multiple contexts (let's say 10 or 100)?
> How does the number of SparkContexts allowed relate to the number of
> worker nodes?
> What are the performance considerations with respect to scenario 1 and
> scenario 2?
>
> I am looking for these answers as I feel there is more to performance
> w.r.t. the SparkContexts created by a streaming application than I
> currently understand.
> I would really appreciate your support.
>
> Hammad
>
>
