Re: Spark driver not reusing HConnection
Corresponding HBase bug: https://issues.apache.org/jira/browse/HBASE-12629

On Wed, Nov 23, 2016 at 1:55 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:
> The solution is to disable the region size calculation check:
>
> hbase.regionsizecalculator.enable: false
>
> [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
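For readers hitting the same problem, a minimal sketch of applying that setting is below. It assumes a Java driver with an existing JavaSparkContext `jsc`, and uses the table name "message" seen in the logs; everything else is the standard HBase/Spark API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;

Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(TableInputFormat.INPUT_TABLE, "message");
// Skip the driver-side RegionSizeCalculator pass that opens a fresh
// HConnection per lookup (see HBASE-12629 above).
hbaseConf.setBoolean("hbase.regionsizecalculator.enable", false);

JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(
    hbaseConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);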
Re: Spark driver not reusing HConnection
The solution is to disable the region size calculation check:

hbase.regionsizecalculator.enable: false

On Sun, Nov 20, 2016 at 9:29 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:
> Any ideas folks?
>
> [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Re: Spark driver not reusing HConnection
Any ideas folks?

On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:
> Hi
>
> I'm accessing multiple regions (~5k) of an HBase table using Spark's
> newAPIHadoopRDD, but the driver is trying to calculate the region size of
> all the regions. [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Spark driver not reusing HConnection
Hi

I'm accessing multiple regions (~5k) of an HBase table using Spark's newAPIHadoopRDD, but the driver is trying to calculate the region size of all the regions. It is not even reusing the HConnection, and is creating a new connection for every request (see below), which is taking lots of time.

Is there a better approach to do this?

[18 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
[18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181 sessionTimeout=6 watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
[18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)] ClientCnxn: Opening socket connection to server hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate using SASL (unknown error)
[18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)] ClientCnxn: Socket connection established, initiating session, client: /10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
[18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)] ClientCnxn: Session establishment complete on server hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95, negotiated timeout = 6
[18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator: Calculating region sizes for table "message".
[18 Nov 2016 22:25:27,867] [INFO Driver] ConnectionManager$HConnectionImplementation: Closing master protocol: MasterService
[18 Nov 2016 22:25:27,868] [INFO Driver] ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x2564f6f013e0e95
[18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session: 0x2564f6f013e0e95 closed
[18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn: EventThread shut down
[18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
[18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181 sessionTimeout=6 watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
[18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)] ClientCnxn: Opening socket connection to server hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate using SASL (unknown error)
[18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)] ClientCnxn: Socket connection established, initiating session, client: /10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
[18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)] ClientCnxn: Session establishment complete on server hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97, negotiated timeout = 6
[18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator: Calculating region sizes for table "message".

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Re: Spark kafka integration issues
Thanks for the reply Cody. I found the article below on the same topic, very helpful. Thanks for the details, much appreciated.
http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

On Tue, Sep 13, 2016 at 8:14 PM, Cody Koeninger <c...@koeninger.org> wrote:
> 1. See http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
> and look for HasOffsetRanges. If you really want the info per-message
> rather than per-partition, createRDD has an overload that takes a
> messageHandler from MessageAndMetadata to whatever you need.
>
> 2. createRDD takes type parameters for the key and value decoder, so
> specify them there.
>
> 3. You can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
> There is a spark-streaming-kafka-0-10 package with additional features
> that only works on brokers 0.10 or higher. A pull request for
> documenting it has been merged, but not deployed.
>
> On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:
> > Hello fellow sparkers,
> >
> > I'm using Spark to consume messages from Kafka in a non-streaming
> > fashion, using spark-streaming-kafka-0-8_2.10 with Spark v2.0. [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
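To make point (1) concrete, here is a minimal sketch of the messageHandler overload from the spark-streaming-kafka-0-8 Java API. It assumes an existing JavaSparkContext `jsc`; the topic name "events", the broker address, and the offset range are placeholder values. The handler tags each record with its topic/partition/offset.

import java.util.HashMap;
import java.util.Map;
import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.Broker;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092"); // placeholder

OffsetRange[] ranges = { OffsetRange.create("events", 0, 0L, 1000L) }; // placeholder

// Keep topic/partition/offset per message instead of per partition.
Function<MessageAndMetadata<String, String>, String> handler =
    mmd -> mmd.topic() + "/" + mmd.partition() + "/" + mmd.offset() + ": " + mmd.message();

JavaRDD<String> rdd = KafkaUtils.createRDD(
    jsc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    String.class, kafkaParams, ranges,
    new HashMap<TopicAndPartition, Broker>(), // empty: leaders get looked up from the brokers
    handler);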
Spark kafka integration issues
Hello fellow sparkers,

I'm using Spark to consume messages from Kafka in a non-streaming fashion, using spark-streaming-kafka-0-8_2.10 with Spark v2.0.

I have a few queries on this; please get back if you have clues on any of them.

1) Is there any way to get the topic, partition & offset information for each item from the KafkaRDD? I'm using *KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder]* to create my Kafka RDD.
2) How do I pass my custom Decoder instead of using the String or Byte decoder? Are there any examples for this?
3) Is there a newer version to consume from kafka-0.10 & kafka-0.9 clusters?

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
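For question (2), a minimal sketch of a custom decoder is below, assuming the Kafka 0.8 client used by spark-streaming-kafka-0-8. The 8-byte-long payload format is just an illustration; the consumer instantiates decoders reflectively, so the VerifiableProperties constructor is required.

import java.nio.ByteBuffer;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;

// Decodes an 8-byte big-endian payload into a Long.
public class LongDecoder implements Decoder<Long> {
    // Kafka creates decoders via reflection and passes the consumer config here.
    public LongDecoder(VerifiableProperties props) {
    }

    @Override
    public Long fromBytes(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

It would then be plugged in as KafkaUtils.createRDD[String, Long, StringDecoder, LongDecoder](...) (or via the corresponding Class arguments in the Java API).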
Re: how to spark streaming application start working on next batch before completing on previous batch .
Try setting spark.streaming.concurrentJobs to the number of concurrent jobs you want to run.
On 15 Dec 2015 17:35, "ikmal" wrote:
> The best practice is to keep the processing time lower than the batch
> interval. I'm sure your application is suffering from constantly
> increasing scheduling delay.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-spark-streaming-application-start-working-on-next-batch-before-completing-on-previous-batch-tp25559p25707.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
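A minimal sketch of applying that setting is below. Note the warning later in this thread: the flag is undocumented and not officially supported, so treat it as experimental. The app name is a placeholder.

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
    .setAppName("streaming-app") // placeholder name
    // Let up to 2 batches be processed concurrently instead of strictly one at a time.
    .set("spark.streaming.concurrentJobs", "2");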
Re: how to spark streaming application start working on next batch before completing on previous batch .
Are the issues related to the WAL-based ReliableKafkaReceiver or to any receiver in general? Any insights will be helpful.
On 16 Dec 2015 05:44, "Tathagata Das" <t...@databricks.com> wrote:
> Just to be clear: spark.streaming.concurrentJobs is NOT officially
> supported. There are issues with fault tolerance and data loss if that is
> set to more than 1.
>
> On Tue, Dec 15, 2015 at 9:19 AM, Mukesh Jha <me.mukesh@gmail.com> wrote:
>> Try setting spark.streaming.concurrentJobs to the number of concurrent
>> jobs you want to run. [...]
Re: SparkStreaming failing with exception Could not compute split, block input
I'm streaming data from a Kafka topic using KafkaUtils, doing some computation and writing records to HBase. Storage level is MEMORY_AND_DISK_SER.

On 27 Feb 2015 16:20, Akhil Das <ak...@sigmoidanalytics.com> wrote:

You could be hitting this issue: https://issues.apache.org/jira/browse/SPARK-4516. Apart from that, a little more information about your job would be helpful.

Thanks
Best Regards

On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha <me.mukesh@gmail.com> wrote:

Hi Experts,

My Spark job is failing with the below error. From the logs I can see that input-3-1424842351600 was added at 5:32:32 and was never purged out of memory. [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
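For reference, a minimal sketch of a receiver-based Kafka stream persisted at that storage level. It assumes an existing JavaStreamingContext `jssc`; the ZooKeeper quorum, group id, and topic map are placeholders.

import java.util.Collections;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
    jssc, "zk1:2181", "my-consumer-group",
    Collections.singletonMap("events", 1),  // topic -> receiver thread count
    StorageLevel.MEMORY_AND_DISK_SER());    // spill received blocks to disk when memory is tight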
Re: SparkStreaming failing with exception Could not compute split, block input
Also my job is map-only, so there is no shuffle/reduce phase.

On Fri, Feb 27, 2015 at 7:10 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

I'm streaming data from a Kafka topic using KafkaUtils, doing some computation and writing records to HBase. Storage level is MEMORY_AND_DISK_SER. [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Re: SparkStreaming failing with exception Could not compute split, block input
On Wed, Feb 25, 2015 at 8:09 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

My application runs fine for ~3/4 hours and then hits this issue. [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Re: SparkStreaming failing with exception Could not compute split, block input
My application runs fine for ~3/4 hours and then hits this issue.

On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha <me.mukesh@gmail.com> wrote:

Hi Experts,

My Spark job is failing with the below error. From the logs I can see that input-3-1424842351600 was added at 5:32:32 and was never purged out of memory. [...]

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
SparkStreaming failing with exception Could not compute split, block input
Hi Experts,

My Spark job is failing with the below error. From the logs I can see that input-3-1424842351600 was added at 5:32:32 and was never purged out of memory. Also, the available free memory for the executor is *2.1G*. Please help me figure out why executors cannot fetch this input.

Txz for any help, Cheers.

*Logs*
15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added input-3-1424842351600 in memory on chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
.
.
15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919 (size: 232.3 KB, free: 2.1 GB)
15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751 (size: 291.4 KB, free: 2.1 GB)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes)
15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842351600 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842355600 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
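One common mitigation for lost input blocks (a hedged sketch, not a fix confirmed in this thread) is to enable the write-ahead log, available from Spark 1.2, so received blocks survive executor loss instead of existing only in executor memory. The app name and checkpoint path are placeholders; the WAL requires a checkpoint directory.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("kafka-to-hbase") // placeholder name
    // Persist received blocks to the checkpoint directory before processing.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");

JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
jssc.checkpoint("hdfs:///tmp/spark-checkpoints"); // placeholder path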
Re: spark streaming: stderr does not roll
I'm also facing the same issue. I tried the configurations, but it seems the executors' Spark log4j.properties overrides the passed values, so you have to change /etc/spark/conf/log4j.properties. Let me know if any of you have managed to get this fixed programmatically. I am planning to use logrotate to rotate these logs.

On Thu, Nov 13, 2014 at 1:45 AM, Nguyen, Duc <duc.ngu...@pearson.com> wrote:

I've also tried setting the aforementioned properties using System.setProperty() as well as on the command line while submitting the job using --conf key=value. All to no success. When I go to the Spark UI and click on that particular streaming job and then the Environment tab, I can see the properties are correctly set. But regardless of what I've tried, the stderr log file on the worker nodes does not roll and continues to grow... leading to a crash of the cluster once it claims 100% of disk. Has anyone else encountered this? Anyone?

On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc <duc.ngu...@pearson.com> wrote:

We are running Spark streaming jobs (version 1.1.0). After a sufficient amount of time, the stderr file grows until the disk is full at 100% and crashes the cluster. I've read this https://github.com/apache/spark/pull/895 and also read this http://spark.apache.org/docs/latest/configuration.html#spark-streaming

So I've tried testing with this in an attempt to get the stderr log file to roll:

sparkConf.set("spark.executor.logs.rolling.strategy", "size")
    .set("spark.executor.logs.rolling.size.maxBytes", "1024")
    .set("spark.executor.logs.rolling.maxRetainedFiles", "3")

Yet it does not roll and continues to grow. Am I missing something obvious?

thanks,
Duc

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Re: Cannot access Spark web UI
My Hadoop version is Hadoop 2.5.0-cdh5.3.0.

From the driver logs [3] I can see that the SparkUI started on a specific port, and my YARN app tracking URL [1] points to that port, which is in turn getting redirected to the proxy URL [2], which gives me java.net.BindException: Cannot assign requested address. If there were a port conflict, the SparkUI startup itself would have failed, but that is not the case.

[1] YARN: application_1424814313649_0006 spark-realtime-MessageStoreWriter SPARK ciuser root.ciuser RUNNING UNDEFINED 10% http://host21.cloud.com:44648
[2] Proxy URL: http://host28.cloud.com:8088/proxy/application_1424814313649_0006/
[3] LOGS:
15/02/25 04:25:02 INFO util.Utils: Successfully started service 'SparkUI' on port 44648.
15/02/25 04:25:02 INFO ui.SparkUI: Started SparkUI at http://host21.cloud.com:44648
15/02/25 04:25:02 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
15/02/25 04:25:02 INFO netty.NettyBlockTransferService: Server created on 41518

On Wed, Feb 18, 2015 at 3:15 PM, Arush Kharbanda <ar...@sigmoidanalytics.com> wrote:

It seems like it's not able to get a port it needs. Are you sure that the required port is available? In what logs did you find this error?

On Wed, Feb 18, 2015 at 2:21 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

The error says "Cannot assign requested address". This means that you need to use the correct address for one of your network interfaces, or 0.0.0.0 to accept connections from all interfaces. Can you paste your spark-env.sh file and /etc/hosts file?

Thanks
Best Regards

On Wed, Feb 18, 2015 at 2:06 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

Hello Experts,

I am running a spark-streaming app inside YARN. I have the Spark History Server running as well (do we need it running to access the UI?). The app is running fine as expected, but Spark's web UI is not accessible.

When I try to access the ApplicationMaster of the YARN application I get the below error. This looks very similar to https://issues.apache.org/jira/browse/SPARK-5837, but instead of java.net.ConnectException: Connection refused I am getting java.net.BindException: Cannot assign requested address, as shown below.

Please let me know if you have faced / fixed this issue; any help is greatly appreciated.

*Exception*
HTTP ERROR 500

Problem accessing /proxy/application_1424161379156_0001/.
Reason: Cannot assign requested address
Caused by: java.net.BindException: Cannot assign requested address
at java.net.PlainSocketImpl.socketBind(Native Method)
at java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
at java.net.Socket.bind(Socket.java:631)
at java.net.Socket.<init>(Socket.java:423)
at java.net.Socket.<init>(Socket.java:280)
at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
at org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346)
at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:188)
at org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:345)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
at org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84)
at com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
at com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
at com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
at com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
at org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
at org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109
Cannot access Spark web UI
:549)
at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
at org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
at org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

Powered by Jetty://

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
Re: Spark streaming app shutting down
Thanks for the info guys. For now I'm using the high-level consumer; I will give this one a try. As far as the queries are concerned, checkpointing helps. I'm still not sure what's the best way to gracefully stop the application in yarn-cluster mode.

On 5 Feb 2015 09:38, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

Thanks Akhil for mentioning this low-level consumer (https://github.com/dibbhatt/kafka-spark-consumer). Yes, it has a better fault-tolerance mechanism than any existing Kafka consumer available: it has no data loss on receiver failure and the ability to replay or restart itself in case of failure. You can definitely give it a try.

Dibyendu

On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

AFAIK, from Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault tolerance, which means it can handle receiver/driver failures. You can also look at the low-level Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) which has a better fault-tolerance mechanism for receiver failures. This low-level consumer will push the offset of the message being read into ZooKeeper for fault tolerance. In your case I think mostly the in-flight data would be lost if you aren't using any of the fault-tolerance mechanisms.

Thanks
Best Regards

On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

Hello Sparkers,

I'm running a Spark streaming app which reads data from a Kafka topic, does some processing and then persists the results in HBase. I am using Spark 1.2.0 running on a YARN cluster with 3 executors (2gb, 8 cores each). I've enabled checkpointing, and I am also rate-limiting my kafkaReceivers so that the number of items read is not more than 10 records per sec. The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.

This app was running fine for ~3 days, then there was an increased load on the HBase server because of some other process querying HBase tables. This led to an increase in the batch processing time of the Spark batches (a 1 min batch that previously finished in 20 sec was processed in 10 min), which in turn led to the shutdown of the Spark application; PFA the executor logs.

From the logs I'm getting the below exceptions *[1]* *[2]*; it looks like there were some outstanding jobs that didn't get processed, or the job couldn't find the input data. From the logs it seems that the shutdown hook gets invoked but it cannot process the in-flight block. I have a couple of queries on this:

1) Does this mean that these jobs failed and the *in-flight data* is lost?
2) Does the Spark job *buffer kafka* input data while the job is in the processing state for 10 mins, and on shutdown is that too lost? (I do not see any OOM error in the logs.)
3) Can we have *explicit commits* enabled in the kafkaReceiver so that the offsets get committed only when the RDD(s) get successfully processed?

Also I'd like to know if there is a *graceful way to shutdown a spark app running on yarn*. Currently I'm killing the yarn app to stop it, which leads to the loss of that job's history, whereas in this case the application stops and succeeds and thus preserves the logs history.

*[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
*[2]* java.lang.Exception: Could not compute split, block input-2-1422901498800 not found
*[3]* org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63 does not have any open files.

--
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*
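On the graceful-shutdown question, a minimal sketch of the API-level option is below; it assumes a handle to the app's JavaStreamingContext `jssc`. Later releases (1.4+, if memory serves) can also request the same behavior via spark.streaming.stopGracefullyOnShutdown=true, so a plain SIGTERM, rather than killing the YARN app, lets in-flight batches finish.

import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Stop the streaming scheduler, letting already-received data be fully
// processed, and also stop the underlying SparkContext.
jssc.stop(true /* stopSparkContext */, true /* stopGracefully */);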
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
numStreams is 5 in my case.

List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
      DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
      StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);

On Wed, Jan 21, 2015 at 3:19 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

Hi Mukesh,

How are you creating your receivers? Could you post the (relevant) code?

-kr, Gerard.

On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha <me.mukesh@gmail.com> wrote:

Hello Guys,

I've repartitioned my kafkaStream so that it gets evenly distributed among the executors, and the results are better. [...]
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Hello Guys, I've repartitioned my kafkaStream so that it gets evenly distributed among the executors, and the results are better. Still, from the executors page it seems that only 1 executor has all 8 of its cores in use while the other executors are using just 1 core each. Is this the correct interpretation based on the below data? If so, how can we fix this? [image: Inline image 1]

On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das tathagata.das1...@gmail.com wrote: That is kind of expected due to data locality. Though you should see some tasks running on the other executors as the data gets replicated to other nodes, which can then run tasks based on locality. You have two solutions: 1. kafkaStream.repartition() to explicitly repartition the received data across the cluster. 2. Create multiple Kafka streams and union them together. See http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com wrote: Thanks Sandy, it was the issue with the no. of cores. Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running at the NODE_LOCAL locality level, i.e. all the tasks are running on the same executor where my Kafka receiver(s) are running, even though other executors are idle. I configured spark.locality.wait=50 instead of the default 3000 ms, which forced the task rebalancing among nodes; let me know if there is a better way to deal with this.

On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com wrote: Makes sense. I've also tried it in standalone mode, where all 3 workers & the driver were running on the same 8-core box, and the results were similar. Anyway, I will share the results in YARN mode with 8-core YARN containers.

On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy

On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the Spark application master UI.

spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8

On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8?

On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5-sec batch is reading data from Kafka and processing it in 5 sec; on the standalone cluster each 5-sec batch is getting processed in 0.4 sec. Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also, do you have any pointers on the reasons the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated. Thanks in advance.

YARN (5 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms
Re: Spark on YARN failing on CDH 5.3.0: container cannot be fetched because of NumberFormatException
I am using the pre-built *spark-1.2.0-bin-hadoop2.4* from *[1]* to submit spark applications to YARN; I cannot find a pre-built spark for *CDH-5.x* versions. So, in my case the org.apache.hadoop.yarn.util.ConverterUtils class is coming from the spark-assembly-1.1.0-hadoop2.4.0.jar, which is part of the pre-built spark, and hence is causing this issue. How/where can I get spark 1.2.0 built for CDH-5.3.0? I checked in the maven repo etc. with no luck. *[1]* https://spark.apache.org/downloads.html

On Fri, Jan 9, 2015 at 1:12 AM, Marcelo Vanzin van...@cloudera.com wrote: Just to add to Sandy's comment, check your client configuration (generally in /etc/spark/conf). If you're using CM, you may need to run the Deploy Client Configuration command on the cluster to update the configs to match the new version of CDH.

On Thu, Jan 8, 2015 at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Those line numbers in ConverterUtils in the stack trace don't appear to line up with CDH 5.3: https://github.com/cloudera/hadoop-common/blob/cdh5-2.5.0_5.3.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java Is it possible you're still including the old jars on the classpath in some way? -Sandy

On Thu, Jan 8, 2015 at 3:38 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Experts, I am running spark inside a YARN job. The spark-streaming job was running fine on CDH-5.0.0, but after the upgrade to 5.3.0 it cannot fetch containers, with the below errors. It looks like the container id is incorrect and a string is present in a place where it's expecting a number.

java.lang.IllegalArgumentException: Invalid ContainerId: container_e01_1420481081140_0006_01_01
Caused by: java.lang.NumberFormatException: For input string: "e01"

Is this a bug?? Did you face something similar, and any ideas how to fix this?

15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception:
java.lang.IllegalArgumentException: Invalid ContainerId: container_e01_1420481081140_0006_01_01
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
    at org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.NumberFormatException: For input string: "e01"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:441)
    at java.lang.Long.parseLong(Long.java:483)
    at org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
    ... 11 more
15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 10, (reason: Uncaught exception: Invalid ContainerId: container_e01_1420481081140_0006_01_01)

-- Thanks & Regards, Mukesh Jha
-- Marcelo
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
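The failure mode is easy to reproduce outside YARN: newer CDH releases write container ids with an extra epoch field (the "e01" above), while the ConverterUtils bundled in the old spark assembly splits the id on underscores and parses each field as a number. A minimal sketch of the failure, which mimics rather than copies the library's parsing:

    // "container_e01_1420481081140_0006_01_01" -> [container, e01, 1420481081140, 0006, 01, 01]
    String[] fields = "container_e01_1420481081140_0006_01_01".split("_");
    long ts = Long.parseLong(fields[2]);   // 1420481081140, parses fine
    long bad = Long.parseLong(fields[1]);  // "e01" -> java.lang.NumberFormatException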
Spark on YARN failing on CDH 5.3.0: container cannot be fetched because of NumberFormatException
Hi Experts, I am running spark inside a YARN job. The spark-streaming job was running fine on CDH-5.0.0, but after the upgrade to 5.3.0 it cannot fetch containers, with the below errors. It looks like the container id is incorrect and a string is present in a place where it's expecting a number.

java.lang.IllegalArgumentException: Invalid ContainerId: container_e01_1420481081140_0006_01_01
Caused by: java.lang.NumberFormatException: For input string: "e01"

Is this a bug?? Did you face something similar, and any ideas how to fix this?

15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception:
java.lang.IllegalArgumentException: Invalid ContainerId: container_e01_1420481081140_0006_01_01
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
    at org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513)
    at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: java.lang.NumberFormatException: For input string: "e01"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:441)
    at java.lang.Long.parseLong(Long.java:483)
    at org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
    at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
    ... 11 more
15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 10, (reason: Uncaught exception: Invalid ContainerId: container_e01_1420481081140_0006_01_01)

-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
KafkaUtils not consuming all the data from all partitions
Hi Guys, I have a Kafka topic with 90 partitions and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no delay in processing, it's just that some partitions' data is never getting picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps on increasing. Below are my Kafka consumer parameters. Have any of you faced this kind of issue? If so, do you have any pointers to fix it?

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "3");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "2000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "1");
    kafkaConf.put("rebalance.max.retries", "20");

-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: KafkaUtils not consuming all the data from all partitions
I understand that I have to create 10 parallel streams. My code is running fine when the no. of partitions is ~20, but when I increase the no. of partitions I keep hitting this issue. Below is my code to create the Kafka streams, along with the configs used.

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "3");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "2000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "1");
    kafkaConf.put("rebalance.max.retries", "20");

    String[] topics = kafkaTopicsList;
    int numStreams = numKafkaThreads; // this is *10*
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    for (String topic : topics) {
        topicMap.put(topic, numStreams);
    }
    List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], byte[]>>(numStreams);
    for (int i = 0; i < numStreams; i++) {
        kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER()));
    }
    JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);

On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, Could you add the code where you create the Kafka consumer? -kr, Gerard.

On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote: Hi Mukesh, If my understanding is correct, each stream only has a single receiver. So, if you have each receiver consuming 9 partitions, you need 10 input DStreams to create 10 concurrent receivers: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving Would you mind sharing a bit more on how you achieve this? -- FG

On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Guys, I have a Kafka topic with 90 partitions and I am running Spark Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10 Kafka receivers. My streaming job is running fine and there is no delay in processing, it's just that some partitions' data is never getting picked up. From the Kafka console I can see that each receiver is consuming data from 9 partitions, but the lag for some offsets keeps on increasing. Below are my Kafka consumer parameters. Have any of you faced this kind of issue? If so, do you have any pointers to fix it?

    Map<String, String> kafkaConf = new HashMap<String, String>();
    kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
    kafkaConf.put("group.id", kafkaConsumerGroup);
    kafkaConf.put("consumer.timeout.ms", "3");
    kafkaConf.put("auto.offset.reset", "largest");
    kafkaConf.put("fetch.message.max.bytes", "2000");
    kafkaConf.put("zookeeper.session.timeout.ms", "6000");
    kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
    kafkaConf.put("zookeeper.sync.time.ms", "2000");
    kafkaConf.put("rebalance.backoff.ms", "1");
    kafkaConf.put("rebalance.max.retries", "20");

-- Thanks & Regards, Mukesh Jha me.mukesh@gmail.com
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
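One thing worth checking in a receiver-based setup like the one above: each KafkaUtils.createStream call starts one long-running receiver that permanently occupies an executor core, so the application needs strictly more cores than streams or processing starves. A minimal sanity check, with numExecutors and executorCores assumed to mirror the spark-submit flags:

    int totalCores = numExecutors * executorCores;  // e.g. 5 executors * 2 cores = 10
    if (numStreams >= totalCores) {
        throw new IllegalStateException("Need more cores than receivers: "
                + numStreams + " receivers vs " + totalCores + " cores");
    }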
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Thanks Sandy, it was the issue with the no. of cores. Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running at the NODE_LOCAL locality level, i.e. all the tasks are running on the same executor where my Kafka receiver(s) are running, even though other executors are idle. I configured *spark.locality.wait=50* instead of the default 3000 ms, which forced the task rebalancing among nodes; let me know if there is a better way to deal with this.

On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com wrote: Makes sense. I've also tried it in standalone mode, where all 3 workers & the driver were running on the same 8-core box, and the results were similar. Anyway, I will share the results in YARN mode with 8-core YARN containers.

On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy

On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the Spark application master UI.

spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8

On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8?

On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5-sec batch is reading data from Kafka and processing it in 5 sec; on the standalone cluster each 5-sec batch is getting processed in 0.4 sec. Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also, do you have any pointers on the reasons the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance.

YARN (5 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
driver | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master yarn-cluster --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

STANDALONE (3 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
2 | vm-73
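The spark.locality.wait workaround described above can also be set programmatically before the streaming context is created; a minimal sketch (the app name is a placeholder, and 50 is the value from this thread, not a general recommendation):

    SparkConf conf = new SparkConf()
        .setAppName("kafka-streaming-app")   // placeholder name
        .set("spark.locality.wait", "50");   // ms; the Spark 1.x default is 3000
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));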
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5-sec batch is reading data from Kafka and processing it in 5 sec; on the standalone cluster each 5-sec batch is getting processed in 0.4 sec. Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also, do you have any pointers on the reasons the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance.

YARN (5 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
driver | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master yarn-cluster --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

STANDALONE (3 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
2 | vm-73.cloud.com:37621 | 0 | 0.0B/265.0MB | 0.0 B | 1 | 0 | 1059 | 1060 | 5.9 m | 0.0 B | 2.0 KB | 1368.0 B
driver | vm.cloud.com:58299 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-standalone avro 1 5000

PS: I did go through the spark website and http://www.virdata.com/tuning-spark/, but had no luck.

-- Cheers, Mukesh Jha
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
And this is with spark version 1.2.0.

On Mon, Dec 29, 2014 at 11:43 PM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5-sec batch is reading data from Kafka and processing it in 5 sec; on the standalone cluster each 5-sec batch is getting processed in 0.4 sec. Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also, do you have any pointers on the reasons the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance.

YARN (5 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
driver | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master yarn-cluster --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

STANDALONE (3 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
2 | vm-73.cloud.com:37621 | 0 | 0.0B/265.0MB | 0.0 B | 1 | 0 | 1059 | 1060 | 5.9 m | 0.0 B | 2.0 KB | 1368.0 B
driver | vm.cloud.com:58299 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-standalone avro 1 5000

PS: I did go through the spark website and http://www.virdata.com/tuning-spark/, but had no luck.

-- Cheers, Mukesh Jha
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the Spark application master UI.

spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8

On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8?

On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5-sec batch is reading data from Kafka and processing it in 5 sec; on the standalone cluster each 5-sec batch is getting processed in 0.4 sec. Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also, do you have any pointers on the reasons the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance.

YARN (5 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
driver | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master yarn-cluster --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

STANDALONE (3 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
2 | vm-73.cloud.com:37621 | 0 | 0.0B/265.0MB | 0.0 B | 1 | 0 | 1059 | 1060 | 5.9 m | 0.0 B | 2.0 KB | 1368.0 B
driver | vm.cloud.com:58299 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-standalone avro 1 5000

PS: I did go through the spark website and http://www.virdata.com/tuning-spark/, but had no luck.

-- Cheers, Mukesh Jha
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster
Makes sense. I've also tried it in standalone mode, where all 3 workers & the driver were running on the same 8-core box, and the results were similar. Anyway, I will share the results in YARN mode with 8-core YARN containers.

On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com wrote: When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no? -Sandy

On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com wrote: Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the Spark application master UI.

spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com wrote: *oops, I mean are you setting --executor-cores to 8

On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Are you setting --num-executors to 8?

On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com wrote: Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver, as shown in the Spark UI page. Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Mukesh, Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have? -Sandy

On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone Spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html). I have a standalone cluster with 3 executors, and a Spark app running on YARN with 4 executors, as shown below. The Spark job running inside YARN is 10x slower than the one running on the standalone cluster (even though YARN has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5-sec batch is reading data from Kafka and processing it in 5 sec; on the standalone cluster each 5-sec batch is getting processed in 0.4 sec. Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running the tasks. Do I need to set up some configuration to evenly distribute the tasks? Also, do you have any pointers on the reasons the YARN job is 10x slower than the standalone job? Any suggestion is greatly appreciated, Thanks in advance.

YARN (5 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
driver | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master yarn-cluster --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

STANDALONE (3 workers + driver)
Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
2 | vm-73.cloud.com:37621 | 0 | 0.0B/265.0MB | 0.0 B | 1 | 0 | 1059 | 1060 | 5.9 m | 0.0 B | 2.0 KB | 1368.0 B
driver | vm.cloud.com:58299 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B

/homext/spark/bin/spark-submit --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-standalone avro 1 5000

PS: I did go through the spark website and http://www.virdata.com/tuning-spark/, but had no luck.

-- Cheers, Mukesh Jha
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com
Re: KafkaUtils explicit acks
I agree that this is not a trivial task, as in this approach the Kafka acks will be done by the Spark tasks; that means a pluggable means to ack your input data source, i.e. changes in core. From my limited experience with Kafka + Spark, what I've seen is that if the Spark tasks take longer than the batch interval, the next batch waits for the previous one to finish, so I was wondering if offset management can be done by Spark too. I'm just trying to figure out whether this would be a worthwhile addition to have.

On Mon, Dec 15, 2014 at 11:39 AM, Shao, Saisai saisai.s...@intel.com wrote: Hi, It is not trivial work to acknowledge the offsets when an RDD is fully processed. From my understanding, only modifying KafkaUtils is not enough to meet your requirement: you need to add metadata management for each block/RDD and track them on both the executor and driver side, and many other things should also be taken care of :). Thanks, Jerry

*From:* mukh@gmail.com [mailto:mukh@gmail.com] *On Behalf Of* Mukesh Jha
*Sent:* Monday, December 15, 2014 1:31 PM
*To:* Tathagata Das
*Cc:* francois.garil...@typesafe.com; user@spark.apache.org
*Subject:* Re: KafkaUtils explicit acks

Thanks TD & Francois for the explanation & documentation. I'm curious whether we have any performance benchmarks with & without WAL for spark-streaming-kafka. Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge logs), on top of WAL can we modify KafkaUtils to acknowledge the offsets only when the RDDs are fully processed and are getting evicted out of Spark memory, so that we can be 100% sure that all the records are getting processed in the system? I was thinking it might be good to have the Kafka offset information of each batch as part of the RDD's metadata, and to commit the offsets once the RDD's lineage is complete.

On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am updating the docs right now. Here is a staged copy that you can have a sneak peek of. This will be part of Spark 1.2. http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html The updated fault-tolerance section tries to simplify the explanation of when and what data can be lost, and how to prevent that using the new experimental feature of write ahead logs. Any feedback will be much appreciated. TD

On Wed, Dec 10, 2014 at 2:42 AM, francois.garil...@typesafe.com wrote: [sorry for the botched half-message] Hi Mukesh, There's been some great work on Spark Streaming reliability lately. https://www.youtube.com/watch?v=jcJq3ZalXD8 Look at the links from: https://issues.apache.org/jira/browse/SPARK-3129 I'm not aware of any doc yet (did I miss something?) but you can look at the ReliableKafkaReceiver's test suite: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala -- FG

On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, Any insights on this?? If I'm not clear enough, my question is: how can I use a Kafka consumer and not lose any data in case of failures with spark-streaming?

On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm working on a Spark app which reads data from Kafka & persists it in HBase. The Spark documentation states the below [1], that in case of worker failure we can lose some data. If so, how can I make my Kafka stream more reliable? I have seen there is a simple consumer [2], but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the Kafka offsets once they are replicated in the memory of other worker nodes (if it's not already done) to tackle this issue. Any help is appreciated in advance.

1. Using any input source that receives data through a network - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the left-over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.

2. https://github.com/dibbhatt/kafka-spark-consumer

Txz, Mukesh Jha
-- Thanks & Regards, Mukesh Jha
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
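For reference, the write ahead log that TD's staged docs describe is turned on purely through configuration in Spark 1.2, and it requires a checkpoint directory for the log files; a minimal sketch, with the checkpoint path as a placeholder:

    SparkConf conf = new SparkConf()
        .set("spark.streaming.receiver.writeAheadLog.enable", "true");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));
    jssc.checkpoint("hdfs:///user/spark/checkpoints");  // placeholder path; the WAL files live here

With the WAL on, the received data is already persisted to HDFS, so a non-replicated storage level such as StorageLevel.MEMORY_AND_DISK_SER() is usually sufficient for the receiver.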
Re: KafkaUtils explicit acks
Thanks TD & Francois for the explanation & documentation. I'm curious whether we have any performance benchmarks with & without WAL for spark-streaming-kafka. Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge logs), on top of WAL can we modify KafkaUtils to acknowledge the offsets only when the RDDs are fully processed and are getting evicted out of Spark memory, so that we can be 100% sure that all the records are getting processed in the system? I was thinking it might be good to have the Kafka offset information of each batch as part of the RDD's metadata, and to commit the offsets once the RDD's lineage is complete.

On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am updating the docs right now. Here is a staged copy that you can have a sneak peek of. This will be part of Spark 1.2. http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html The updated fault-tolerance section tries to simplify the explanation of when and what data can be lost, and how to prevent that using the new experimental feature of write ahead logs. Any feedback will be much appreciated. TD

On Wed, Dec 10, 2014 at 2:42 AM, francois.garil...@typesafe.com wrote: [sorry for the botched half-message] Hi Mukesh, There's been some great work on Spark Streaming reliability lately. https://www.youtube.com/watch?v=jcJq3ZalXD8 Look at the links from: https://issues.apache.org/jira/browse/SPARK-3129 I'm not aware of any doc yet (did I miss something?) but you can look at the ReliableKafkaReceiver's test suite: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala -- FG

On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, Any insights on this?? If I'm not clear enough, my question is: how can I use a Kafka consumer and not lose any data in case of failures with spark-streaming?

On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm working on a Spark app which reads data from Kafka & persists it in HBase. The Spark documentation states the below [1], that in case of worker failure we can lose some data. If so, how can I make my Kafka stream more reliable? I have seen there is a simple consumer [2], but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the Kafka offsets once they are replicated in the memory of other worker nodes (if it's not already done) to tackle this issue. Any help is appreciated in advance.

1. Using any input source that receives data through a network - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the left-over copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.

2. https://github.com/dibbhatt/kafka-spark-consumer

Txz, Mukesh Jha
-- Thanks & Regards, Mukesh Jha
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: KafkaUtils explicit acks
Hello Guys, Any insights on this?? If I'm not clear enough, my question is: how can I use a Kafka consumer and not lose any data in case of failures with spark-streaming?

On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm working on a Spark app which reads data from Kafka & persists it in HBase. The Spark documentation states the below *[1]*, that in case of worker failure we can lose some data. If so, how can I make my Kafka stream more reliable? I have seen there is a simple consumer *[2]*, but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the Kafka offsets once they are replicated in the memory of other worker nodes (if it's not already done) to tackle this issue. Any help is appreciated in advance.

1. *Using any input source that receives data through a network* - For network-based data sources like *Kafka* and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the left-over copy of the input data. However, if the *worker node where a network receiver was running fails, then a tiny bit of data may be lost*, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.

2. https://github.com/dibbhatt/kafka-spark-consumer

Txz, *Mukesh Jha me.mukesh@gmail.com*
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
KafkaUtils explicit acks
Hello Experts, I'm working on a Spark app which reads data from Kafka & persists it in HBase. The Spark documentation states the below *[1]*, that in case of worker failure we can lose some data. If so, how can I make my Kafka stream more reliable? I have seen there is a simple consumer *[2]*, but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the Kafka offsets once they are replicated in the memory of other worker nodes (if it's not already done) to tackle this issue. Any help is appreciated in advance.

1. *Using any input source that receives data through a network* - For network-based data sources like *Kafka* and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the left-over copy of the input data. However, if the *worker node where a network receiver was running fails, then a tiny bit of data may be lost*, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data.

2. https://github.com/dibbhatt/kafka-spark-consumer

Txz, *Mukesh Jha me.mukesh@gmail.com*
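On the replication described in point 1 above: the factor is controlled by the StorageLevel passed to createStream, where the _2 suffix keeps two copies of the received blocks; a minimal sketch, assuming the kafkaConf and topicMap from the other threads in this digest:

    JavaPairDStream<byte[], byte[]> stream = KafkaUtils.createStream(
        sc, byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class,
        kafkaConf, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());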
Lifecycle of RDD in spark-streaming
Hey Experts, I wanted to understand in detail the lifecycle of rdd(s) in a streaming app. From my current understanding: - An rdd gets created out of the realtime input stream. - Transformation functions are applied in a lazy fashion on the rdd to transform it into other rdd(s). - Actions are taken on the final transformed rdds to get the data out of the system. Also, rdd(s) are stored in the cluster's RAM (disk, if configured so) and are cleaned in LRU fashion. So I have the following questions on the same. - How does Spark (streaming) guarantee that all the actions are taken on each input rdd/batch? - How does Spark determine that the lifecycle of an rdd is complete? Is there any chance that an rdd will be cleaned out of RAM before all actions are taken on it? Thanks in advance for all your help. Also, I'm relatively new to Scala & Spark, so pardon me in case these are naive questions/assumptions. -- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
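On the last question: streaming keeps each batch's rdds around for at least the context's remember duration, and a stream's rdds can also be pinned explicitly; a minimal sketch, with jssc and kafkaStream as assumed names and the durations purely illustrative:

    // Keep generated rdds for at least 10 minutes so late actions can still see them.
    jssc.remember(new Duration(10 * 60 * 1000));
    // Or pin a specific stream's rdds in memory explicitly:
    kafkaStream.persist(StorageLevel.MEMORY_ONLY_SER());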
Re: Lifecycle of RDD in spark-streaming
Any pointers, guys?

On Tue, Nov 25, 2014 at 5:32 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hey Experts, I wanted to understand in detail the lifecycle of rdd(s) in a streaming app. From my current understanding: - An rdd gets created out of the realtime input stream. - Transformation functions are applied in a lazy fashion on the rdd to transform it into other rdd(s). - Actions are taken on the final transformed rdds to get the data out of the system. Also, rdd(s) are stored in the cluster's RAM (disk, if configured so) and are cleaned in LRU fashion. So I have the following questions on the same. - How does Spark (streaming) guarantee that all the actions are taken on each input rdd/batch? - How does Spark determine that the lifecycle of an rdd is complete? Is there any chance that an rdd will be cleaned out of RAM before all actions are taken on it? Thanks in advance for all your help. Also, I'm relatively new to Scala & Spark, so pardon me in case these are naive questions/assumptions.

-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
Debugging spark java application
Hello experts, Is there an easy way to debug a Spark Java application? I'm putting debug logs in the map function, but there aren't any logs on the console. Also, can I include my custom jars while launching spark-shell and do my PoC there? This might be a naive question, but any help here is appreciated.
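On the missing logs: the function passed to map() runs on the executors, so anything it logs lands in the executor stdout/stderr logs (reachable per executor from the web UI), not on the driver console. A minimal sketch of executor-side logging, with the logger name and the input rdd (lines) as assumed stand-ins:

    import org.apache.log4j.Logger;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    JavaRDD<String> upper = lines.map(new Function<String, String>() {
        public String call(String s) {
            // This line appears in the executor's log, not the driver console.
            Logger.getLogger("map-debug").debug("processing record: " + s);
            return s.toUpperCase();
        }
    });

For the second question, custom jars can be put on the spark-shell classpath with its --jars option.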
Re: Functions in Spark
Thanks, I did go through the video and it was very informative, but I think I was looking for the Transformations section of https://spark.apache.org/docs/0.9.1/scala-programming-guide.html.

On Mon, Nov 17, 2014 at 10:31 AM, Samarth Mailinglist mailinglistsama...@gmail.com wrote: Check this video out: https://www.youtube.com/watch?v=dmL0N3qfSc8&list=UURzsq7k4-kT-h3TDUBQ82-w

On Mon, Nov 17, 2014 at 9:43 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, Is there any way to know which of my functions performs better in Spark? In other words, say I have achieved the same thing using two different implementations. How do I judge which implementation is better than the other? Is processing time the only metric that we can use to claim the goodness of one implementation over the other? Can anyone please share some thoughts on this? Thank You

-- Thanks & Regards, *Mukesh Jha me.mukesh@gmail.com*
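On measuring the two implementations: since transformations are lazy, time a forcing action rather than the transformation itself, and compare runs on the same warmed-up context; a minimal sketch, with implA and implB as assumed stand-ins for the two versions:

    long t0 = System.nanoTime();
    long countA = implA(input).count();   // count() forces evaluation
    long t1 = System.nanoTime();
    long countB = implB(input).count();
    long t2 = System.nanoTime();
    System.out.printf("implA: %.1f ms, implB: %.1f ms%n",
            (t1 - t0) / 1e6, (t2 - t1) / 1e6);

Processing time is rarely the whole story, though; the web UI's shuffle read/write and GC time per stage are usually the other metrics worth comparing.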