I have 2 machines in my cluster with the following specifications:
128 GB RAM and 8 cores per machine

Regards,
~Vinti

On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari <vinti.u...@gmail.com>
wrote:

> Thanks Supreeth and Shahbaz. I will try adding
> spark.streaming.kafka.maxRatePerPartition.
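>
> A minimal sketch of how both settings could be wired in (the rate value
> below is only an illustrative placeholder, not a tuned number):
>
>    import org.apache.spark.SparkConf
>
>    val conf = new SparkConf()
>      .setAppName("KafkaStreamingJob") // illustrative name only
>      // let Spark adapt the ingestion rate to batch processing times
>      .set("spark.streaming.backpressure.enabled", "true")
>      // cap on records read per Kafka partition per second (placeholder value)
>      .set("spark.streaming.kafka.maxRatePerPartition", "10000")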
>
> Hi Shahbaz,
>
> Please see comments, inline:
>
>
>    - Which version of Spark are you using? ==> *1.5.2*
>    - How big is the Kafka cluster? ==> *2 brokers*
>    - What is the message size and type? ==> *String, around 9,550 bytes*
>    - How big is the Spark cluster (how many executors, how many cores per
>    executor)? ==> *2 nodes, 16 executors, 1 core per executor*
>    - What does your Spark job look like? ==>
>
>
>    val messages = KafkaUtils.createDirectStream[String, String,
>      StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>    val inputStream = messages.map(_._2)
>
>
>          val parsedStream = inputStream
>            .map(line => {
>              val splitLines = line.split(",")
>              (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>            })
>
>          val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>            (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>              prev.map(_ +: current).orElse(Some(current))
>                .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>            })
>          state.checkpoint(Duration(25000))
>          state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
>          ssc
>        }
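>
> For reference, a standalone sketch of what the update function above does:
> it prepends the previous state array to the current batch's arrays and sums
> them element-wise via Breeze (assuming BDV aliases breeze.linalg.DenseVector,
> as in the job; the sample values are made up):
>
>          import breeze.linalg.{DenseVector => BDV}
>          import scala.util.Try
>
>          val arrays = Seq(Array(1L, 2L, 3L), Array(10L, 20L, 30L))
>          // element-wise sum; Try yields None if the array lengths mismatch
>          val summed: Option[Array[Long]] =
>            Try(arrays.map(BDV(_)).reduce(_ + _).toArray).toOption
>          // summed == Some(Array(11L, 22L, 33L))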
>
>
> Set spark.streaming.backpressure.enabled to true and try? ==>
> *Yes, I had enabled it.*
> Regards,
> ~Vinti
>
> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:
>
>> Hello,
>>
>>    - Which version of Spark are you using?
>>    - How big is the Kafka cluster?
>>    - What is the message size and type?
>>    - How big is the Spark cluster (how many executors, how many cores
>>    per executor)?
>>    - What does your Spark job look like?
>>
>> Set spark.streaming.backpressure.enabled to true and try?
>>
>>
>> Regards,
>> Shahbaz
>> +91-9986850670
>>
>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth....@gmail.com> wrote:
>>
>>> Try setting spark.streaming.kafka.maxRatePerPartition; this can help
>>> control the number of messages read from Kafka per partition by the
>>> Spark Streaming consumer.
>>>
>>> -S
>>>
>>>
>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to figure out why my Kafka + Spark job is running slowly. I
>>> found that Spark is consuming all the messages from Kafka into a single
>>> batch and not sending any messages to the other batches.
>>>
>>> Batch Time | Input Size | Scheduling Delay | Processing Time | Status
>>>
>>> 2016/03/05 21:57:05 | 0 events | - | - | queued
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243825000>
>>> 2016/03/05 21:57:00 | 0 events | - | - | queued
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243820000>
>>> 2016/03/05 21:56:55 | 0 events | - | - | queued
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243815000>
>>> 2016/03/05 21:56:50 | 0 events | - | - | queued
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243810000>
>>> 2016/03/05 21:56:45 | 0 events | - | - | queued
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243805000>
>>> 2016/03/05 21:56:40 | 4039573 events | 6 ms | - | processing
>>> <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243800000>
>>>
>>> Does anyone know how this behavior can be changed so that the messages
>>> are load-balanced across all the batches?
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>
