Re: Spark streaming from Kafka best fit

2016-03-07 Thread pratik khadloya
Would using mapPartitions instead of map help here?
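
For example, to hoist any per-record setup (parsers, regexes, clients) out of the
per-record path (a rough sketch reusing ParseStringToLogRecord and isGoodRecord
from the original post):

stream.mapPartitions { iter =>
  // setup done here runs once per partition instead of once per record
  iter.map(ParseStringToLogRecord)
    .filter(isGoodRecord)
    .map(_.url)
}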

~Pratik

On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger  wrote:

> You don't need a number of executor cores equal to the number of partitions.  An
> executor can and will work on multiple partitions within a batch, one after
> the other.  The real issue is whether you are able to keep your processing
> time under your batch time, so that delay doesn't increase.
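>
> If you're on the direct stream, one way to keep the batch time in check is to
> cap and auto-tune the ingest rate (a sketch; the values are illustrative, not
> recommendations):
>
> // let Spark adapt the ingest rate to recent batch timings (Spark 1.5+)
> sparkConf.set("spark.streaming.backpressure.enabled", "true")
> // hard cap for the direct stream, in messages/sec per Kafka partition
> sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "500")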
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar 
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said, and if I am correct it will use 224
>> executor cores just for fetching plus stage-1 processing of the 224 partitions. I
>> will obviously need more cores for processing further stages and for fetching the
>> next batch.
>>
>> I will start with a higher number of executor cores and see how it goes.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger 
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works.  The
>>> executor which receives data is also the executor which processes data;
>>> there aren't separate receivers.  If it's a single stage's worth of work
>>> (e.g. a straight map / filter), the processing of a given partition is going
>>> to be done by the executor that read it from Kafka.  If you do something
>>> involving a shuffle (e.g. reduceByKey), other executors will do additional
>>> processing.  The question of which executor works on which tasks is up to
>>> the scheduler (and getPreferredLocations, which only matters if you're
>>> running Spark on the same nodes as Kafka).
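>>>
>>> A minimal sketch of that boundary (assuming the Kafka 0.8 direct-stream
>>> connector, with ssc/kafkaParams/topicSet in scope and the parse/filter
>>> functions from your post):
>>>
>>> import kafka.serializer.StringDecoder
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>   ssc, kafkaParams, topicSet)
>>>
>>> // narrow, single-stage work: stays on the executor that fetched the partition
>>> val urls = stream.map(_._2).map(ParseStringToLogRecord).filter(isGoodRecord).map(_.url)
>>>
>>> // a shuffle boundary: reduceByKey repartitions data, so other executors join in
>>> val counts = urls.map(u => (u, 1L)).reduceByKey(_ + _)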
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
 Hello all,

 I see that, as of today, there are 3 ways one can read from Kafka in
 Spark Streaming:
 1. KafkaUtils.createStream()
 2. KafkaUtils.createDirectStream()
 3. Kafka-spark-consumer
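
 For concreteness, the call shapes of #1 and #2 look roughly like this (a
 sketch against the Spark 1.x Kafka 0.8 connector; "mytopic" is a placeholder):

 import kafka.serializer.StringDecoder
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.kafka.KafkaUtils

 // #1 receiver-based: one receiver per call, topic -> number of consumer threads
 val receiverStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK)

 // #2 direct: no receiver; one RDD partition per Kafka partition
 val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
   ssc, kafkaParams, Set("mytopic"))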

 My Spark Streaming application has to read from 1 Kafka topic with
 around 224 partitions, consuming data at around 150MB/s (~90,000
 messages/sec), which reduces to around 3MB/s (~1400 messages/sec) after
 filtering. After filtering I need to maintain the top 1000 URL counts. I don't
 really care about exactly-once semantics, as I am only interested in a rough
 estimate.

 Code:

 sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
 sparkConf.setAppName("KafkaReader")
 val ssc = StreamingContext.getOrCreate(kCheckPointDir, 
 createStreamingContext)

 val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
 val kafkaParams = Map[String, String](
   "metadata.broker.list" -> "kafka.server.ip:9092",
   "group.id" -> consumer_group
 )

 val lineStreams = (1 to N).map{ _ =>
   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
 ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
 }

 ssc.union(
   lineStreams.map(stream => {
   stream.map(ParseStringToLogRecord)
 .filter(record => isGoodRecord(record))
 .map(record => record.url)
   })
 ).window(Seconds(120), Seconds(120))  // 2 Minute window
   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 Minute 
 moving window, 28 will probably help in parallelism
   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
   .mapPartitions(iter => {
 iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) < 0).slice(0, 
 1000).iterator
   }, true)
   .foreachRDD((latestRDD, rddTime) => {
   printTopFromRDD(rddTime, latestRDD.map(record => (record._2, 
 record._1)).sortByKey(false).take(1000))
   })

 ssc.start()
 ssc.awaitTermination()
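
 For reference, the direct-stream variant (#2) that I had tried replaces the N
 receiver streams with a single call, roughly (a sketch; same kafkaParams, but
 a topic set instead of the topic-to-threads map):

 val topicSet = topics.split(",").toSet
 val lineStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicSet).map(_._2)
 // yields one RDD partition per Kafka partition (224 here); no receivers, no union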

 Questions:

 a) I used #2, but I found that I couldn't control how many executors
 will actually be fetching from Kafka. How do I keep a balance of executors
 which receive data from Kafka and which process data? Do they keep changing
 for every batch?

 b) Now I am trying #1, creating multiple DStreams, filtering them
 and then doing a union. I don't understand why the number of events
 processed per 120-second batch changes so drastically. PFA the events/sec
 graph while running with 1 receiver. How do I debug this?

 c) Which of the above 3 methods is the most suitable for integrating with
 Kafka? Any recommendations for getting maximum performance and running the
 streaming application reliably in a production environment?

 --
 Thanks
 Jatin Kumar

