Would using mapPartitions instead of map help here?
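Something like this, for instance -- just a rough sketch, assuming
ParseStringToLogRecord does some per-call setup that could be hoisted out
(the LogParser below is hypothetical, standing in for whatever that setup
actually is):

  stream.mapPartitions { iter =>
    val parser = new LogParser()  // hypothetical: built once per partition,
                                  // not once per record as with map()
    iter.map(parser.parse)
      .filter(isGoodRecord)
      .map(_.url)
  }

If the per-record work is cheap, though, I wouldn't expect much difference.

~Pratik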
On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger <c...@koeninger.org> wrote:

> You don't need an equal number of executor cores to partitions. An
> executor can and will work on multiple partitions within a batch, one
> after the other. The real issue is whether you are able to keep your
> processing time under your batch time, so that delay doesn't increase.
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said, and if I am correct it will be using 224
>> executor cores just for fetching + stage-1 processing of the 224
>> partitions. I will obviously need more cores for processing further
>> stages and for fetching the next batch.
>>
>> I will start with a higher number of executor cores and see how it goes.
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> > and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works. The
>>> executor which receives data is also the executor which processes the
>>> data; there aren't separate receivers. If it's a single stage's worth
>>> of work (e.g. a straight map / filter), the processing of a given
>>> partition is going to be done by the executor that read it from Kafka.
>>> If you do something involving a shuffle (e.g. reduceByKey), other
>>> executors will do additional processing. Which executor works on which
>>> tasks is up to the scheduler (and getPreferredLocations, which only
>>> matters if you're running Spark on the same nodes as Kafka).
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I see that as of today there are 3 ways one can read from Kafka in
>>>> Spark Streaming:
>>>> 1. KafkaUtils.createStream() (here
>>>>    <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>>> 2. KafkaUtils.createDirectStream() (here
>>>>    <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>>> 3. Kafka-spark-consumer (here
>>>>    <https://github.com/dibbhatt/kafka-spark-consumer>)
>>>>
>>>> My Spark Streaming application has to read from 1 Kafka topic with
>>>> around 224 partitions, consuming data at around 150 MB/s (~90,000
>>>> messages/sec), which reduces to around 3 MB/s (~1,400 messages/sec)
>>>> after filtering. After filtering I need to maintain the top 10000 URL
>>>> counts. I don't really care about exactly-once semantics as I am
>>>> interested in a rough estimate.
>>>>
>>>> Code:
>>>>
>>>> import kafka.serializer.StringDecoder
>>>> import org.apache.spark.storage.StorageLevel
>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>>> sparkConf.setAppName("KafkaReader")
>>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir,
>>>>   createStreamingContext)
>>>>
>>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>> val kafkaParams = Map[String, String](
>>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>>   "group.id" -> consumer_group
>>>> )
>>>>
>>>> // N receiver-based streams, unioned into one below
>>>> val lineStreams = (1 to N).map { _ =>
>>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>>>     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>>> }
>>>>
>>>> ssc.union(
>>>>   lineStreams.map(stream => {
>>>>     stream.map(ParseStringToLogRecord)
>>>>       .filter(record => isGoodRecord(record))
>>>>       .map(record => record.url)
>>>>   })
>>>> ).window(Seconds(120), Seconds(120)) // 2-minute window
>>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30-minute
>>>>     // moving window; 28 partitions will probably help with parallelism
>>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHOLD)
>>>>   .mapPartitions(iter => {
>>>>     // keep the 1000 highest counts per partition (sort descending)
>>>>     iter.toArray.sortWith((r1, r2) => r1._2 > r2._2).slice(0, 1000).iterator
>>>>   }, true)
>>>>   .foreachRDD((latestRDD, rddTime) => {
>>>>     printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1))
>>>>       .sortByKey(false).take(1000))
>>>>   })
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>>
>>>> Questions:
>>>>
>>>> a) I used #2, but I found that I couldn't control how many executors
>>>> would actually be fetching from Kafka. How do I keep a balance between
>>>> executors which receive data from Kafka and executors which process
>>>> data? Do they keep changing for every batch?
>>>>
>>>> b) Now I am trying to use #1, creating multiple DStreams, filtering
>>>> them and then doing a union. I don't understand why the number of
>>>> events processed per 120-second batch changes so drastically. PFA the
>>>> events/sec graph while running with 1 receiver. How do I debug this?
>>>>
>>>> c) Which of the above 3 will be the most suitable method to integrate
>>>> with Kafka? Any recommendations for getting maximum performance and
>>>> running the streaming application reliably in a production
>>>> environment?
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar
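For reference, the direct stream Cody describes would replace the N
receiver streams and the union with a single call. A minimal sketch
against the Spark 1.3 createDirectStream API, reusing ssc, topics and
kafkaParams from Jatin's code above:

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // One direct stream instead of N receiver streams + union: each Kafka
  // partition becomes one Spark partition, read and processed by the same
  // executor, so there is no receiver/processing split to balance.
  val topicsSet = topics.split(",").toSet
  val lines = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder](ssc, kafkaParams, topicsSet)
    .map(_._2)

With 224 Kafka partitions this gives 224 tasks per batch for the read +
map/filter stage; as Cody notes above, having fewer executor cores than
partitions is fine as long as the total processing time stays under the
120-second batch interval.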