Thanks Supreeth and Shahbaz. I will try adding
spark.streaming.kafka.maxRatePerPartition.
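
For reference, a minimal sketch of what that setting would bound. spark.streaming.kafka.maxRatePerPartition is a per-second, per-partition limit for the direct stream, so the most a single batch can pull is roughly rate x partitions x batch interval. The 5-second interval matches the batch timestamps quoted below; the rate of 1000 and the 8 partitions are placeholder assumptions, not values from my job, and RateCap is just an illustrative name.

```scala
// Sketch only: RateCap and the numbers used with it are illustrative assumptions.
object RateCap {
  // Upper bound on records one batch can read from Kafka with the direct stream:
  // records/second/partition * number of partitions * batch interval in seconds.
  def maxRecordsPerBatch(maxRatePerPartition: Long, partitions: Int, batchSeconds: Long): Long =
    maxRatePerPartition * partitions * batchSeconds

  // Settings as key/value pairs. Each pair could be passed to SparkConf.set(key, value)
  // or given to spark-submit as --conf key=value.
  def settings(maxRatePerPartition: Long): Map[String, String] = Map(
    "spark.streaming.kafka.maxRatePerPartition" -> maxRatePerPartition.toString,
    "spark.streaming.backpressure.enabled" -> "true"
  )
}
```

With the placeholder values, RateCap.maxRecordsPerBatch(1000L, 8, 5L) gives 40000 records per batch at most, instead of the whole backlog landing in the first batch.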

Hi Shahbaz,

Please see comments, inline:


   - Which version of Spark are you using? ==> *1.5.2*
   - How big is the Kafka cluster? ==> *2 brokers*
   - What is the message size and type? ==> *String, around 9,550 bytes*
   - How big is the Spark cluster (how many executors, how many cores per
   executor)? ==> *2 nodes, 16 executors, 1 core per executor*
   - What does your Spark job look like? ==>


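   // Assumes BDV is breeze.linalg.DenseVector, plus the usual Spark
   // Streaming, Kafka (StringDecoder) and scala.util.Try imports.
   val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet)

   // Keep only the message value; the Kafka key is not used.
   val inputStream = messages.map(_._2)

   // CSV line => (key from column 1, remaining columns parsed as Long)
   val parsedStream = inputStream.map { line =>
     val splitLines = line.split(",")
     (splitLines(1), splitLines.drop(2).map(_.trim.toLong))
   }

   // Running element-wise sum per key; a failed sum (e.g. mismatched
   // lengths) yields None, which drops the state for that key.
   val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
     (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
       prev.map(_ +: current).orElse(Some(current))
         .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
     })
   state.checkpoint(Duration(25000)) // checkpoint the state every 25 s
   state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
   ssc
 }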
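
To make the state update easier to follow, here is a rough plain-Scala sketch of what that function computes, without Breeze. It is not the code I am running: StateUpdate, sumArrays and update are illustrative names, and returning None on a length mismatch is my approximation of what the Try around the Breeze addition does.

```scala
// Sketch only: a Breeze-free approximation of the update function above.
object StateUpdate {
  // Element-wise sum of equal-length arrays.
  // Returns None when there is nothing to sum or the lengths differ.
  def sumArrays(arrays: Seq[Array[Long]]): Option[Array[Long]] =
    if (arrays.isEmpty || arrays.exists(_.length != arrays.head.length)) None
    else Some(arrays.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))

  // Same shape as the function passed to updateStateByKey:
  // previous state (if any) plus this batch's values for the key.
  def update(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] =
    sumArrays(prev.toSeq ++ current)
}
```

For example, update(Seq(Array(1L, 2L), Array(3L, 4L)), Some(Array(10L, 20L))) yields Some(Array(14, 26)). Since updateStateByKey removes a key whose update returns None, a single row with a different number of columns would reset that key's state.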


Have you tried setting spark.streaming.backpressure.enabled to true? ==>
*Yes, I had enabled it.*

Regards,
~Vinti

On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:

> Hello,
>
>    - Which version of Spark are you using?
>    - How big is the Kafka cluster?
>    - What is the message size and type?
>    - How big is the Spark cluster (how many executors, how many cores per
>    executor)?
>    - What does your Spark job look like?
>
> Have you tried setting spark.streaming.backpressure.enabled to true?
>
>
> Regards,
> Shahbaz
> +91-9986850670
>
> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth....@gmail.com> wrote:
>
>> Try setting spark.streaming.kafka.maxRatePerPartition. It caps the
>> number of messages read per second from each Kafka partition by the
>> Spark Streaming consumer.
>>
>> -S
>>
>>
>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>> wrote:
>>
>> Hello,
>>
>> I am trying to figure out why my Kafka + Spark job is running slowly. I
>> found that Spark is consuming all the messages from Kafka in a single
>> batch and leaving the other batches empty.
>>
>> 2016/03/05 21:57:05  0 events  -  -  queued
>>   <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243825000>
>> 2016/03/05 21:57:00  0 events  -  -  queued
>>   <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243820000>
>> 2016/03/05 21:56:55  0 events  -  -  queued
>>   <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243815000>
>> 2016/03/05 21:56:50  0 events  -  -  queued
>>   <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243810000>
>> 2016/03/05 21:56:45  0 events  -  -  queued
>>   <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243805000>
>> 2016/03/05 21:56:40  4039573 events  6 ms  -  processing
>>   <http://ttsv-lab-vmdb-02.englab.juniper.net:8088/proxy/application_1457242523248_0003/streaming/batch?id=1457243800000>
>>
>> Does anyone know how this behavior can be changed so that the messages
>> are load balanced across all the batches?
>>
>> Thanks,
>> Vinti
>>
>>
>
