I have 2 machines in my cluster, each with 128 GB RAM and 8 cores.
Regards,
~Vinti

On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:

> Thanks Supreeth and Shahbaz. I will try adding
> spark.streaming.kafka.maxRatePerPartition.
>
> Hi Shahbaz,
>
> Please see my comments inline:
>
>    - Which version of Spark are you using? ==> *1.5.2*
>    - How big is the Kafka cluster? ==> *2 brokers*
>    - What is the message size and type? ==> *String, around 9,550 bytes*
>    - How big is the Spark cluster (how many executors, how many cores per
>      executor)? ==> *2 nodes, 16 executors, 1 core per executor*
>    - What does your Spark job look like? ==>
>
>     val messages = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>     val inputStream = messages.map(_._2)
>
>     val parsedStream = inputStream
>       .map(line => {
>         val splitLines = line.split(",")
>         (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>       })
>
>     val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>       (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>         prev.map(_ +: current).orElse(Some(current))
>           .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>       })
>     state.checkpoint(Duration(25000))
>     state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) // saving to HBase
>     ssc
>
> Set spark.streaming.backpressure.enabled to true and try? ==>
> *Yes, I had enabled it.*
>
> Regards,
> ~Vinti
>
> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz <shahzadh...@gmail.com> wrote:
>
>> Hello,
>>
>>    - Which version of Spark are you using?
>>    - How big is the Kafka cluster?
>>    - What is the message size and type?
>>    - How big is the Spark cluster (how many executors, how many cores
>>      per executor)?
>>    - What does your Spark job look like?
>>
>> Set spark.streaming.backpressure.enabled to true and try?
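A minimal sketch of where the two suggested settings could go, assuming the job builds its own SparkConf before creating the StreamingContext (the application name, rate value, and batch interval below are illustrative, not taken from the thread):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical sketch: cap the direct-stream read rate and enable backpressure.
// With 5-second batches, a limit of 10000 records/sec/partition means each batch
// reads at most 10000 * 5 = 50000 records from any single Kafka partition.
val conf = new SparkConf()
  .setAppName("kafka-streaming-job") // illustrative name
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // records/sec/partition
  .set("spark.streaming.backpressure.enabled", "true")       // adapt rate to processing speed

val ssc = new StreamingContext(conf, Seconds(5))
```

Note that in Spark 1.5.x the backpressure mechanism only throttles once it has seen at least one completed batch, so it does not by itself prevent a huge first batch; maxRatePerPartition is the setting that bounds the initial read.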
>>
>> Regards,
>> Shahbaz
>> +91-9986850670
>>
>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth <supreeth....@gmail.com> wrote:
>>
>>> Try setting spark.streaming.kafka.maxRatePerPartition; this can help
>>> control the number of messages read from Kafka per partition by the
>>> Spark Streaming consumer.
>>>
>>> -S
>>>
>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari <vinti.u...@gmail.com>
>>> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to figure out why my Kafka + Spark job is running slowly. I
>>> found that Spark is consuming all the messages from Kafka into a single
>>> batch and not sending any messages to the other batches.
>>>
>>> Batch Time            Input Size       Processing Time   Status
>>> 2016/03/05 21:57:05   0 events         -                 queued
>>> 2016/03/05 21:57:00   0 events         -                 queued
>>> 2016/03/05 21:56:55   0 events         -                 queued
>>> 2016/03/05 21:56:50   0 events         -                 queued
>>> 2016/03/05 21:56:45   0 events         -                 queued
>>> 2016/03/05 21:56:40   4039573 events   6 ms              processing
>>>
>>> (Each row above linked to its batch detail page for
>>> application_1457242523248_0003 in the Spark Streaming UI on
>>> ttsv-lab-vmdb-02.englab.juniper.net:8088.)
>>>
>>> Does anyone know how this behavior can be changed so that the messages
>>> are load-balanced across all the batches?
>>>
>>> Thanks,
>>> Vinti
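The behavior in the table has a simple explanation: without a rate limit, the direct Kafka stream reads the entire offset backlog into the first batch, so one batch gets millions of events while later batches queue up empty. The cap that spark.streaming.kafka.maxRatePerPartition imposes on any single batch can be worked out with plain arithmetic; this standalone sketch uses illustrative numbers, not values from the thread:

```scala
// Hypothetical arithmetic sketch: the maximum number of records one batch
// can pull once spark.streaming.kafka.maxRatePerPartition is set.
val maxRatePerPartition = 10000L // records per second per Kafka partition (illustrative)
val numPartitions       = 4L     // Kafka partitions in the topic (illustrative)
val batchIntervalSec    = 5L     // streaming batch interval in seconds (illustrative)

// Upper bound on records per batch: rate * partitions * batch interval.
val maxRecordsPerBatch = maxRatePerPartition * numPartitions * batchIntervalSec
println(maxRecordsPerBatch)
```

With a bound like this in place, a large backlog is spread across many batches instead of landing in the first one, which is the load-balancing effect being asked about.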