In my experience, I would try the following:
I use the standalone cluster manager, so each app gets its own web UI. The Streaming tab is really helpful: if the processing time is greater than your mini-batch interval, you are going to run into performance problems. Use the "Stages" tab to figure out where the bottleneck is.

In our experience we were working with a large number of empty partitions; repartitioning solved our S3 output problem. Your mileage may vary. (A minimal repartition sketch is appended at the end of this message, after the quoted thread.)

Andy

From: Todd Nist <tsind...@gmail.com>
Date: Thursday, March 10, 2016 at 6:03 AM
To: Vinti Maheshwari <vinti.u...@gmail.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: Spark Streaming, very slow processing and increasing scheduling delay of kafka input stream

> Hi Vinti,
>
> All of your tasks are failing based on the screenshots provided.
>
> I think a few more details would be helpful. Is this YARN or a standalone
> cluster? How much overall memory is on your cluster, and on each machine
> where workers and executors are running? Are you using the Direct
> (KafkaUtils.createDirectStream) or Receiver (KafkaUtils.createStream)
> approach?
>
> You may find this discussion on SO of value:
> http://stackoverflow.com/questions/28901123/org-apache-spark-shuffle-metadatafetchfailedexception-missing-an-output-locatio
>
> -Todd
>
> On Mon, Mar 7, 2016 at 5:52 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>> Hi,
>>
>> My Spark Streaming program seems very slow. I am using Ambari for cluster
>> setup and Kafka for data input. I tried a batch interval of 2 seconds and a
>> checkpoint interval of 10 seconds, but since the scheduling delay kept
>> increasing I raised the batch interval to 5 and then 10 seconds. Nothing
>> changed with respect to performance.
>>
>> My program is doing two tasks:
>>
>> 1) Data aggregation
>> 2) Data insertion into HBase
>>
>> The action that takes the most time is the foreachRDD call on the DStream
>> object (state):
>>
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>>
>> Sample input coming from Kafka:
>>
>> test_id, file1, 1,1,1,1,1
>>
>> Code snippets:
>>
>> // implied imports for the snippets below:
>> import scala.util.Try
>> import breeze.linalg.{DenseVector => BDV}
>>
>> val parsedStream = inputStream
>>   .map(line => {
>>     val splitLines = line.split(",")
>>     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>>   })
>>
>> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>>     prev.map(_ +: current).orElse(Some(current))
>>       .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>>   })
>>
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>>
>> object Blaher {
>>   def blah(tup: (String, Array[Long])) {
>>     val hConf = HBaseConfiguration.create()
>>     ------
>>     val hTable = new HTable(hConf, tableName)
>>     val thePut = new Put(Bytes.toBytes("file_data"))
>>     thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1),
>>       Bytes.toBytes(tup._2.toList.toString))
>>     hTable.put(thePut)
>>   }
>> }
>>
>> My cluster specifications:
>> 16 executors (1 core each, 2 GB memory)
>>
>> I have attached some screenshots of the running execution.
>>
>> Does anyone have an idea what changes I should make to speed up the
>> processing?
>>
>> Thanks & Regards,
>> Vinti
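
Here is the repartition sketch mentioned at the top of this message, reusing the state DStream and Blaher object from Vinti's quoted code. The partition count of 16 (one per executor in the posted cluster spec) is only an illustrative starting point, not a measured value:

state.foreachRDD { rdd =>
  // Collapse many small or empty partitions into a few full ones before
  // the per-record output work; tune the count against the "Stages" tab.
  rdd.repartition(16).foreach(Blaher.blah)
}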
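
On the same theme, a hedged guess at another bottleneck: Blaher.blah opens a fresh HBaseConfiguration and HTable for every record. The usual fix is one table handle per partition via foreachPartition; here tableName stands in for whatever the elided "------" setup defines, as in the original snippet:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes

state.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One configuration and one table handle per partition,
    // instead of one per record.
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, tableName)
    try {
      records.foreach { case (key, counts) =>
        val thePut = new Put(Bytes.toBytes("file_data"))
        thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(key),
          Bytes.toBytes(counts.toList.toString))
        hTable.put(thePut)
      }
    } finally {
      hTable.close() // flushes buffered puts and releases the connection
    }
  }
}

Combined with the repartition sketch above, fewer and fuller partitions also mean fewer HBase connections per batch.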