Thanks Cody!

I understand what you said, and if I am correct it will be using 224
executor cores just for fetching + stage-1 processing of the 224 partitions.
I will obviously need more cores for processing further stages and for
fetching the next batch.

I will start with a higher number of executor cores and see how it goes.
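
For reference, a minimal sketch of how I plan to size the job. The
instance/core counts below are placeholders I will tune, and
spark.executor.instances assumes YARN:

import org.apache.spark.SparkConf

// Sketch: keep total cores above the 224 Kafka partitions, with headroom
// for the shuffle stages and for fetching the next batch.
val sparkConf = new SparkConf()
  .setAppName("KafkaReader")
  .set("spark.executor.instances", "60")  // 60 executors * 4 cores = 240 cores
  .set("spark.executor.cores", "4")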

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> > "How do I keep a balance of executors which receive data from Kafka and
> which process data"
>
> I think you're misunderstanding how the direct stream works.  The executor
> which receives data is also the executor which processes it; there aren't
> separate receivers.  If it's a single stage worth of work (e.g. a straight
> map / filter), the processing of a given partition is going to be done by
> the executor that read it from Kafka.  If you do something involving a
> shuffle (e.g. reduceByKey), other executors will do additional processing.
> Which executor works on which tasks is up to the scheduler (and
> getPreferredLocations, which only matters if you're running Spark on the
> same nodes as Kafka).
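>
> To make that concrete, a minimal sketch of the direct stream path
> (Spark 1.3-era API; the topic name is a placeholder):
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val kafkaParams = Map("metadata.broker.list" -> "kafka.server.ip:9092")
> val stream = KafkaUtils.createDirectStream[
>   String, String, StringDecoder, StringDecoder](
>   ssc, kafkaParams, Set("mytopic"))
>
> // Narrow transformations: each partition is processed on the same
> // executor that fetched it from Kafka.
> val urls = stream.map(_._2).filter(_.nonEmpty)
>
> // A shuffle (reduceByKey) spreads further processing across executors.
> val counts = urls.map(url => (url, 1L)).reduceByKey(_ + _)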
>
> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello all,
>>
>> I see that, as of today, there are 3 ways one can read from Kafka in
>> Spark Streaming:
>> 1. KafkaUtils.createStream() (here
>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>> 2. KafkaUtils.createDirectStream() (here
>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>> 3. Kafka-spark-consumer (here
>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>
>> My Spark Streaming application has to read from 1 Kafka topic with around
>> 224 partitions, consuming data at around 150MB/s (~90,000 messages/sec),
>> which reduces to around 3MB/s (~1,400 messages/sec) after filtering. After
>> filtering I need to maintain the top 10,000 URL counts. I don't really care
>> about exactly-once semantics, as I am only interested in a rough estimate.
>>
>> Code:
>>
>> import kafka.serializer.StringDecoder
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>> sparkConf.setAppName("KafkaReader")
>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> val kafkaParams = Map[String, String](
>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>   "group.id" -> consumer_group
>> )
>>
>> // N receiver-based streams; each one ties up a core for its receiver.
>> val lineStreams = (1 to N).map { _ =>
>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>> }
>>
>> ssc.union(
>>   lineStreams.map(stream => {
>>     stream.map(ParseStringToLogRecord)
>>       .filter(record => isGoodRecord(record))
>>       .map(record => record.url)
>>   })
>> ).window(Seconds(120), Seconds(120))  // 2-minute window
>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28)  // 30-minute moving window; 28 partitions should help parallelism
>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>   .mapPartitions(iter => {
>>     // Pre-trim per partition, keeping the highest counts (sort descending).
>>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) > 0).slice(0, 1000).iterator
>>   }, true)
>>   .foreachRDD((latestRDD, rddTime) => {
>>     printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>>   })
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> Questions:
>>
>> a) I used #2, but I found that I couldn't control how many executors
>> would actually fetch from Kafka. How do I keep a balance between executors
>> which receive data from Kafka and those which process the data? Do they
>> keep changing for every batch?
>>
>> b) Now I am trying to use #1, creating multiple DStreams, filtering them
>> and then doing a union. I don't understand why the number of events
>> processed per 120-second batch changes so drastically. PFA the events/sec
>> graph from a run with 1 receiver. How do I debug this?
>>
>> c) Which of the above 3 is the most suitable method to integrate with
>> Kafka? Any recommendations for getting maximum performance and running the
>> streaming application reliably in a production environment?
>>
>> --
>> Thanks
>> Jatin Kumar
>>
>>
>
>
