In my experience, I would try the following:

I use the standalone cluster manager. Each app gets its own performance web
page. The Streaming tab is really helpful: if the processing time is greater
than your mini-batch length, you are going to run into performance problems.
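
For reference, the mini-batch length is just the Duration passed when the
StreamingContext is created; a minimal sketch (assuming a SparkConf named conf
already exists, with a 2-second interval purely as an example):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// The Streaming tab compares each batch's processing time against this interval.
val ssc = new StreamingContext(conf, Seconds(2))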

Use the "Stages" tab to figure out where the bottleneck is.

In our experience we were working with a large number of empty partitions.
Repartitioning solved our S3 output problem.
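
Something along these lines is what helped us (a rough sketch; the partition
count and output path are placeholders, and rdd stands for whatever you are
about to write out):

// Repartition to collapse a large number of empty partitions before the write.
// coalesce(n) does the same without forcing a full shuffle.
rdd.repartition(16).saveAsTextFile("s3n://some-bucket/output")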

Your mileage may vary.

andy

From:  Todd Nist <tsind...@gmail.com>
Date:  Thursday, March 10, 2016 at 6:03 AM
To:  Vinti Maheshwari <vinti.u...@gmail.com>
Cc:  "user @spark" <user@spark.apache.org>
Subject:  Re: Spark Streaming, very slow processing and increasing
scheduling delay of kafka input stream

> Hi Vinti,
> 
> All of your tasks are failing, based on the screenshots provided.
> 
> I think a few more details would be helpful.  Is this YARN or a Standalone
> cluster?  How much overall memory is on your cluster?  On each machine where
> workers and executors are running?  Are you using the Direct
> (KafkaUtils.createDirectStream) or Receiver (KafkaUtils.createStream)?
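> 
> For reference, the two look roughly like this in Spark 1.x with Kafka 0.8 (the
> broker list, ZooKeeper quorum, group id, and topic name are placeholders):
> 
> import kafka.serializer.StringDecoder
> import org.apache.spark.streaming.kafka.KafkaUtils
> 
> // Direct approach: no receiver, one RDD partition per Kafka partition.
> val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
>   ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("some_topic"))
> 
> // Receiver-based approach: records flow through a receiver running on an executor.
> val receiver = KafkaUtils.createStream(ssc, "zk1:2181", "some-group", Map("some_topic" -> 1))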
> 
> You may find this discussion of value on SO:
> http://stackoverflow.com/questions/28901123/org-apache-spark-shuffle-metadatafetchfailedexception-missing-an-output-locatio
> 
> -Todd
> 
> On Mon, Mar 7, 2016 at 5:52 PM, Vinti Maheshwari <vinti.u...@gmail.com> wrote:
>> Hi,
>> 
>> My Spark Streaming program seems very slow. I am using Ambari for cluster
>> setup and Kafka for data input.
>> I tried a batch size of 2 secs and a checkpointing duration of 10 secs, but
>> since the scheduling delay kept increasing I tried raising the batch size to
>> 5 and then 10 secs. Nothing seems to have changed in terms of performance.
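>> 
>> For context, this is roughly how those checkpointing settings appear in code
>> (the directory is a placeholder; state is the DStream defined below):
>> 
>> ssc.checkpoint("hdfs:///tmp/checkpoints")   // placeholder checkpoint directory
>> state.checkpoint(Seconds(10))               // checkpoint interval on the state DStream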
>> 
>> My program is doing two tasks:
>> 
>> 1) Data aggregation
>> 
>> 2) Data insertion into Hbase
>> 
>> The action that took the most time was calling foreachRDD on the DStream
>> object (state):
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>> 
>> 
>> Sample program input coming from Kafka:
>> test_id, file1, 1,1,1,1,1
>> 
>> Code snippets:
>> val parsedStream = inputStream
>>   .map(line => {
>>     val splitLines = line.split(",")
>>     (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>>   })
>> 
>> val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>>   (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>>     prev.map(_ +: current).orElse(Some(current))
>>       .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>>   })
>> 
>> state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>> 
>> 
>> 
>> object Blaher {
>>   def blah(tup: (String, Array[Long])) {
>>     val hConf = HBaseConfiguration.create()
>>     ------
>>     val hTable = new HTable(hConf, tableName)
>>     val thePut = new Put(Bytes.toBytes("file_data"))
>>     thePut.add(Bytes.toBytes("file_counts"), Bytes.toBytes(tup._1),
>>       Bytes.toBytes(tup._2.toList.toString))
>>     new ImmutableBytesWritable(Bytes.toBytes("file_data"))
>>     hTable.put(thePut)
>>   }
>> }
>> 
>> 
>> My Cluster Specifications:
>> 16 executors (1 core and 2 GB memory each)
>> I have attached some screenshots of running execution.
>> 
>> Does anyone have an idea of what changes I should make to speed up the processing?
>> 
>> Thanks & Regards,
>> Vinti
>> 
>> 
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 

