Would using mapPartitions instead of map help here?
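
Something along these lines, just as a sketch (LogParser / parse are made-up
stand-ins for whatever per-record setup ParseStringToLogRecord might do):

stream.mapPartitions { records =>
  val parser = new LogParser()        // hypothetical: build once per partition
  records.map(parser.parse)           // instead of stream.map(ParseStringToLogRecord)
         .filter(isGoodRecord)
         .map(_.url)
}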

~Pratik

On Tue, Mar 1, 2016 at 10:07 AM Cody Koeninger <c...@koeninger.org> wrote:

> You don't need the number of executor cores to equal the number of
> partitions.  An executor can and will work on multiple partitions within a
> batch, one after the other.  The real issue is whether you are able to keep
> your processing time under your batch time, so that the delay doesn't
> increase.
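>
> If the processing time does start to exceed the batch time, one knob worth
> knowing about is rate limiting; for example (numbers are placeholders, not a
> recommendation):
>
> // cap records read per Kafka partition per second (direct stream)
> sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
> // or, on Spark 1.5+, let backpressure adapt the rate automatically
> sparkConf.set("spark.streaming.backpressure.enabled", "true")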
>
> On Tue, Mar 1, 2016 at 11:59 AM, Jatin Kumar <jku...@rocketfuelinc.com>
> wrote:
>
>> Thanks Cody!
>>
>> I understand what you said, and if I am correct it will be using 224
>> executor cores just for fetching + stage-1 processing of the 224 partitions.
>> I will obviously need more cores for processing further stages and for
>> fetching the next batch.
>>
>> I will start with a higher number of executor cores and see how it goes.
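>>
>> For example, something along the lines of (on YARN, numbers purely
>> illustrative):
>>
>> spark-submit --num-executors 32 --executor-cores 4 --executor-memory 8g ...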
>>
>> --
>> Thanks
>> Jatin Kumar | Rocket Scientist
>> +91-7696741743 m
>>
>> On Tue, Mar 1, 2016 at 9:07 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> > "How do I keep a balance of executors which receive data from Kafka
>>> and which process data"
>>>
>>> I think you're misunderstanding how the direct stream works.  The
>>> executor which receives data is also the executor which processes data;
>>> there aren't separate receivers.  If it's a single stage's worth of work
>>> (e.g. a straight map / filter), the processing of a given partition is
>>> going to be done by the executor that read it from Kafka.  If you do
>>> something involving a shuffle (e.g. reduceByKey), other executors will do
>>> additional processing.  The question of which executor works on which
>>> tasks is up to the scheduler (and getPreferredLocations, which only
>>> matters if you're running Spark on the same nodes as Kafka).
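>>>
>>> For example, a single direct stream against the topic should be enough
>>> (just a sketch, topic name is a placeholder); each of the 224 Kafka
>>> partitions becomes one task per batch, scheduled onto whatever executor
>>> cores are free:
>>>
>>> val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>>>   ssc, kafkaParams, Set("your_topic"))
>>> stream.map(_._2)
>>>   .map(ParseStringToLogRecord)
>>>   .filter(isGoodRecord)
>>>   .map(_.url)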
>>>
>>> On Tue, Mar 1, 2016 at 2:36 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I see that, as of today, there are 3 ways one can read from Kafka in
>>>> Spark Streaming:
>>>> 1. KafkaUtils.createStream() (here
>>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>>> 2. KafkaUtils.createDirectStream() (here
>>>> <https://spark.apache.org/docs/1.3.0/streaming-kafka-integration.html>)
>>>> 3. Kafka-spark-consumer (here
>>>> <https://github.com/dibbhatt/kafka-spark-consumer>)
>>>>
>>>> My Spark Streaming application has to read from 1 Kafka topic with
>>>> around 224 partitions, consuming data at around 150MB/s (~90,000
>>>> messages/sec), which reduces to around 3MB/s (~1,400 messages/sec) after
>>>> filtering. After filtering I need to maintain the top 10000 URL counts. I
>>>> don't really care about exactly-once semantics as I am only interested in a
>>>> rough estimate.
>>>>
>>>> Code:
>>>>
>>>> sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
>>>> sparkConf.setAppName("KafkaReader")
>>>> val ssc = StreamingContext.getOrCreate(kCheckPointDir, createStreamingContext)
>>>>
>>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>> val kafkaParams = Map[String, String](
>>>>   "metadata.broker.list" -> "kafka.server.ip:9092",
>>>>   "group.id" -> consumer_group
>>>> )
>>>>
>>>> val lineStreams = (1 to N).map { _ =>
>>>>   KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
>>>>     ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK).map(_._2)
>>>> }
>>>>
>>>> ssc.union(
>>>>   lineStreams.map(stream => {
>>>>     stream.map(ParseStringToLogRecord)
>>>>       .filter(record => isGoodRecord(record))
>>>>       .map(record => record.url)
>>>>   })
>>>> ).window(Seconds(120), Seconds(120))                      // 2 minute window
>>>>   .countByValueAndWindow(Seconds(1800), Seconds(120), 28) // 30 minute moving window, 28 partitions to help parallelism
>>>>   .filter(urlCountTuple => urlCountTuple._2 > MIN_COUNT_THRESHHOLD)
>>>>   .mapPartitions(iter => {
>>>>     iter.toArray.sortWith((r1, r2) => r1._2.compare(r2._2) > 0).slice(0, 1000).iterator
>>>>   }, true)
>>>>   .foreachRDD((latestRDD, rddTime) => {
>>>>     printTopFromRDD(rddTime, latestRDD.map(record => (record._2, record._1)).sortByKey(false).take(1000))
>>>>   })
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>>
>>>> Questions:
>>>>
>>>> a) I used #2, but I found that I couldn't control how many executors
>>>> would actually be fetching from Kafka. How do I keep a balance between
>>>> executors which receive data from Kafka and those which process data? Does
>>>> the assignment keep changing for every batch?
>>>>
>>>> b) Now I am trying to use #1, creating multiple DStreams, filtering them
>>>> and then doing a union. I don't understand why the number of events
>>>> processed per 120-second batch would change so drastically. PFA the
>>>> events/sec graph while running with 1 receiver. How do I debug this?
>>>>
>>>> c) Which of the above 3 will be the most suitable method for integrating
>>>> with Kafka? Any recommendations for getting maximum performance and running
>>>> the streaming application reliably in a production environment?
>>>>
>>>> --
>>>> Thanks
>>>> Jatin Kumar
>>>>
>>>>
>>>
>>>
>>
>
