Re: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread naresh Goud
Here is my understanding; hope it gives some idea of how this works. It
might be wrong in places, so please excuse any mistakes; I am trying to
derive the execution model from my own understanding. Sorry for the long
email.

The driver keeps polling Kafka for the latest offset of each topic and then
schedules jobs with the offsets pulled from the topic metadata.

Here a job = (processing logic1 + logic2 + logic3). These logics are
executed sequentially on the executors, in the order defined in your
application code.

Whenever a job starts, it runs as one transaction that includes the
following activities:

Transaction {
  Get data from Kafka.
  Execute logic1 -> logic2 -> logic3.
  Update the offset information for the processed records.
}
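
To make the "update offsets" step concrete, here is a minimal sketch of that
cycle with the Kafka 0-10 direct stream (stream is a placeholder for your
DStream; committing back to Kafka is just one way of recording progress):

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // The offsets for this batch were fixed by the driver at schedule time
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... run logic1 -> logic2 -> logic3 over the batch here ...
      // Record the processed offsets once the logic has succeeded
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }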

Having said that, coming to the approach you mentioned for parallel
processing: if you pass all three topics to a single createDirectStream,
Spark polls once to get the offsets of all the topics, instead of three
polls when you create three streams with separate createDirectStream calls.
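
Something like this, reusing the names from your code template (a sketch;
combinedStream is a name I made up):

    val topics = Array("TOPIC1", "TOPIC2", "TOPIC3")
    val combinedStream = KafkaUtils.createDirectStream[Array[Byte], GenericRecord](
      ssc,
      PreferConsistent,
      Subscribe[Array[Byte], GenericRecord](topics, kafkaParms))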

With that approach, the job is scheduled as below:

Job {
  Logic1 with its topic's offsets
  Logic2 with its topic's offsets
  Logic3 with its topic's offsets
}

With this approach the logics still execute sequentially.

Let's come to your last point, differentiating the data somehow. Assuming
your application logic looks like the following, the scheduled job would be:

Job {
  if (topic1 record) { execute logic1 }
  if (topic2 record) { execute logic2 }
  if (topic3 record) { execute logic3 }
}

This also leads to sequential execution.
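
That said, the differentiation itself is easy, because each ConsumerRecord
carries the name of the topic it came from. A sketch against the combined
stream from above (logic1/logic2/logic3 stand for your processing):

    combinedStream.foreachRDD { rdd =>
      rdd.foreach { record =>
        // record is a ConsumerRecord, so the source topic travels with the data
        record.topic match {
          case "TOPIC1" => // logic1(record)
          case "TOPIC2" => // logic2(record)
          case "TOPIC3" => // logic3(record)
          case _        => // ignore anything unexpected
        }
      }
    }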


Distributed systems are not designed to execute different parts of a job in
parallel; instead they execute the whole job across partitions of the data
in parallel.

To summarize: parallelism is possible within the processing of each topic,
not across the processing of different topics. If a topic has 3 partitions,
then up to 3 tasks run in parallel across the executors for that job.
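
For example, a rough sketch of where that parallelism shows up in each batch
(stream stands for any one of the DStreams; the println is just for
illustration):

    stream.foreachRDD { rdd =>
      // The direct stream creates one Spark partition per Kafka partition,
      // so a 3-partition topic yields up to 3 tasks running at the same time
      println(s"Parallel tasks this batch: ${rdd.getNumPartitions}")
      rdd.foreachPartition { records =>
        records.foreach { record => /* per-record processing here */ }
      }
    }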





On Thu, Feb 22, 2018 at 9:44 PM Vibhakar, Beejal <
beejal.vibha...@fisglobal.com> wrote:

> Naresh – Thanks for taking the time to respond.
>
>
>
> So is it right to say that it's the Driver program which, every 30
> seconds, tells the executors (which manage the streams) to run, rather
> than each executor making that decision itself? And that is what makes
> execution sequential in my case?
>
>
>
> BTW, do you think the following would be a more suitable way to run this
> in parallel?
>
>
>
>    - Right now I am creating 3 DStreams, one for each entity, using the
>    KafkaUtils.createDirectStream API
>    - While creating each DStream, I pass in a single Kafka topic
>    - If, instead of creating 3 DStreams, I create a single DStream and
>    pass it multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3), it should be
>    able to parallelize the processing (we just need to allocate the right
>    number of executors)
>    - To have separate processing logic for each entity, I just need some
>    way to differentiate records of one type of entity from the other
>    types of entities.
>
>
>
> -Beejal
>
>
>
> *From:* naresh Goud [mailto:nareshgoud.du...@gmail.com]
> *Sent:* Friday, February 23, 2018 8:56 AM
> *To:* Vibhakar, Beejal <beejal.vibha...@fisglobal.com>
> *Subject:* Re: Consuming Data in Parallel using Spark Streaming
>
>
>
> You will have the same behavior in both local mode and a Hadoop cluster,
> since there is only one streaming context in the driver, which runs in a
> single JVM.
>
>
>
> On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal <
> beejal.vibha...@fisglobal.com> wrote:
>
> I am trying to process data from 3 different Kafka topics using 3
> InputDStreams with a single StreamingContext. I am currently testing this
> in a sandbox, where I see data processed from one Kafka topic followed by
> the others.
>
>
>
> *Question#1:* I want to understand: when I run this program in a Hadoop
> cluster, will it process the data from the 3 Kafka topics in parallel, or
> will I see the same behavior as in my Sandbox?
>
>
>
> *Question#2:* I aim to process the data from all three Kafka topics in
> parallel.  Can I achieve this without breaking this program into 3 separate
> smaller programs?
>
>
>
> Here's what the code template looks like:
>
>
>
>    val ssc = new StreamingContext(sc, Seconds(30))
>
>
>
> val topic1 = Array("TOPIC1")
>
>
>
>    val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte],
> GenericRecord](
>
>   ssc,
>
>   PreferConsistent,
>
>   Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))
>
>
>
>  // Processing logic for dataStreamTopic1
>
>
>
>
>
> val topic2 = Array("TOPIC2")
>
>
>
>    val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte],

RE: Consuming Data in Parallel using Spark Streaming

2018-02-22 Thread Vibhakar, Beejal
Naresh – Thanks for taking the time to respond.

So is it right to say that it's the Driver program which, every 30 seconds,
tells the executors (which manage the streams) to run, rather than each
executor making that decision itself? And that is what makes execution
sequential in my case?

BTW, do you think the following would be a more suitable way to run this in parallel?


  *   Right now I am creating 3 DStreams, one for each entity, using the
KafkaUtils.createDirectStream API
  *   While creating each DStream, I pass in a single Kafka topic
  *   If, instead of creating 3 DStreams, I create a single DStream and pass
it multiple Kafka topics (TOPIC1, TOPIC2, TOPIC3), it should be able to
parallelize the processing (we just need to allocate the right number of
executors)
  *   To have separate processing logic for each entity, I just need some way
to differentiate records of one type of entity from the other types of
entities.

-Beejal

From: naresh Goud [mailto:nareshgoud.du...@gmail.com]
Sent: Friday, February 23, 2018 8:56 AM
To: Vibhakar, Beejal <beejal.vibha...@fisglobal.com>
Subject: Re: Consuming Data in Parallel using Spark Streaming

You will have the same behavior in both local mode and a Hadoop cluster,
since there is only one streaming context in the driver, which runs in a
single JVM.

On Wed, Feb 21, 2018 at 9:12 PM, Vibhakar, Beejal 
<beejal.vibha...@fisglobal.com> wrote:
I am trying to process data from 3 different Kafka topics using 3
InputDStreams with a single StreamingContext. I am currently testing this in
a sandbox, where I see data processed from one Kafka topic followed by the
others.

Question#1: I want to understand: when I run this program in a Hadoop
cluster, will it process the data from the 3 Kafka topics in parallel, or
will I see the same behavior as in my Sandbox?

Question#2: I aim to process the data from all three Kafka topics in parallel.  
Can I achieve this without breaking this program into 3 separate smaller 
programs?

Here's what the code template looks like:

   val ssc = new StreamingContext(sc, Seconds(30))

val topic1 = Array("TOPIC1")

   val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))

 // Processing logic for dataStreamTopic1


val topic2 = Array("TOPIC2")

   val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic2, kafkaParms))

 // Processing logic for dataStreamTopic2


val topic3 = Array("TOPIC3")

   val dataStreamTopic3 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic3, kafkaParms))

 // Processing logic for dataStreamTopic3

// Start the Streaming
ssc.start()
ssc.awaitTermination()

Here’s how I submit my spark job on my sandbox…

./bin/spark-submit --class <main-class> --master local[*] <application-jar>

Thanks,
Beejal





Consuming Data in Parallel using Spark Streaming

2018-02-21 Thread Vibhakar, Beejal
I am trying to process data from 3 different Kafka topics using 3
InputDStreams with a single StreamingContext. I am currently testing this in
a sandbox, where I see data processed from one Kafka topic followed by the
others.

Question#1: I want to understand: when I run this program in a Hadoop
cluster, will it process the data from the 3 Kafka topics in parallel, or
will I see the same behavior as in my Sandbox?

Question#2: I aim to process the data from all three Kafka topics in parallel.  
Can I achieve this without breaking this program into 3 separate smaller 
programs?

Here's what the code template looks like:

   import org.apache.spark.streaming.{Seconds, StreamingContext}
   import org.apache.spark.streaming.kafka010.KafkaUtils
   import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
   import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
   import org.apache.avro.generic.GenericRecord

   val ssc = new StreamingContext(sc, Seconds(30))

val topic1 = Array("TOPIC1")

   val dataStreamTopic1 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic1, kafkaParms))

 // Processing logic for dataStreamTopic1


val topic2 = Array("TOPIC2")

   val dataStreamTopic2 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic2, kafkaParms))

 // Processing logic for dataStreamTopic2


val topic3 = Array("TOPIC3")

   val dataStreamTopic3 = KafkaUtils.createDirectStream[Array[Byte], 
GenericRecord](
  ssc,
  PreferConsistent,
  Subscribe[Array[Byte], GenericRecord](topic3, kafkaParms))

 // Processing logic for dataStreamTopic3

// Start the Streaming
ssc.start()
ssc.awaitTermination()

Here's how I submit my spark job on my sandbox...

./bin/spark-submit --class <main-class> --master local[*] <application-jar>

Thanks,
Beejal

