Thanks Cody! I understand what you said, and if I am correct it will use 224
executor cores just for fetching + stage-1 processing of the 224 partitions.
I will obviously need more cores for processing the later stages and for
fetching the next batch, so I will start with a higher number of executor
cores and see how it goes.
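Concretely, I will try something along these lines first (a sketch; the
numbers are just an initial guess, assuming YARN):

// Illustrative sizing: 64 executors x 4 cores = 256 cores, enough for
// fetch + stage-1 of the 224 partitions plus headroom for later stages.
sparkConf.set("spark.executor.instances", "64")
sparkConf.set("spark.executor.cores", "4")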
--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org> wrote:

> "How do I keep a balance of executors which receive data from Kafka and
> which process data"
>
> I think you're misunderstanding how the direct stream works. The executor
> that receives data is also the executor that processes it; there are no
> separate receivers. If the job is a single stage of work (e.g. a straight
> map/filter), the processing of a given partition is done by the executor
> that read it from Kafka. If you do something involving a shuffle (e.g.
> reduceByKey), other executors will do the additional processing. Which
> executor works on which tasks is up to the scheduler (and to
> getPreferredLocations, which only matters if you're running Spark on the
> same nodes as Kafka).
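>
> For illustration, here is a rough sketch with the direct stream, reusing
> your parse/filter functions (the topic name is a placeholder):
>
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val kafkaParams = Map("metadata.broker.list" -> "kafka.server.ip:9092")
>
> // One input stream, one Spark partition per Kafka partition (224 in
> // your case); there are no separate receiver tasks.
> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>   StringDecoder](ssc, kafkaParams, Set("your-topic")) // placeholder topic
>
> // Single-stage work: parsing and filtering run on the same executors
> // that fetched the partitions from Kafka.
> val urls = stream.map(_._2)
>   .map(ParseStringToLogRecord)
>   .filter(isGoodRecord)
>   .map(_.url)
>
> // A shuffle (reduceByKey here) repartitions the data, so other executors
> // pick up the downstream work.
> val counts = urls.map(u => (u, 1L)).reduceByKey(_ + _)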
> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
> jku...@rocketfuelinc.com.invalid> wrote:
>
>> Hello all,
>>
>> I see that as of today there are 3 ways one can read from Kafka in Spark
>> Streaming:
>> 1. KafkaUtils.createStream() (here
>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>> 2. KafkaUtils.createDirectStream() (here
>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>> 3. Kafka-spark-consumer (here
>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>
>> My Spark Streaming application has to read from one Kafka topic with
>> around 224 partitions, consuming data at around 150 MB/s (~90,000
>> messages/sec), which reduces to around 3 MB/s (~1,400 messages/sec)
>> after filtering. After filtering I need to maintain the top 10000 URL
>> counts. I don't really care about exactly-once semantics, as I am only
>> interested in a rough estimate.
>>
>> Code:
>>
>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>> sparkConf.setAppName("KafkaReader")
>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>>
>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>> val kafkaParams = Map[String, String](
>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>   "group.id" -> consumer_group
>> )
>>
>> val lineStreams = (1 to N).map { _ =>
>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>> }
>>
>> ssc.union(
>>   lineStreams.map(stream => {
>>     stream.map(ParseStringToLogRecord)
>>       .filter(record => isGoodRecord(record))
>>       .map(record => record.url)
>>   })
>> ).window(Seconds(120), Seconds(120))  // 2 minute tumbling window
>>   // 30 minute moving window sliding every 2 minutes; 28 partitions
>>   // will probably help in parallelism
>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28)
>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>   .mapPartitions(iter => {
>>     // keep the 1000 highest counts in each partition (descending sort)
>>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) > 0)
>>       .slice(0, 1000).iterator
>>   }, true)
>>   .foreachRDD((latestRDD, rddTime) => {
>>     printTopFromRDD(rddTime,
>>       latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>>   })
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> Questions:
>>
>> a) I used #2, but I found that I couldn't control how many executors
>> would actually be fetching from Kafka. How do I keep a balance between
>> executors that receive data from Kafka and executors that process data?
>> Do they keep changing for every batch?
>>
>> b) Now I am trying #1, creating multiple DStreams, filtering them and
>> then doing a union. I don't understand why the number of events processed
>> per 120-second batch changes so drastically. PFA the events/sec graph
>> while running with 1 receiver. How do I debug this?
>>
>> c) Which of the above 3 will be the most suitable method to integrate
>> with Kafka? Any recommendations for getting maximum performance and
>> running the streaming application reliably in a production environment?
>>
>> --
>> Thanks
>> Jatin Kumar