Re: Spark driver not reusing HConnection

2016-11-23 Thread Mukesh Jha
Corresponding HBase bug: https://issues.apache.org/jira/browse/HBASE-12629

On Wed, Nov 23, 2016 at 1:55 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

> The solution is to disable the region size calculation check.
>
> hbase.regionsizecalculator.enable: false
>
> On Sun, Nov 20, 2016 at 9:29 PM, Mukesh Jha <me.mukesh@gmail.com>
> wrote:
>
>> Any ideas folks?
>>
>> On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha <me.mukesh@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I'm accessing multiple regions (~5k) of an HBase table using Spark's
>>> newAPIHadoopRDD. But the driver is trying to calculate the region size
>>> of all the regions.
>>> It is not even reusing the hconnection and is creating a new connection
>>> for every request (see below), which is taking a lot of time.
>>>
>>> Is there a better approach to do this?
>>>
>>>
>>> [18 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
>>> identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
>>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>>> [18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
>>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>>> hbase28.cloud.com:2181 sessionTimeout=6
>>> watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,
>>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>>> [18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>>> ClientCnxn: Opening socket connection to server
>>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>>> using SASL (unknown error)
>>> [18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>>> ClientCnxn: Socket connection established, initiating session, client:
>>> /10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
>>> [18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>>> ClientCnxn: Session establishment complete on server
>>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
>>> negotiated timeout = 6
>>> [18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator:
>>> Calculating region sizes for table "message".
>>> [18 Nov 2016 22:25:27,867] [INFO Driver] 
>>> ConnectionManager$HConnectionImplementation:
>>> Closing master protocol: MasterService
>>> [18 Nov 2016 22:25:27,868] [INFO Driver] 
>>> ConnectionManager$HConnectionImplementation:
>>> Closing zookeeper sessionid=0x2564f6f013e0e95
>>> [18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
>>> 0x2564f6f013e0e95 closed
>>> [18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
>>> EventThread shut down
>>> [18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
>>> identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
>>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>>> [18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
>>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>>> hbase28.cloud.com:2181 sessionTimeout=6
>>> watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,
>>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>>> [18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>>> ClientCnxn: Opening socket connection to server
>>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>>> using SASL (unknown error)
>>> [18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>>> ClientCnxn: Socket connection established, initiating session, client:
>>> /10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
>>> [18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>>> ClientCnxn: Session establishment complete on server
>>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
>>> negotiated timeout = 6
>>> [18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator:
>>> Calculating region sizes for table "message".
>>> 
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> *Mukesh Jha <me.mukesh@gmail.com>*
>>>
>>
>>
>>
>> --
>>
>>
>> Thanks & Regards,
>>
>> *Mukesh Jha <me.mukesh@gmail.com>*
>>
>
>
>
> --
>
>
> Thanks & Regards,
>
> *Mukesh Jha <me.mukesh@gmail.com>*
>



-- 


Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*


Re: Spark driver not reusing HConnection

2016-11-23 Thread Mukesh Jha
The solution is to disable the region size calculation check.

hbase.regionsizecalculator.enable: false
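
For reference, a minimal sketch of where this property goes when building the
RDD (the SparkConf setup and variable names below are illustrative; the table
name is the one from the logs further down):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Disable the driver-side region size calculation before building the RDD.
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(TableInputFormat.INPUT_TABLE, "message");
hbaseConf.setBoolean("hbase.regionsizecalculator.enable", false);

JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("hbase-scan"));
JavaPairRDD<ImmutableBytesWritable, Result> hbaseRdd =
    jsc.newAPIHadoopRDD(hbaseConf, TableInputFormat.class,
        ImmutableBytesWritable.class, Result.class);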

On Sun, Nov 20, 2016 at 9:29 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

> Any ideas folks?
>
> On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha <me.mukesh@gmail.com>
> wrote:
>
>> Hi
>>
>> I'm accessing multiple regions (~5k) of an HBase table using Spark's
>> newAPIHadoopRDD. But the driver is trying to calculate the region size
>> of all the regions.
>> It is not even reusing the hconnection and is creating a new connection
>> for every request (see below), which is taking a lot of time.
>>
>> Is there a better approach to do this?
>>
>>
>> [18 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
>> identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>> [18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>> hbase28.cloud.com:2181 sessionTimeout=6
>> watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,
>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>> [18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Opening socket connection to server
>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> [18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Socket connection established, initiating session, client: /
>> 10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
>> [18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Session establishment complete on server
>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
>> negotiated timeout = 6
>> [18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator:
>> Calculating region sizes for table "message".
>> [18 Nov 2016 22:25:27,867] [INFO Driver] 
>> ConnectionManager$HConnectionImplementation:
>> Closing master protocol: MasterService
>> [18 Nov 2016 22:25:27,868] [INFO Driver] 
>> ConnectionManager$HConnectionImplementation:
>> Closing zookeeper sessionid=0x2564f6f013e0e95
>> [18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
>> 0x2564f6f013e0e95 closed
>> [18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
>> EventThread shut down
>> [18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
>> identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
>> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
>> [18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
>> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
>> hbase28.cloud.com:2181 sessionTimeout=6
>> watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,
>> hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
>> [18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Opening socket connection to server
>> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
>> using SASL (unknown error)
>> [18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Socket connection established, initiating session, client: /
>> 10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
>> [18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
>> ClientCnxn: Session establishment complete on server
>> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
>> negotiated timeout = 6
>> [18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator:
>> Calculating region sizes for table "message".
>> 
>>
>> --
>> Thanks & Regards,
>>
>> *Mukesh Jha <me.mukesh@gmail.com>*
>>
>
>
>
> --
>
>
> Thanks & Regards,
>
> *Mukesh Jha <me.mukesh@gmail.com>*
>



-- 


Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*


Re: Spark driver not reusing HConnection

2016-11-20 Thread Mukesh Jha
Any ideas folks?

On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha <me.mukesh@gmail.com> wrote:

> Hi
>
> I'm accessing multiple regions (~5k) of an HBase table using Spark's
> newAPIHadoopRDD. But the driver is trying to calculate the region size of
> all the regions.
> It is not even reusing the hconnection and is creating a new connection for
> every request (see below), which is taking a lot of time.
>
> Is there a better approach to do this?
>
>
> [18 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
> identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
> [18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
> hbase28.cloud.com:2181 sessionTimeout=6 watcher=hconnection-0x1e7824af0x0,
> quorum=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28
> .cloud.com:2181, baseZNode=/hbase
> [18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Opening socket connection to server
> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
> using SASL (unknown error)
> [18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Socket connection established, initiating session, client: /
> 10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
> [18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Session establishment complete on server
> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
> negotiated timeout = 6
> [18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator: Calculating
> region sizes for table "message".
> [18 Nov 2016 22:25:27,867] [INFO Driver] 
> ConnectionManager$HConnectionImplementation:
> Closing master protocol: MasterService
> [18 Nov 2016 22:25:27,868] [INFO Driver] 
> ConnectionManager$HConnectionImplementation:
> Closing zookeeper sessionid=0x2564f6f013e0e95
> [18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
> 0x2564f6f013e0e95 closed
> [18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
> EventThread shut down
> [18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
> identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
> [18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
> hbase28.cloud.com:2181 sessionTimeout=6 watcher=hconnection-0x6a8a1efa0x0,
> quorum=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28
> .cloud.com:2181, baseZNode=/hbase
> [18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Opening socket connection to server
> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
> using SASL (unknown error)
> [18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Socket connection established, initiating session, client: /
> 10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
> [18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Session establishment complete on server
> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
> negotiated timeout = 6
> [18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator: Calculating
> region sizes for table "message".
> 
>
> --
> Thanks & Regards,
>
> *Mukesh Jha <me.mukesh@gmail.com>*
>



-- 


Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*


Spark driver not reusing HConnection

2016-11-18 Thread Mukesh Jha
Hi

I'm accessing multiple regions (~5k) of an HBase table using Spark's
newAPIHadoopRDD. But the driver is trying to calculate the region size of
all the regions.
It is not even reusing the hconnection and is creating a new connection for
every request (see below), which is taking a lot of time.

Is there a better approach to do this?


[18 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
[18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
hbase28.cloud.com:2181 sessionTimeout=6
watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,
hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
[18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Opening socket connection to server
hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
using SASL (unknown error)
[18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Socket connection established, initiating session, client: /
10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
[18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Session establishment complete on server
hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
negotiated timeout = 6
[18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator: Calculating
region sizes for table "message".
[18 Nov 2016 22:25:27,867] [INFO Driver]
ConnectionManager$HConnectionImplementation: Closing master protocol:
MasterService
[18 Nov 2016 22:25:27,868] [INFO Driver]
ConnectionManager$HConnectionImplementation: Closing zookeeper
sessionid=0x2564f6f013e0e95
[18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
0x2564f6f013e0e95 closed
[18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
EventThread shut down
[18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
[18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
hbase28.cloud.com:2181 sessionTimeout=6
watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,
hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
[18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Opening socket connection to server
hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
using SASL (unknown error)
[18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Socket connection established, initiating session, client: /
10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
[18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Session establishment complete on server
hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
negotiated timeout = 6
[18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator: Calculating
region sizes for table "message".


-- 
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*


Re: Spark kafka integration issues

2016-09-14 Thread Mukesh Jha
Thanks for the reply Cody.

I found the below article on the same topic very helpful. Thanks for the
details, much appreciated.

http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
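
For future readers, a rough sketch of what Cody describes below, using
spark-streaming-kafka-0-8 (the broker address, topic, offsets and variable
names are illustrative placeholders, not taken from this thread):

import java.util.HashMap;
import java.util.Map;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

JavaSparkContext jsc =
    new JavaSparkContext(new SparkConf().setAppName("kafka-batch-read"));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "broker1:9092");

// Each OffsetRange carries the topic, partition and offset window, so the
// per-partition metadata is known up front when using createRDD.
OffsetRange[] ranges = { OffsetRange.create("mytopic", 0, 0L, 1000L) };

JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
    jsc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, ranges);

// Per Cody's reply: createRDD also has an overload taking a messageHandler
// over MessageAndMetadata when per-message topic/partition/offset is needed,
// and a custom kafka.serializer.Decoder can replace StringDecoder in the
// class parameters above for custom payloads.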

On Tue, Sep 13, 2016 at 8:14 PM, Cody Koeninger <c...@koeninger.org> wrote:

> 1.  see http://spark.apache.org/docs/latest/streaming-kafka-
> integration.html#approach-2-direct-approach-no-receivers
>  look for HasOffsetRanges.  If you really want the info per-message
> rather than per-partition, createRDD has an overload that takes a
> messageHandler from MessageAndMetadata to whatever you need
>
> 2. createRDD takes type parameters for the key and value decoder, so
> specify them there
>
> 3. you can use spark-streaming-kafka-0-8 against 0.9 or 0.10 brokers.
> There is a spark-streaming-kafka-0-10 package with additional features
> that only works on brokers 0.10 or higher.  A pull request for
> documenting it has been merged, but not deployed.
>
> On Tue, Sep 13, 2016 at 6:46 PM, Mukesh Jha <me.mukesh@gmail.com>
> wrote:
> > Hello fellow sparkers,
> >
> > I'm using spark to consume messages from kafka in a non-streaming
> > fashion. I'm using spark-streaming-kafka-0-8_2.10 & Spark v2.0 to do the
> > same.
> >
> > I have a few queries on this; please get back if you guys have any clues
> > on the same.
> >
> > 1) Is there any way to get the topic, partition & offset information
> > for each item from the KafkaRDD? I'm using the
> > KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder] to
> > create my kafka RDD.
> > 2) How do I pass my custom Decoder instead of using the String or Byte
> > decoder? Are there any examples for the same?
> > 3) Is there a newer version to consume from kafka-0.10 & kafka-0.9
> > clusters?
> >
> > --
> > Thanks & Regards,
> >
> > Mukesh Jha
>



-- 


Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*


Spark kafka integration issues

2016-09-13 Thread Mukesh Jha
Hello fellow sparkers,

I'm using spark to consume messages from kafka in a non-streaming fashion.
I'm using spark-streaming-kafka-0-8_2.10 & Spark v2.0 to do the same.

I have a few queries on this; please get back if you guys have any clues
on the same.

1) Is there any way to get the topic, partition & offset information for each
item from the KafkaRDD? I'm using *KafkaUtils.createRDD[String, String,
StringDecoder, StringDecoder]* to create my kafka RDD.
2) How do I pass my custom Decoder instead of using the String or Byte
decoder? Are there any examples for the same?
3) Is there a newer version to consume from kafka-0.10 & kafka-0.9 clusters?

-- 
Thanks & Regards,

*Mukesh Jha <me.mukesh@gmail.com>*


Re: how to spark streaming application start working on next batch before completing on previous batch .

2015-12-15 Thread Mukesh Jha
Try setting *spark.streaming.concurrentJobs* to the number of concurrent
jobs you want to run.
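
A minimal sketch of setting it (note, as Tathagata points out elsewhere in
this thread, that this property is not officially supported and can risk data
loss, so treat it as experimental; the app name below is a placeholder):

import org.apache.spark.SparkConf;

// Allow up to 2 streaming output jobs to run concurrently per batch interval.
SparkConf conf = new SparkConf()
    .setAppName("streaming-app")
    .set("spark.streaming.concurrentJobs", "2");
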
On 15 Dec 2015 17:35, "ikmal"  wrote:

> The best practice is to set batch interval lesser than processing time. I'm
> sure your application is suffering from constantly increasing of scheduling
> delay.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-spark-streaming-application-start-working-on-next-batch-before-completing-on-previous-batch-tp25559p25707.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: how to spark streaming application start working on next batch before completing on previous batch .

2015-12-15 Thread Mukesh Jha
Are the issues related to WAL-based ReliableKafkaReceivers or to any
receiver in general?

Any insights will be helpful.
On 16 Dec 2015 05:44, "Tathagata Das" <t...@databricks.com> wrote:

> Just to be clear. spark.streaming.concurrentJobs is NOT officially
> supported. There are issues with fault-tolerance and data loss if that is
> set to more than 1.
>
>
>
> On Tue, Dec 15, 2015 at 9:19 AM, Mukesh Jha <me.mukesh@gmail.com>
> wrote:
>
>> Try setting *spark.streaming.concurrentJobs* to the number of
>> concurrent jobs you want to run.
>> On 15 Dec 2015 17:35, "ikmal" <ikmal.s...@gmail.com> wrote:
>>
>>> The best practice is to set batch interval lesser than processing time.
>>> I'm
>>> sure your application is suffering from constantly increasing of
>>> scheduling
>>> delay.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-spark-streaming-application-start-working-on-next-batch-before-completing-on-previous-batch-tp25559p25707.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-27 Thread Mukesh Jha
I'm streaming data from a kafka topic using KafkaUtils & doing some
computation and writing records to HBase.

Storage level is memory-and-disk-ser
On 27 Feb 2015 16:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-4516
 Apart from that little more information about your job would be helpful.

 Thanks
 Best Regards

 On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Experts,

 My Spark Job is failing with below error.

 From the logs I can see that input-3-1424842351600 was added at 5:32:32
 and was never purged out of memory. Also the available free memory for the
 executor is *2.1G*.

 Please help me figure out why executors cannot fetch this input.

 Txz for any help, Cheers.


 *Logs*
 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
 input-3-1424842351600 in memory on
 chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
 .
 .
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
 (size: 232.3 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
 (size: 291.4 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
 stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
 stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
 stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
 stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
 stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
 stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
 stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
 stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
 stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
 stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842351600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842355600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*





Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-27 Thread Mukesh Jha
Also my job is map only so there is no shuffle/reduce phase.

On Fri, Feb 27, 2015 at 7:10 PM, Mukesh Jha me.mukesh@gmail.com wrote:

 I'm streaming data from a kafka topic using KafkaUtils & doing some
 computation and writing records to HBase.

 Storage level is memory-and-disk-ser
 On 27 Feb 2015 16:20, Akhil Das ak...@sigmoidanalytics.com wrote:

 You could be hitting this issue
 https://issues.apache.org/jira/browse/SPARK-4516
 Apart from that little more information about your job would be helpful.

 Thanks
 Best Regards

 On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Experts,

 My Spark Job is failing with below error.

 From the logs I can see that input-3-1424842351600 was added at 5:32:32
 and was never purged out of memory. Also the available free memory for the
 executor is *2.1G*.

 Please help me figure out why executors cannot fetch this input.

 Txz for any help, Cheers.


 *Logs*
 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
 input-3-1424842351600 in memory on
 chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
 .
 .
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
 (size: 232.3 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
 (size: 291.4 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
 stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
 stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
 stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
 stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
 stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
 stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
 stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
 stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
 stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
 stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842351600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842355600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*





-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-26 Thread Mukesh Jha
On Wed, Feb 25, 2015 at 8:09 PM, Mukesh Jha me.mukesh@gmail.com wrote:

 My application runs fine for ~3/4 hours and then hits this issue.

 On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Experts,

 My Spark Job is failing with below error.

 From the logs I can see that input-3-1424842351600 was added at 5:32:32
 and was never purged out of memory. Also the available free memory for the
 executor is *2.1G*.

 Please help me figure out why executors cannot fetch this input.

 Txz for any help, Cheers.


 *Logs*
 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
 input-3-1424842351600 in memory on
 chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
 .
 .
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
 (size: 232.3 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
 (size: 291.4 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
 stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
 stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
 stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
 stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
 stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
 stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
 stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
 stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
 stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
 stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842351600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842355600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*




 --


 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: SparkStreaming failing with exception Could not compute split, block input

2015-02-25 Thread Mukesh Jha
My application runs fine for ~3/4 hours and then hits this issue.

On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha me.mukesh@gmail.com
wrote:

 Hi Experts,

 My Spark Job is failing with below error.

 From the logs I can see that input-3-1424842351600 was added at 5:32:32
 and was never purged out of memory. Also the available free memory for the
 executor is *2.1G*.

 Please help me figure out why executors cannot fetch this input.

 Txz for any help, Cheers.


 *Logs*
 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
 input-3-1424842351600 in memory on
 chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB)
 .
 .
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
 (size: 232.3 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
 input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
 (size: 291.4 KB, free: 2.1 GB)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
 stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
 stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
 stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
 stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
 stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
 stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
 stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
 stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
 stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
 stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
 bytes)
 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842351600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
 at
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
 at org.apache.spark.scheduler.Task.run(Task.scala:56)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
 Could not compute split, block input-3-1424842355600 not found
 at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


SparkStreaming failing with exception Could not compute split, block input

2015-02-24 Thread Mukesh Jha
Hi Experts,

My Spark Job is failing with below error.

From the logs I can see that input-3-1424842351600 was added at 5:32:32 and
was never purged out of memory. Also the available free memory for the
executor is *2.1G*.

Please help me figure out why executors cannot fetch this input.

Txz for any help, Cheers.


*Logs*
15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added
input-3-1424842351600 in memory on chsnmphbase31.usdc2.oraclecloud.com:50208
(size: 276.1 KB, free: 2.1 GB)
.
.
15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919
(size: 232.3 KB, free: 2.1 GB)
15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added
input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751
(size: 291.4 KB, free: 2.1 GB)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in
stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in
stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in
stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in
stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in
stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in
stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in
stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in
stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in
stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in
stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288
bytes)
15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage
451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
Could not compute split, block input-3-1424842351600 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage
451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception:
Could not compute split, block input-3-1424842355600 not found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)

-- 
Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: spark streaming: stderr does not roll

2015-02-24 Thread Mukesh Jha
I'm also facing the same issue.

I tried the configurations but the executors' spark log4j.properties
seems to override the passed values, so you have to change
/etc/spark/conf/log4j.properties.

Let me know if any of you have managed to get this fixed programmatically.

I am planning to use logrotate to rotate these logs.

On Thu, Nov 13, 2014 at 1:45 AM, Nguyen, Duc duc.ngu...@pearson.com wrote:

 I've also tried setting the aforementioned properties using
 System.setProperty() as well as on the command line while submitting the
 job using --conf key=value. All to no success. When I go to the Spark UI
 and click on that particular streaming job and then the Environment tab,
 I can see the properties are correctly set. But regardless of what I've
 tried, the stderr log file on the worker nodes does not roll and
 continues to grow...leading to a crash of the cluster once it claims 100%
 of disk. Has anyone else encountered this? Anyone?



 On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc duc.ngu...@pearson.com
 wrote:

 We are running spark streaming jobs (version 1.1.0). After a sufficient
 amount of time, the stderr file grows until the disk is full at 100% and
 crashes the cluster. I've read this

 https://github.com/apache/spark/pull/895

 and also read this

 http://spark.apache.org/docs/latest/configuration.html#spark-streaming


 So I've tried testing with this in an attempt to get the stderr log file
 to roll.

 sparkConf.set("spark.executor.logs.rolling.strategy", "size")
   .set("spark.executor.logs.rolling.size.maxBytes", "1024")
   .set("spark.executor.logs.rolling.maxRetainedFiles", "3")


 Yet it does not roll and continues to grow. Am I missing something
 obvious?


 thanks,
 Duc






-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: Cannot access Spark web UI

2015-02-24 Thread Mukesh Jha
My Hadoop version is Hadoop 2.5.0-cdh5.3.0

From the Driver logs [3] I can see that SparkUI started on the specified
port; my YARN app tracking URL [1] also points to that port, which in turn
gets redirected to the proxy URL [2], which gives me
java.net.BindException: Cannot assign requested address.
If there were a port conflict the SparkUI start would have failed, but
that is not the case.
[1] YARN:
application_1424814313649_0006 spark-realtime-MessageStoreWriter SPARK
ciuser root.ciuser RUNNING UNDEFINED 10% http://host21.cloud.com:44648
[2] ProxyURL:
http://host28.cloud.com:8088/proxy/application_1424814313649_0006/
[3] LOGS:
15/02/25 04:25:02 INFO util.Utils: Successfully started service 'SparkUI'
on port 44648.
15/02/25 04:25:02 INFO ui.SparkUI: Started SparkUI at
http://host21.cloud.com:44648
15/02/25 04:25:02 INFO cluster.YarnClusterScheduler: Created
YarnClusterScheduler
15/02/25 04:25:02 INFO netty.NettyBlockTransferService: Server created on
41518

On Wed, Feb 18, 2015 at 3:15 PM, Arush Kharbanda ar...@sigmoidanalytics.com
 wrote:

 It seems like it's not able to get a port it needs. Are you sure that
 the required port is available? In what logs did you find this error?

 On Wed, Feb 18, 2015 at 2:21 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 The error says Cannot assign requested address. This means that you need
 to use the correct address for one of your network interfaces or 0.0.0.0 to
 accept connections from all interfaces. Can you paste your spark-env.sh
 file and /etc/hosts file.

 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 2:06 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,

 I am running a spark-streaming app inside YARN. I have Spark History
 server running as well (Do we need it running to access UI?).

 The app is running fine as expected but Spark's web UI is not
 accessible.

 When I try to access the ApplicationMaster of the Yarn application I get
 the below error.

 This looks very similar to
 https://issues.apache.org/jira/browse/SPARK-5837 but instead of 
 java.net.ConnectException:
 Connection refused I am getting java.net.BindException: Cannot assign
 requested address as shown below.

 Please let me know if you have faced / fixed this issue, any help is
 greatly appreciated.


 *Exception*

 HTTP ERROR 500

 Problem accessing /proxy/application_1424161379156_0001/. Reason:

 Cannot assign requested address

 Caused by:

 java.net.BindException: Cannot assign requested address
 at java.net.PlainSocketImpl.socketBind(Native Method)
 at
 java.net.AbstractPlainSocketImpl.bind(AbstractPlainSocketImpl.java:376)
 at java.net.Socket.bind(Socket.java:631)
 at java.net.Socket.init(Socket.java:423)
 at java.net.Socket.init(Socket.java:280)
 at
 org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:80)
 at
 org.apache.commons.httpclient.protocol.DefaultProtocolSocketFactory.createSocket(DefaultProtocolSocketFactory.java:122)
 at
 org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
 at
 org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
 at
 org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
 at
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
 at
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:346)
 at
 org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.proxyLink(WebAppProxyServlet.java:188)
 at
 org.apache.hadoop.yarn.server.webproxy.WebAppProxyServlet.doGet(WebAppProxyServlet.java:345)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
 at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
 at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:511)
 at
 org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1221)
 at
 com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:66)
 at
 com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:900)
 at
 com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:834)
 at
 org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppFilter.doFilter(RMWebAppFilter.java:84)
 at
 com.sun.jersey.spi.container.servlet.ServletContainer.doFilter(ServletContainer.java:795)
 at
 com.google.inject.servlet.FilterDefinition.doFilter(FilterDefinition.java:163)
 at
 com.google.inject.servlet.FilterChainInvocation.doFilter(FilterChainInvocation.java:58)
 at
 com.google.inject.servlet.ManagedFilterPipeline.dispatch(ManagedFilterPipeline.java:118)
 at com.google.inject.servlet.GuiceFilter.doFilter(GuiceFilter.java:113)
 at
 org.mortbay.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1212)
 at
 org.apache.hadoop.http.lib.StaticUserWebFilter$StaticUserFilter.doFilter(StaticUserWebFilter.java:109

Cannot access Spark web UI

2015-02-18 Thread Mukesh Jha
:549)
at org.mortbay.jetty.HttpParser.parseAvailable(HttpParser.java:212)
at org.mortbay.jetty.HttpConnection.handle(HttpConnection.java:404)
at
org.mortbay.io.nio.SelectChannelEndPoint.run(SelectChannelEndPoint.java:410)
at
org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)

Powered by Jetty://

-- 
Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: Spark streaming app shutting down

2015-02-09 Thread Mukesh Jha
Thanks for the info guys.
For now I'm using the high level consumer; I will give this one a try.

As far as the queries are concerned, checkpointing helps.

I'm still not sure what's the best way to gracefully stop the application
in yarn cluster mode.
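
For anyone searching later, a rough sketch of a graceful stop from inside the
driver (assuming the app's JavaStreamingContext is called jssc; triggering
this in a yarn-cluster app still needs some external signal the driver polls
for, e.g. a marker file on HDFS):

// Finish the in-flight batches, then stop the SparkContext and let the
// YARN application complete normally instead of being killed.
jssc.stop(true /* stopSparkContext */, true /* stopGracefully */);
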
On 5 Feb 2015 09:38, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com
wrote:

 Thanks Akhil for mentioning this Low Level Consumer (
 https://github.com/dibbhatt/kafka-spark-consumer ). Yes, it has a better
 fault tolerance mechanism than any existing Kafka consumer available. It
 has no data loss on receiver failure and has the ability to replay or
 restart itself in case of failure. You can definitely give it a try.

 Dibyendu

 On Thu, Feb 5, 2015 at 1:04 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 AFAIK, From Spark 1.2.0 you can have WAL (Write Ahead Logs) for fault
 tolerance, which means it can handle the receiver/driver failures. You can
 also look at the lowlevel kafka consumer
 https://github.com/dibbhatt/kafka-spark-consumer which has a better
 fault tolerance mechanism for receiver failures. This low level consumer
 will push the offset of the message being read into zookeeper for fault
 tolerance. In your case I think mostly the in-flight data would be lost if
 you aren't using any of the fault tolerance mechanisms.

 Thanks
 Best Regards

 On Wed, Feb 4, 2015 at 5:24 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Sparkans,

 I'm running a spark streaming app which reads data from kafka topic does
 some processing and then persists the results in HBase.

 I am using spark 1.2.0 running on Yarn cluster with 3 executors (2gb, 8
 cores each). I've enabled checkpointing & I am also rate limiting my
 kafkaReceivers so that the number of items read is not more than 10 records
 per sec.
 The kafkaReceiver I'm using is *not* ReliableKafkaReceiver.

 This app was running fine for ~3 days then there was an increased load
 on the HBase server because of some other process querying HBase tables.
 This led to an increase in the batch processing time of the spark batches
 (a 1 min batch took 10 min to process, where it previously finished in 20
 sec), which in turn led to the shutdown of the spark application; PFA the
 executor logs.

 From the logs I'm getting the below exceptions *[1]* & *[2]*; it looks like
 there were some outstanding jobs that didn't get processed or the job
 couldn't find the input data. From the logs it seems that the
 shutdown hook gets invoked but it cannot process the in-flight block.

 I have a couple of queries on this
   1) Does this mean that these jobs failed and the *in-flight data *is
 lost?
   2) Does the Spark job *buffer kafka* input data while the job is in the
 processing state for 10 mins, and on shutdown is that too lost? (I do
 not see any OOM error in the logs).
   3) Can we have *explicit commits* enabled in the kafkaReceiver so
 that the offsets get committed only when the RDD(s) get successfully
 processed?

 Also I'd like to know if there is a *graceful way to shutdown a spark
 app running on yarn*. Currently I'm killing the yarn app to stop it
 which leads to loss of that job's history, whereas in this case the
 application stops and succeeds and thus preserves the logs & history.

 *[1]* 15/02/02 19:30:11 ERROR client.TransportResponseHandler: Still
 have 1 requests outstanding when connection from
 hbase28.usdc2.cloud.com/10.193.150.221:43189 is closed
 *[2]* java.lang.Exception: Could not compute split, block
 input-2-1422901498800 not found
 *[3]* 
 org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on /tmp/spark/realtime-failover/msg_2378481654720966.avro (inode
 879488): File does not exist. Holder DFSClient_NONMAPREDUCE_-148264920_63
 does not have any open files.

 --
 Thanks & Regards,

 *Mukesh Jha me.mukesh@gmail.com*


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Mukesh Jha
numStreams is 5 in my case.

 List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
     new ArrayList<>(numStreams);
 for (int i = 0; i < numStreams; i++) {
   kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
       DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
       StorageLevel.MEMORY_ONLY_SER()));
 }
 JavaPairDStream<byte[], byte[]> ks =
     sc.union(kafkaStreams.remove(0), kafkaStreams);
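
If the work still piles up on the executor(s) hosting the receivers, the
unioned stream can also be spread out explicitly (a sketch; the partition
count of 40 is just a placeholder):

// Redistribute the received blocks across the cluster before processing.
JavaPairDStream<byte[], byte[]> rebalanced = ks.repartition(40);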

On Wed, Jan 21, 2015 at 3:19 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Mukesh,

 How are you creating your receivers? Could you post the (relevant) code?

 -kr, Gerard.

 On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Guys,

 I've repartitioned my kafkaStream so that it gets evenly distributed
 among the executors and the results are better.
 Still, from the executors page it seems that only 1 executor has all 8 cores
 in use while the other executors are using just 1 core.

 Is this the correct interpretation based on the below data? If so how can
 we fix this?

 [image: Inline image 1]

 On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 That is kind of expected due to data locality. Though you should see
 some tasks running on the executors as the data gets replicated to
 other nodes and can therefore run tasks based on locality. You have
 two solutions

 1. kafkaStream.repartition() to explicitly repartition the received
 data across the cluster.
 2. Create multiple kafka streams and union them together.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

 On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
  Thanks Sandy, It was the issue with the no of cores.
 
  Another issue I was facing is that tasks are not getting distributed
 evenly
  among all executors and are running on the NODE_LOCAL locality level
 i.e.
  all the tasks are running on the same executor where my
 kafkareceiver(s) are
  running even though other executors are idle.
 
  I configured spark.locality.wait=50 instead of the default 3000 ms,
 which
  forced the task rebalancing among nodes, let me know if there is a
 better
  way to deal with this.
 
 
  On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Makes sense, I've also tried it in standalone mode where all 3 workers &
  driver were running on the same 8 core box and the results were
  similar.
 
  Anyways I will share the results in YARN mode with 8 core yarn
 containers.
 
  On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
 
  wrote:
 
  When running in standalone mode, each executor will be able to use
 all 8
  cores on the box.  When running on YARN, each executor will only
 have access
  to 2 cores.  So the comparison doesn't seem fair, no?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Nope, I am setting 5 executors with 2  cores each. Below is the
 command
  that I'm using to submit in YARN mode. This starts up 5 executor
 nodes and a
  drives as per the spark  application master UI.
 
  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
  1024m --executor-memory 1024m --executor-cores 2 --class
  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000
 
  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  *oops, I mean are you setting --executor-cores to 8
 
  On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  Are you setting --num-executors to 8?
 
  On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Sorry Sandy, The command is just for reference but I can confirm
 that
  there are 4 executors and a driver as shown in the spark UI page.
 
  Each of these machines is a 8 core box with ~15G of ram.
 
  On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
  sandy.r...@cloudera.com wrote:
 
  Hi Mukesh,
 
  Based on your spark-submit command, it looks like you're only
  running with 2 executors on YARN.  Also, how many cores does
 each machine
  have?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
  me.mukesh@gmail.com wrote:
 
  Hello Experts,
  I'm bench-marking Spark on YARN
  (https://spark.apache.org/docs/latest/running-on-yarn.html)
 vs a standalone
  spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
  I have a standalone cluster with 3 executors, and a spark app
  running on yarn with 4 executors as shown below.
 
  The spark job running inside yarn is 10x slower than the one
  running on the standalone cluster (even though the yarn has
 more number of
  workers), also in both the case all the executors are in the
 same datacenter
  so there shouldn't be any latency. On YARN each 5sec batch is
 reading data
  from kafka and processing it in 5sec  on the standalone
 cluster each 5sec

Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2015-01-21 Thread Mukesh Jha
Hello Guys,

I've re-partitioned my kafkaStream so that it gets evenly distributed among
the executors and the results are better.
Still, from the executors page it seems that only one executor is using all 8
of its cores while the other executors are using just 1 core each.

Is this the correct interpretation based on the data below? If so, how can
we fix this?

[image: Inline image 1]

On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 That is kind of expected due to data locality. Though you should see
 some tasks running on the executors as the data gets replicated to
 other nodes and can therefore run tasks based on locality. You have
 two solutions

 1. kafkaStream.repartition() to explicitly repartition the received
 data across the cluster.
 2. Create multiple kafka streams and union them together.

 See
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch

 On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
  Thanks Sandy, It was the issue with the no of cores.
 
  Another issue I was facing is that tasks are not getting distributed
 evenly
  among all executors and are running on the NODE_LOCAL locality level i.e.
  all the tasks are running on the same executor where my kafkareceiver(s)
 are
  running even though other executors are idle.
 
  I configured spark.locality.wait=50 instead of the default 3000 ms, which
  forced the task rebalancing among nodes, let me know if there is a better
  way to deal with this.
 
 
  On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Makes sense, I've also tries it in standalone mode where all 3 workers 
  driver were running on the same 8 core box and the results were similar.
 
  Anyways I will share the results in YARN mode with 8 core yarn
 containers.
 
  On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
  wrote:
 
  When running in standalone mode, each executor will be able to use all
 8
  cores on the box.  When running on YARN, each executor will only have
 access
  to 2 cores.  So the comparison doesn't seem fair, no?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Nope, I am setting 5 executors with 2  cores each. Below is the
 command
  that I'm using to submit in YARN mode. This starts up 5 executor
 nodes and a
  drives as per the spark  application master UI.
 
  spark-submit --master yarn-cluster --num-executors 5 --driver-memory
  1024m --executor-memory 1024m --executor-cores 2 --class
  com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000
 
  On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com
 
  wrote:
 
  *oops, I mean are you setting --executor-cores to 8
 
  On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza 
 sandy.r...@cloudera.com
  wrote:
 
  Are you setting --num-executors to 8?
 
  On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha 
 me.mukesh@gmail.com
  wrote:
 
  Sorry Sandy, The command is just for reference but I can confirm
 that
  there are 4 executors and a driver as shown in the spark UI page.
 
  Each of these machines is a 8 core box with ~15G of ram.
 
  On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
  sandy.r...@cloudera.com wrote:
 
  Hi Mukesh,
 
  Based on your spark-submit command, it looks like you're only
  running with 2 executors on YARN.  Also, how many cores does each
 machine
  have?
 
  -Sandy
 
  On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
  me.mukesh@gmail.com wrote:
 
  Hello Experts,
  I'm bench-marking Spark on YARN
  (https://spark.apache.org/docs/latest/running-on-yarn.html) vs
 a standalone
  spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
  I have a standalone cluster with 3 executors, and a spark app
  running on yarn with 4 executors as shown below.
 
  The spark job running inside yarn is 10x slower than the one
  running on the standalone cluster (even though the yarn has more
 number of
  workers), also in both the case all the executors are in the
 same datacenter
  so there shouldn't be any latency. On YARN each 5sec batch is
 reading data
  from kafka and processing it in 5sec  on the standalone cluster
 each 5sec
  batch is getting processed in 0.4sec.
  Also, In YARN mode all the executors are not getting used up
 evenly
  as vm-13  vm-14 are running most of the tasks whereas in the
 standalone
  mode all the executors are running the tasks.
 
  Do I need to set up some configuration to evenly distribute the
  tasks? Also do you have any pointers on the reasons the yarn job
 is 10x
  slower than the standalone job?
  Any suggestion is greatly appreciated, Thanks in advance.
 
  YARN(5 workers + driver)
  
  Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT
 Input
  ShuffleRead ShuffleWrite Thread Dump
  1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms

Re: SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException

2015-01-09 Thread Mukesh Jha
I am using the pre-built *spark-1.2.0-bin-hadoop2.4* from *[1]* to submit spark
applications to YARN; I cannot find a pre-built spark for the *CDH-5.x*
versions. So, in my case the org.apache.hadoop.yarn.util.ConverterUtils class
is coming from the spark-assembly-1.1.0-hadoop2.4.0.jar which is part of
the pre-built spark, and hence is causing this issue.

How / where can I get spark 1.2.0 built for CDH-5.3.0? I checked in the maven
repo etc. with no luck.

*[1]* https://spark.apache.org/downloads.html

On Fri, Jan 9, 2015 at 1:12 AM, Marcelo Vanzin van...@cloudera.com wrote:

 Just to add to Sandy's comment, check your client configuration
 (generally in /etc/spark/conf). If you're using CM, you may need to
 run the Deploy Client Configuration command on the cluster to update
 the configs to match the new version of CDH.

 On Thu, Jan 8, 2015 at 11:38 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:
  Hi Mukesh,
 
  Those line numbers in ConverterUtils in the stack trace don't appear to
 line
  up with CDH 5.3:
 
 https://github.com/cloudera/hadoop-common/blob/cdh5-2.5.0_5.3.0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ConverterUtils.java
 
  Is it possible you're still including the old jars on the classpath in
 some
  way?
 
  -Sandy
 
  On Thu, Jan 8, 2015 at 3:38 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:
 
  Hi Experts,
 
  I am running spark inside YARN job.
 
  The spark-streaming job is running fine in CDH-5.0.0 but after the
 upgrade
  to 5.3.0 it cannot fetch containers with the below errors. Looks like
 the
  container id is incorrect and a string is present in a pace where it's
  expecting a number.
 
 
 
  java.lang.IllegalArgumentException: Invalid ContainerId:
  container_e01_1420481081140_0006_01_01
 
  Caused by: java.lang.NumberFormatException: For input string: e01
 
 
 
  Is this a bug?? Did you face something similar and any ideas how to fix
  this?
 
 
 
  15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal
 handlers
  for [TERM, HUP, INT]
 
  15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception:
 
  java.lang.IllegalArgumentException: Invalid ContainerId:
  container_e01_1420481081140_0006_01_01
 
  at
 
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
 
  at
 
 org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515)
 
  at
 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
 
  at
 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
 
  at java.security.AccessController.doPrivileged(Native Method)
 
  at javax.security.auth.Subject.doAs(Subject.java:415)
 
  at
 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
 
  at
 
 org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513)
 
  at
 
 org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
 
  Caused by: java.lang.NumberFormatException: For input string: e01
 
  at
 
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 
  at java.lang.Long.parseLong(Long.java:441)
 
  at java.lang.Long.parseLong(Long.java:483)
 
  at
 
 org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)
 
  at
 
 org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)
 
  ... 11 more
 
  15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
  exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
  container_e01_1420481081140_0006_01_01)
 
 
  --
  Thanks  Regards,
 
  Mukesh Jha
 
 



 --
 Marcelo




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


SPARKonYARN failing on CDH 5.3.0 : container cannot be fetched because of NumberFormatException

2015-01-08 Thread Mukesh Jha
Hi Experts,

I am running spark inside a YARN job.

The spark-streaming job was running fine in CDH-5.0.0, but after the upgrade
to 5.3.0 it cannot fetch containers and fails with the errors below. It looks
like the container id is incorrect and a string is present in a place where
it's expecting a number.



java.lang.IllegalArgumentException: Invalid ContainerId:
container_e01_1420481081140_0006_01_01

Caused by: java.lang.NumberFormatException: For input string: e01



Is this a bug? Have you faced something similar, and do you have any ideas on
how to fix this?



15/01/08 09:50:28 INFO yarn.ApplicationMaster: Registered signal handlers
for [TERM, HUP, INT]

15/01/08 09:50:29 ERROR yarn.ApplicationMaster: Uncaught exception:

java.lang.IllegalArgumentException: Invalid ContainerId:
container_e01_1420481081140_0006_01_01

at
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)

at
org.apache.spark.deploy.yarn.YarnRMClientImpl.getAttemptId(YarnRMClientImpl.scala:79)

at
org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:79)

at
org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:515)

at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)

at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:415)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)

at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)

at
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:513)

at
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)

Caused by: java.lang.NumberFormatException: For input string: e01

at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

at java.lang.Long.parseLong(Long.java:441)

at java.lang.Long.parseLong(Long.java:483)

at
org.apache.hadoop.yarn.util.ConverterUtils.toApplicationAttemptId(ConverterUtils.java:137)

at
org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:177)

... 11 more

15/01/08 09:50:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 10, (reason: Uncaught exception: Invalid ContainerId:
container_e01_1420481081140_0006_01_01)

-- 
Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Mukesh Jha
Hi Guys,

I have a kafka topic with 90 partitions and I am running
SparkStreaming (1.2.0) to read from kafka via KafkaUtils, creating 10
kafka-receivers.

My streaming is running fine and there is no delay in processing, just that
some partitions' data is never getting picked up. From the kafka console I
can see that each receiver is consuming data from 9 partitions but the lag
for some offsets keeps on increasing.

Below are my kafka-consumer parameters.

Have any of you faced this kind of issue? If so, do you have any
pointers to fix it?

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");

-- 
Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: KafkaUtils not consuming all the data from all partitions

2015-01-07 Thread Mukesh Jha
I understand that I have to create 10 parallel streams. My code is running
fine when the number of partitions is ~20, but when I increase the number of
partitions I keep running into this issue.

Below is my code to create kafka streams, along with the configs used.

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "3");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "2000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "1");
kafkaConf.put("rebalance.max.retries", "20");
String[] topics = kafkaTopicsList;
int numStreams = numKafkaThreads; // this is *10*
Map<String, Integer> topicMap = new HashMap<String, Integer>();
for (String topic : topics) {
  topicMap.put(topic, numStreams);
}

List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
      DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
      StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);


On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi,

 Could you add the code where you create the Kafka consumer?

 -kr, Gerard.

 On Wed, Jan 7, 2015 at 3:43 PM, francois.garil...@typesafe.com wrote:

 Hi Mukesh,

 If my understanding is correct, each Stream only has a single Receiver.
 So, if you have each receiver consuming 9 partitions, you need 10 input
 DStreams to create 10 concurrent receivers:


 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 Would you mind sharing a bit more on how you achieve this ?

 --
 FG


 On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hi Guys,

 I have a kafka topic having 90 partitions and I running
 SparkStreaming(1.2.0) to read from kafka via KafkaUtils to create 10
 kafka-receivers.

 My streaming is running fine and there is no delay in processing, just
 that some partitions data is never getting picked up. From the kafka
 console I can see that each receiver is consuming data from 9 partitions
 but the lag for some offsets keeps on increasing.

 Below is my kafka-consumers parameters.

 Any of you have face this kind of issue, if so then do you have any
 pointers to fix it?

  MapString, String kafkaConf = new HashMapString, String();
  kafkaConf.put(zookeeper.connect, kafkaZkQuorum);
  kafkaConf.put(group.id, kafkaConsumerGroup);
  kafkaConf.put(consumer.timeout.ms, 3);
  kafkaConf.put(auto.offset.reset, largest);
  kafkaConf.put(fetch.message.max.bytes, 2000);
  kafkaConf.put(zookeeper.session.timeout.ms, 6000);
  kafkaConf.put(zookeeper.connection.timeout.ms, 6000);
  kafkaConf.put(zookeeper.sync.time.ms, 2000);
  kafkaConf.put(rebalance.backoff.ms, 1);
  kafkaConf.put(rebalance.max.retries, 20);

 --
 Thanks  Regards,

 Mukesh Jha me.mukesh@gmail.com






-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2014-12-30 Thread Mukesh Jha
Thanks Sandy, it was the issue with the number of cores.

Another issue I was facing is that tasks are not getting distributed evenly
among all the executors and are running at the NODE_LOCAL locality level, i.e.
all the tasks are running on the same executor where my kafka receiver(s)
are running, even though the other executors are idle.

I configured *spark.locality.wait=50* instead of the default 3000 ms, which
forced the task rebalancing among nodes; let me know if there is a better
way to deal with this.
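
For reference, a minimal sketch of setting this programmatically (it can equally
be passed via spark-defaults.conf or --conf on spark-submit); the app name and
batch interval below are illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("kafka-streaming")       // illustrative name
    .set("spark.locality.wait", "50");   // milliseconds, as described above
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));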


On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha me.mukesh@gmail.com
wrote:

 Makes sense, I've also tries it in standalone mode where all 3 workers 
 driver were running on the same 8 core box and the results were similar.

 Anyways I will share the results in YARN mode with 8 core yarn containers.

 On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 When running in standalone mode, each executor will be able to use all 8
 cores on the box.  When running on YARN, each executor will only have
 access to 2 cores.  So the comparison doesn't seem fair, no?

 -Sandy

 On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Nope, I am setting 5 executors with 2  cores each. Below is the command
 that I'm using to submit in YARN mode. This starts up 5 executor nodes and
 a drives as per the spark  application master UI.

 spark-submit --master yarn-cluster --num-executors 5 --driver-memory
 1024m --executor-memory 1024m --executor-cores 2 --class
 com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000

 On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 *oops, I mean are you setting --executor-cores to 8

 On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Are you setting --num-executors to 8?

 On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Sorry Sandy, The command is just for reference but I can confirm that
 there are 4 executors and a driver as shown in the spark UI page.

 Each of these machines is a 8 core box with ~15G of ram.

 On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com
  wrote:

 Hi Mukesh,

 Based on your spark-submit command, it looks like you're only
 running with 2 executors on YARN.  Also, how many cores does each 
 machine
 have?

 -Sandy

 On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:

 Hello Experts,
 I'm bench-marking Spark on YARN (
 https://spark.apache.org/docs/latest/running-on-yarn.html) vs a
 standalone spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
 I have a standalone cluster with 3 executors, and a spark app
 running on yarn with 4 executors as shown below.

 The spark job running inside yarn is 10x slower than the one
 running on the standalone cluster (even though the yarn has more 
 number of
 workers), also in both the case all the executors are in the same
 datacenter so there shouldn't be any latency. On YARN each 5sec batch 
 is
 reading data from kafka and processing it in 5sec  on the standalone
 cluster each 5sec batch is getting processed in 0.4sec.
 Also, In YARN mode all the executors are not getting used up evenly
 as vm-13  vm-14 are running most of the tasks whereas in the 
 standalone
 mode all the executors are running the tasks.

 Do I need to set up some configuration to evenly distribute the
 tasks? Also do you have any pointers on the reasons the yarn job is 10x
 slower than the standalone job?
 Any suggestion is greatly appreciated, Thanks in advance.

 YARN(5 workers + driver)
 
 Executor ID Address RDD Blocks Memory Used DU  AT FT CT TT TT Input
 ShuffleRead ShuffleWrite Thread Dump
 1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms 0.0
 B 2047.0 B 1710.0 B Thread Dump
 2 vm-13.cloud.com:57264 0 0.0B/530.3MB 0.0 B 0 0 1427 1427 5.5 m 0.0
 B 0.0 B 0.0 B Thread Dump
 3 vm-14.cloud.com:54570 0 0.0B/530.3MB 0.0 B 0 0 1379 1379 5.2 m 0.0
 B 1368.0 B 2.8 KB Thread Dump
 4 vm-11.cloud.com:56201 0 0.0B/530.3MB 0.0 B 0 0 10 10 625 ms 0.0
 B 1368.0 B 1026.0 B Thread Dump
 5 vm-5.cloud.com:42958 0 0.0B/530.3MB 0.0 B 0 0 22 22 632 ms 0.0 B 
 1881.0
 B 2.8 KB Thread Dump
 driver vm.cloud.com:51847 0 0.0B/530.0MB 0.0 B 0 0 0 0 0 ms 0.0
 B 0.0 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master yarn-cluster --num-executors 2 --driver-memory 512m
 --executor-memory 512m --executor-cores 2
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-yarn avro 1 5000

 STANDALONE(3 workers + driver)
 ==
 Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 0 vm-71.cloud.com:55912 0 0.0B/265.0MB 0.0 B 0 0 1069 1069 6.0 m 0.0
 B 1534.0 B 3.0 KB Thread Dump
 1 vm-72.cloud.com:40897 0 0.0B/265.0MB 0.0 B 0 0 1057 1057 5.9 m 0.0
 B 1368.0 B 4.0 KB Thread Dump
 2 vm-73

Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2014-12-29 Thread Mukesh Jha
Sorry Sandy, the command is just for reference, but I can confirm that there
are 4 executors and a driver as shown on the spark UI page.

Each of these machines is an 8-core box with ~15G of RAM.

On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 Hi Mukesh,

 Based on your spark-submit command, it looks like you're only running with
 2 executors on YARN.  Also, how many cores does each machine have?

 -Sandy

 On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,
 I'm bench-marking Spark on YARN (
 https://spark.apache.org/docs/latest/running-on-yarn.html) vs a
 standalone spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
 I have a standalone cluster with 3 executors, and a spark app running on
 yarn with 4 executors as shown below.

 The spark job running inside yarn is 10x slower than the one running on
 the standalone cluster (even though the yarn has more number of workers),
 also in both the case all the executors are in the same datacenter so there
 shouldn't be any latency. On YARN each 5sec batch is reading data from
 kafka and processing it in 5sec  on the standalone cluster each 5sec batch
 is getting processed in 0.4sec.
 Also, In YARN mode all the executors are not getting used up evenly as
 vm-13  vm-14 are running most of the tasks whereas in the standalone mode
 all the executors are running the tasks.

 Do I need to set up some configuration to evenly distribute the tasks?
 Also do you have any pointers on the reasons the yarn job is 10x slower
 than the standalone job?
 Any suggestion is greatly appreciated, Thanks in advance.

 YARN(5 workers + driver)
 
 Executor ID Address RDD Blocks Memory Used DU  AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms 0.0 B 2047.0
 B 1710.0 B Thread Dump
 2 vm-13.cloud.com:57264 0 0.0B/530.3MB 0.0 B 0 0 1427 1427 5.5 m 0.0 B 0.0
 B 0.0 B Thread Dump
 3 vm-14.cloud.com:54570 0 0.0B/530.3MB 0.0 B 0 0 1379 1379 5.2 m 0.0 B 1368.0
 B 2.8 KB Thread Dump
 4 vm-11.cloud.com:56201 0 0.0B/530.3MB 0.0 B 0 0 10 10 625 ms 0.0 B 1368.0
 B 1026.0 B Thread Dump
 5 vm-5.cloud.com:42958 0 0.0B/530.3MB 0.0 B 0 0 22 22 632 ms 0.0 B 1881.0
 B 2.8 KB Thread Dump
 driver vm.cloud.com:51847 0 0.0B/530.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master yarn-cluster --num-executors 2 --driver-memory 512m
 --executor-memory 512m --executor-cores 2
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-yarn avro 1 5000

 STANDALONE(3 workers + driver)
 ==
 Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 0 vm-71.cloud.com:55912 0 0.0B/265.0MB 0.0 B 0 0 1069 1069 6.0 m 0.0 B 1534.0
 B 3.0 KB Thread Dump
 1 vm-72.cloud.com:40897 0 0.0B/265.0MB 0.0 B 0 0 1057 1057 5.9 m 0.0 B 1368.0
 B 4.0 KB Thread Dump
 2 vm-73.cloud.com:37621 0 0.0B/265.0MB 0.0 B 1 0 1059 1060 5.9 m 0.0 B 2.0
 KB 1368.0 B Thread Dump
 driver vm.cloud.com:58299 0 0.0B/265.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-standalone avro 1 5000

 PS: I did go through the spark website and
 http://www.virdata.com/tuning-spark/, but was out of any luck.

 --
 Cheers,
 Mukesh Jha





-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2014-12-29 Thread Mukesh Jha
And this is with spark version 1.2.0.

On Mon, Dec 29, 2014 at 11:43 PM, Mukesh Jha me.mukesh@gmail.com
wrote:

 Sorry Sandy, The command is just for reference but I can confirm that
 there are 4 executors and a driver as shown in the spark UI page.

 Each of these machines is a 8 core box with ~15G of ram.

 On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Mukesh,

 Based on your spark-submit command, it looks like you're only running
 with 2 executors on YARN.  Also, how many cores does each machine have?

 -Sandy

 On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,
 I'm bench-marking Spark on YARN (
 https://spark.apache.org/docs/latest/running-on-yarn.html) vs a
 standalone spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
 I have a standalone cluster with 3 executors, and a spark app running on
 yarn with 4 executors as shown below.

 The spark job running inside yarn is 10x slower than the one running on
 the standalone cluster (even though the yarn has more number of workers),
 also in both the case all the executors are in the same datacenter so there
 shouldn't be any latency. On YARN each 5sec batch is reading data from
 kafka and processing it in 5sec  on the standalone cluster each 5sec batch
 is getting processed in 0.4sec.
 Also, In YARN mode all the executors are not getting used up evenly as
 vm-13  vm-14 are running most of the tasks whereas in the standalone mode
 all the executors are running the tasks.

 Do I need to set up some configuration to evenly distribute the tasks?
 Also do you have any pointers on the reasons the yarn job is 10x slower
 than the standalone job?
 Any suggestion is greatly appreciated, Thanks in advance.

 YARN(5 workers + driver)
 
 Executor ID Address RDD Blocks Memory Used DU  AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms 0.0 B 2047.0
 B 1710.0 B Thread Dump
 2 vm-13.cloud.com:57264 0 0.0B/530.3MB 0.0 B 0 0 1427 1427 5.5 m 0.0 B 0.0
 B 0.0 B Thread Dump
 3 vm-14.cloud.com:54570 0 0.0B/530.3MB 0.0 B 0 0 1379 1379 5.2 m 0.0 B 
 1368.0
 B 2.8 KB Thread Dump
 4 vm-11.cloud.com:56201 0 0.0B/530.3MB 0.0 B 0 0 10 10 625 ms 0.0 B 1368.0
 B 1026.0 B Thread Dump
 5 vm-5.cloud.com:42958 0 0.0B/530.3MB 0.0 B 0 0 22 22 632 ms 0.0 B 1881.0
 B 2.8 KB Thread Dump
 driver vm.cloud.com:51847 0 0.0B/530.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master yarn-cluster --num-executors 2 --driver-memory 512m
 --executor-memory 512m --executor-cores 2
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-yarn avro 1 5000

 STANDALONE(3 workers + driver)
 ==
 Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 0 vm-71.cloud.com:55912 0 0.0B/265.0MB 0.0 B 0 0 1069 1069 6.0 m 0.0 B 
 1534.0
 B 3.0 KB Thread Dump
 1 vm-72.cloud.com:40897 0 0.0B/265.0MB 0.0 B 0 0 1057 1057 5.9 m 0.0 B 
 1368.0
 B 4.0 KB Thread Dump
 2 vm-73.cloud.com:37621 0 0.0B/265.0MB 0.0 B 1 0 1059 1060 5.9 m 0.0 B 2.0
 KB 1368.0 B Thread Dump
 driver vm.cloud.com:58299 0 0.0B/265.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-standalone avro 1 5000

 PS: I did go through the spark website and
 http://www.virdata.com/tuning-spark/, but was out of any luck.

 --
 Cheers,
 Mukesh Jha





 --


 Thanks  Regards,

 *Mukesh Jha me.mukesh@gmail.com*




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2014-12-29 Thread Mukesh Jha
Nope, I am setting 5 executors with 2 cores each. Below is the command
that I'm using to submit in YARN mode. This starts up 5 executor nodes and
a driver, as per the spark & application master UIs.

spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m
--executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H
/homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000

On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 *oops, I mean are you setting --executor-cores to 8

 On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Are you setting --num-executors to 8?

 On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Sorry Sandy, The command is just for reference but I can confirm that
 there are 4 executors and a driver as shown in the spark UI page.

 Each of these machines is a 8 core box with ~15G of ram.

 On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Mukesh,

 Based on your spark-submit command, it looks like you're only running
 with 2 executors on YARN.  Also, how many cores does each machine have?

 -Sandy

 On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,
 I'm bench-marking Spark on YARN (
 https://spark.apache.org/docs/latest/running-on-yarn.html) vs a
 standalone spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
 I have a standalone cluster with 3 executors, and a spark app running
 on yarn with 4 executors as shown below.

 The spark job running inside yarn is 10x slower than the one running
 on the standalone cluster (even though the yarn has more number of
 workers), also in both the case all the executors are in the same
 datacenter so there shouldn't be any latency. On YARN each 5sec batch is
 reading data from kafka and processing it in 5sec  on the standalone
 cluster each 5sec batch is getting processed in 0.4sec.
 Also, In YARN mode all the executors are not getting used up evenly as
 vm-13  vm-14 are running most of the tasks whereas in the standalone mode
 all the executors are running the tasks.

 Do I need to set up some configuration to evenly distribute the tasks?
 Also do you have any pointers on the reasons the yarn job is 10x slower
 than the standalone job?
 Any suggestion is greatly appreciated, Thanks in advance.

 YARN(5 workers + driver)
 
 Executor ID Address RDD Blocks Memory Used DU  AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms 0.0 B 2047.0
 B 1710.0 B Thread Dump
 2 vm-13.cloud.com:57264 0 0.0B/530.3MB 0.0 B 0 0 1427 1427 5.5 m 0.0
 B 0.0 B 0.0 B Thread Dump
 3 vm-14.cloud.com:54570 0 0.0B/530.3MB 0.0 B 0 0 1379 1379 5.2 m 0.0
 B 1368.0 B 2.8 KB Thread Dump
 4 vm-11.cloud.com:56201 0 0.0B/530.3MB 0.0 B 0 0 10 10 625 ms 0.0 B 1368.0
 B 1026.0 B Thread Dump
 5 vm-5.cloud.com:42958 0 0.0B/530.3MB 0.0 B 0 0 22 22 632 ms 0.0 B 1881.0
 B 2.8 KB Thread Dump
 driver vm.cloud.com:51847 0 0.0B/530.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master yarn-cluster --num-executors 2 --driver-memory 512m
 --executor-memory 512m --executor-cores 2
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-yarn avro 1 5000

 STANDALONE(3 workers + driver)
 ==
 Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 0 vm-71.cloud.com:55912 0 0.0B/265.0MB 0.0 B 0 0 1069 1069 6.0 m 0.0
 B 1534.0 B 3.0 KB Thread Dump
 1 vm-72.cloud.com:40897 0 0.0B/265.0MB 0.0 B 0 0 1057 1057 5.9 m 0.0
 B 1368.0 B 4.0 KB Thread Dump
 2 vm-73.cloud.com:37621 0 0.0B/265.0MB 0.0 B 1 0 1059 1060 5.9 m 0.0
 B 2.0 KB 1368.0 B Thread Dump
 driver vm.cloud.com:58299 0 0.0B/265.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-standalone avro 1 5000

 PS: I did go through the spark website and
 http://www.virdata.com/tuning-spark/, but was out of any luck.

 --
 Cheers,
 Mukesh Jha





 --


 Thanks  Regards,

 *Mukesh Jha me.mukesh@gmail.com*






-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: SPARK-streaming app running 10x slower on YARN vs STANDALONE cluster

2014-12-29 Thread Mukesh Jha
Makes sense; I've also tried it in standalone mode where all 3 workers &
the driver were running on the same 8-core box and the results were similar.

Anyway, I will share the results in YARN mode with 8-core yarn containers.

On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza sandy.r...@cloudera.com
wrote:

 When running in standalone mode, each executor will be able to use all 8
 cores on the box.  When running on YARN, each executor will only have
 access to 2 cores.  So the comparison doesn't seem fair, no?

 -Sandy

 On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Nope, I am setting 5 executors with 2  cores each. Below is the command
 that I'm using to submit in YARN mode. This starts up 5 executor nodes and
 a drives as per the spark  application master UI.

 spark-submit --master yarn-cluster --num-executors 5 --driver-memory
 1024m --executor-memory 1024m --executor-cores 2 --class
 com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka
  spark-yarn avro 1 5000

 On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 *oops, I mean are you setting --executor-cores to 8

 On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Are you setting --num-executors to 8?

 On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Sorry Sandy, The command is just for reference but I can confirm that
 there are 4 executors and a driver as shown in the spark UI page.

 Each of these machines is a 8 core box with ~15G of ram.

 On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza sandy.r...@cloudera.com
 wrote:

 Hi Mukesh,

 Based on your spark-submit command, it looks like you're only running
 with 2 executors on YARN.  Also, how many cores does each machine have?

 -Sandy

 On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha me.mukesh@gmail.com
 wrote:

 Hello Experts,
 I'm bench-marking Spark on YARN (
 https://spark.apache.org/docs/latest/running-on-yarn.html) vs a
 standalone spark cluster (
 https://spark.apache.org/docs/latest/spark-standalone.html).
 I have a standalone cluster with 3 executors, and a spark app
 running on yarn with 4 executors as shown below.

 The spark job running inside yarn is 10x slower than the one running
 on the standalone cluster (even though the yarn has more number of
 workers), also in both the case all the executors are in the same
 datacenter so there shouldn't be any latency. On YARN each 5sec batch is
 reading data from kafka and processing it in 5sec  on the standalone
 cluster each 5sec batch is getting processed in 0.4sec.
 Also, In YARN mode all the executors are not getting used up evenly
 as vm-13  vm-14 are running most of the tasks whereas in the standalone
 mode all the executors are running the tasks.

 Do I need to set up some configuration to evenly distribute the
 tasks? Also do you have any pointers on the reasons the yarn job is 10x
 slower than the standalone job?
 Any suggestion is greatly appreciated, Thanks in advance.

 YARN(5 workers + driver)
 
 Executor ID Address RDD Blocks Memory Used DU  AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 1 vm-18.cloud.com:51796 0 0.0B/530.3MB 0.0 B 1 0 16 17 634 ms 0.0 B 
 2047.0
 B 1710.0 B Thread Dump
 2 vm-13.cloud.com:57264 0 0.0B/530.3MB 0.0 B 0 0 1427 1427 5.5 m 0.0
 B 0.0 B 0.0 B Thread Dump
 3 vm-14.cloud.com:54570 0 0.0B/530.3MB 0.0 B 0 0 1379 1379 5.2 m 0.0
 B 1368.0 B 2.8 KB Thread Dump
 4 vm-11.cloud.com:56201 0 0.0B/530.3MB 0.0 B 0 0 10 10 625 ms 0.0 B 
 1368.0
 B 1026.0 B Thread Dump
 5 vm-5.cloud.com:42958 0 0.0B/530.3MB 0.0 B 0 0 22 22 632 ms 0.0 B 
 1881.0
 B 2.8 KB Thread Dump
 driver vm.cloud.com:51847 0 0.0B/530.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master yarn-cluster --num-executors 2 --driver-memory 512m
 --executor-memory 512m --executor-cores 2
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-yarn avro 1 5000

 STANDALONE(3 workers + driver)
 ==
 Executor ID Address RDD Blocks Memory Used DU AT FT CT TT TT Input 
 ShuffleRead
 ShuffleWrite Thread Dump
 0 vm-71.cloud.com:55912 0 0.0B/265.0MB 0.0 B 0 0 1069 1069 6.0 m 0.0
 B 1534.0 B 3.0 KB Thread Dump
 1 vm-72.cloud.com:40897 0 0.0B/265.0MB 0.0 B 0 0 1057 1057 5.9 m 0.0
 B 1368.0 B 4.0 KB Thread Dump
 2 vm-73.cloud.com:37621 0 0.0B/265.0MB 0.0 B 1 0 1059 1060 5.9 m 0.0
 B 2.0 KB 1368.0 B Thread Dump
 driver vm.cloud.com:58299 0 0.0B/265.0MB 0.0 B 0 0 0 0 0 ms 0.0 B 0.0
 B 0.0 B Thread Dump

 /homext/spark/bin/spark-submit
 --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077
 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
 vm.cloud.com:2181/kafka spark-standalone avro 1 5000

 PS: I did go through the spark website and
 http://www.virdata.com/tuning-spark/, but was out of any luck.

 --
 Cheers,
 Mukesh Jha





 --


 Thanks  Regards,

 *Mukesh Jha me.mukesh@gmail.com

Re: KafkaUtils explicit acks

2014-12-16 Thread Mukesh Jha
I agree that this is not a trivial task, as in this approach the kafka acks
would be done by the Spark tasks; that means a pluggable means to ack your
input data source, i.e. changes in core.

From my limited experience with Kafka + Spark, what I've seen is that if the
spark tasks take longer than the batch interval, the next batch waits for
the previous one to finish, so I was wondering if offset management could be
done by spark too.

I'm just trying to figure out whether this seems to be a worthwhile addition
to have.

On Mon, Dec 15, 2014 at 11:39 AM, Shao, Saisai saisai.s...@intel.com
wrote:

  Hi,



 It is not a trivial work to acknowledge the offsets when RDD is fully
 processed, I think from my understanding only modify the KafakUtils is not
 enough to meet your requirement, you need to add a metadata management
 stuff for each block/RDD, and track them both in executor-driver side, and
 many other things should also be taken care of :).



 Thanks

 Jerry



 *From:* mukh@gmail.com [mailto:mukh@gmail.com] *On Behalf Of *Mukesh
 Jha
 *Sent:* Monday, December 15, 2014 1:31 PM
 *To:* Tathagata Das
 *Cc:* francois.garil...@typesafe.com; user@spark.apache.org
 *Subject:* Re: KafkaUtils explicit acks



 Thanks TD  Francois for the explanation  documentation. I'm curious if
 we have any performance benchmark with  without WAL for
 spark-streaming-kafka.



 Also In spark-streaming-kafka (as kafka provides a way to acknowledge
 logs) on top of WAL can we modify KafkaUtils to acknowledge the offsets
 only when the RRDs are fully processed and are getting evicted out of the
 Spark memory thus we can be cent percent sure that all the records are
 getting processed in the system.

 I was thinking if it's good to have the kafka offset information of each
 batch as part of RDDs metadata and commit the offsets once the RDDs lineage
 is complete.



 On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 I am updating the docs right now. Here is a staged copy that you can
 have sneak peek of. This will be part of the Spark 1.2.


 http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

 The updated fault-tolerance section tries to simplify the explanation
 of when and what data can be lost, and how to prevent that using the
 new experimental feature of write ahead logs.
 Any feedback will be much appreciated.

 TD


 On Wed, Dec 10, 2014 at 2:42 AM,  francois.garil...@typesafe.com wrote:
  [sorry for the botched half-message]
 
  Hi Mukesh,
 
  There's been some great work on Spark Streaming reliability lately.
  https://www.youtube.com/watch?v=jcJq3ZalXD8
  Look at the links from:
  https://issues.apache.org/jira/browse/SPARK-3129
 
  I'm not aware of any doc yet (did I miss something ?) but you can look at
  the ReliableKafkaReceiver's test suite:
 
 
 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
 
  --
  FG
 
 
  On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Hello Guys,
 
  Any insights on this??
  If I'm not clear enough my question is how can I use kafka consumer and
  not loose any data in cases of failures with spark-streaming.
 
  On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Hello Experts,
 
  I'm working on a spark app which reads data from kafka  persists it in
  hbase.
 
  Spark documentation states the below [1] that in case of worker failure
  we can loose some data. If not how can I make my kafka stream more
 reliable?
  I have seen there is a simple consumer [2] but I'm not sure if it has
  been used/tested extensively.
 
  I was wondering if there is a way to explicitly acknowledge the kafka
  offsets once they are replicated in memory of other worker nodes (if
 it's
  not already done) to tackle this issue.
 
  Any help is appreciated in advance.
 
 
  Using any input source that receives data through a network - For
  network-based data sources like Kafka and Flume, the received input
 data is
  replicated in memory between nodes of the cluster (default replication
  factor is 2). So if a worker node fails, then the system can recompute
 the
  lost from the the left over copy of the input data. However, if the
 worker
  node where a network receiver was running fails, then a tiny bit of
 data may
  be lost, that is, the data received by the system but not yet
 replicated to
  other node(s). The receiver will be started on a different node and it
 will
  continue to receive data.
  https://github.com/dibbhatt/kafka-spark-consumer
 
  Txz,
 
  Mukesh Jha
 
 
 
 
  --
 
 
  Thanks  Regards,
 
  Mukesh Jha
 
 




 --





 Thanks  Regards,

 *Mukesh Jha me.mukesh@gmail.com*



-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: KafkaUtils explicit acks

2014-12-14 Thread Mukesh Jha
Thanks TD & Francois for the explanation & documentation. I'm curious whether we
have any performance benchmark with & without the WAL for spark-streaming-kafka.

Also, in spark-streaming-kafka (as kafka provides a way to acknowledge logs),
on top of the WAL can we modify KafkaUtils to acknowledge the offsets only when
the RDDs are fully processed and are getting evicted out of Spark
memory? That way we can be 100% sure that all the records are getting
processed in the system.
I was thinking it might be good to have the kafka offset information of each
batch as part of the RDDs' metadata and commit the offsets once the RDDs'
lineage is complete.
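
For context, a minimal sketch of turning on the receiver write-ahead log that the
staged docs below describe; this is a sketch assuming Spark 1.2 (the property name
is taken from that release), and the app name, batch interval and checkpoint path
are illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf()
    .setAppName("kafka-wal")                                         // illustrative name
    .set("spark.streaming.receiver.writeAheadLog.enable", "true");   // assumed Spark 1.2 property name
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));
// A reliable (e.g. HDFS-backed) checkpoint directory is needed for the WAL to help on failure.
jssc.checkpoint("hdfs:///user/spark/checkpoints");                   // illustrative path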

On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 I am updating the docs right now. Here is a staged copy that you can
 have sneak peek of. This will be part of the Spark 1.2.


 http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html

 The updated fault-tolerance section tries to simplify the explanation
 of when and what data can be lost, and how to prevent that using the
 new experimental feature of write ahead logs.
 Any feedback will be much appreciated.

 TD

 On Wed, Dec 10, 2014 at 2:42 AM,  francois.garil...@typesafe.com wrote:
  [sorry for the botched half-message]
 
  Hi Mukesh,
 
  There's been some great work on Spark Streaming reliability lately.
  https://www.youtube.com/watch?v=jcJq3ZalXD8
  Look at the links from:
  https://issues.apache.org/jira/browse/SPARK-3129
 
  I'm not aware of any doc yet (did I miss something ?) but you can look at
  the ReliableKafkaReceiver's test suite:
 
 
 external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
 
  --
  FG
 
 
  On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Hello Guys,
 
  Any insights on this??
  If I'm not clear enough my question is how can I use kafka consumer and
  not loose any data in cases of failures with spark-streaming.
 
  On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com
  wrote:
 
  Hello Experts,
 
  I'm working on a spark app which reads data from kafka  persists it in
  hbase.
 
  Spark documentation states the below [1] that in case of worker failure
  we can loose some data. If not how can I make my kafka stream more
 reliable?
  I have seen there is a simple consumer [2] but I'm not sure if it has
  been used/tested extensively.
 
  I was wondering if there is a way to explicitly acknowledge the kafka
  offsets once they are replicated in memory of other worker nodes (if
 it's
  not already done) to tackle this issue.
 
  Any help is appreciated in advance.
 
 
  Using any input source that receives data through a network - For
  network-based data sources like Kafka and Flume, the received input
 data is
  replicated in memory between nodes of the cluster (default replication
  factor is 2). So if a worker node fails, then the system can recompute
 the
  lost from the the left over copy of the input data. However, if the
 worker
  node where a network receiver was running fails, then a tiny bit of
 data may
  be lost, that is, the data received by the system but not yet
 replicated to
  other node(s). The receiver will be started on a different node and it
 will
  continue to receive data.
  https://github.com/dibbhatt/kafka-spark-consumer
 
  Txz,
 
  Mukesh Jha
 
 
 
 
  --
 
 
  Thanks  Regards,
 
  Mukesh Jha
 
 



-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: KafkaUtils explicit acks

2014-12-10 Thread Mukesh Jha
Hello Guys,

Any insights on this?
If I'm not clear enough, my question is: how can I use a kafka consumer and not
lose any data in case of failures with spark-streaming?

On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hello Experts,

 I'm working on a spark app which reads data from kafka  persists it in
 hbase.

 Spark documentation states the below *[1]* that in case of worker failure
 we can loose some data. If not how can I make my kafka stream more reliable?
 I have seen there is a simple consumer *[2]* but I'm not sure if it has
 been used/tested extensively.

 I was wondering if there is a way to explicitly acknowledge the kafka
 offsets once they are replicated in memory of other worker nodes (if it's
 not already done) to tackle this issue.

 Any help is appreciated in advance.


1. *Using any input source that receives data through a network* - For
network-based data sources like *Kafka *and Flume, the received input
data is replicated in memory between nodes of the cluster (default
replication factor is 2). So if a worker node fails, then the system can
recompute the lost from the the left over copy of the input data. However,
if the *worker node where a network receiver was running fails, then a
tiny bit of data may be lost*, that is, the data received by the
system but not yet replicated to other node(s). The receiver will be
started on a different node and it will continue to receive data.
2. https://github.com/dibbhatt/kafka-spark-consumer

 Txz,

 *Mukesh Jha me.mukesh@gmail.com*




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


KafkaUtils explicit acks

2014-12-09 Thread Mukesh Jha
Hello Experts,

I'm working on a spark app which reads data from kafka & persists it in
hbase.

The Spark documentation states, in *[1]* below, that in case of worker failure
we can lose some data. If so, how can I make my kafka stream more reliable?
I have seen there is a simple consumer *[2]*, but I'm not sure if it has
been used/tested extensively.

I was wondering if there is a way to explicitly acknowledge the kafka
offsets once they are replicated in the memory of other worker nodes (if that
is not already done) to tackle this issue.

Any help is appreciated in advance.


   1. *Using any input source that receives data through a network* - For
   network-based data sources like *Kafka *and Flume, the received input
   data is replicated in memory between nodes of the cluster (default
   replication factor is 2). So if a worker node fails, then the system can
   recompute the lost data from the left over copy of the input data. However,
   if the *worker node where a network receiver was running fails, then a
   tiny bit of data may be lost*, that is, the data received by the system
   but not yet replicated to other node(s). The receiver will be started on a
   different node and it will continue to receive data.
   2. https://github.com/dibbhatt/kafka-spark-consumer

Txz,

*Mukesh Jha me.mukesh@gmail.com*


Lifecycle of RDD in spark-streaming

2014-11-25 Thread Mukesh Jha
Hey Experts,

I wanted to understand in detail about the lifecycle of rdd(s) in a
streaming app.

From my current understanding:
- An rdd gets created out of the realtime input stream.
- Transformation functions are applied in a lazy fashion on the RDD to
transform it into other rdd(s).
- Actions are taken on the final transformed rdds to get the data out of
the system.

Also, rdd(s) are stored in the cluster's RAM (and on disk if configured so) and
are cleaned up in LRU fashion.
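
A minimal sketch of that flow (the socket source, names and port are illustrative,
not from any particular app): the map is only recorded lazily on each batch's RDD,
and the output action is what forces it to be computed.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("rdd-lifecycle");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(5000));

// Each batch interval produces one RDD from the input stream.
JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Transformations are only recorded here (lazy); nothing runs yet.
JavaDStream<Integer> lengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});

// The output action is what triggers the computation of each batch's RDD.
lengths.print();

jssc.start();
jssc.awaitTermination();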

So I have the following questions on the same.
- How does spark (streaming) guarantee that all the actions are taken on each
input rdd/batch?
- How does spark determine that the life-cycle of an rdd is complete? Is
there any chance that an RDD will be cleaned out of RAM before all actions
are taken on it?

Thanks in advance for all your help. Also, I'm relatively new to scala &
spark, so pardon me in case these are naive questions/assumptions.

-- 
Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: Lifecycle of RDD in spark-streaming

2014-11-25 Thread Mukesh Jha
Any pointers guys?

On Tue, Nov 25, 2014 at 5:32 PM, Mukesh Jha me.mukesh@gmail.com wrote:

 Hey Experts,

 I wanted to understand in detail about the lifecycle of rdd(s) in a
 streaming app.

 From my current understanding
 - rdd gets created out of the realtime input stream.
 - Transform(s) functions are applied in a lazy fashion on the RDD to
 transform into another rdd(s).
 - Actions are taken on the final transformed rdds to get the data out of
 the system.

 Also rdd(s) are stored in the clusters RAM (disc if configured so) and are
 cleaned in LRU fashion.

 So I have the following questions on the same.
 - How spark (streaming) guarantees that all the actions are taken on each
 input rdd/batch.
 - How does spark determines that the life-cycle of a rdd is complete. Is
 there any chance that a RDD will be cleaned out of ram before all actions
 are taken on them?

 Thanks in advance for all your help. Also, I'm relatively new to scala 
 spark so pardon me in case these are naive questions/assumptions.

 --
 Thanks  Regards,

 *Mukesh Jha me.mukesh@gmail.com*




-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Debugging spark java application

2014-11-19 Thread Mukesh Jha
Hello experts,

Is there an easy way to debug a spark java application?

I'm putting debug logs in the map function, but there aren't any logs on
the console.

Also, can I include my custom jars while launching spark-shell and do my POC
there?

This might be a naive question, but any help here is appreciated.
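
On the missing logs: code inside the map function runs on the executors, so its
output goes to the executor logs (the Executors tab of the Spark UI, or the
container/worker logs), not to the driver console. A minimal sketch, assuming
log4j (which Spark ships with); the class and method names are illustrative:

import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;

public class MapDebug {
  private static final Logger LOG = Logger.getLogger(MapDebug.class);

  static JavaRDD<Integer> lengths(JavaRDD<String> lines) {
    return lines.map(new Function<String, Integer>() {
      @Override
      public Integer call(String line) {
        // This shows up in the executor's log, not on the driver console.
        LOG.debug("processing line of length " + line.length());
        return line.length();
      }
    });
  }
}

And for the POC question: custom jars can be added with spark-shell's --jars
option (a comma-separated list of jar paths), which also ships them to the
executors.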


Re: Functions in Spark

2014-11-16 Thread Mukesh Jha
Thanks, I did go through the video and it was very informative, but I think I was
looking for the Transformations section on this page:
https://spark.apache.org/docs/0.9.1/scala-programming-guide.html.


On Mon, Nov 17, 2014 at 10:31 AM, Samarth Mailinglist 
mailinglistsama...@gmail.com wrote:

 Check this video out:
 https://www.youtube.com/watch?v=dmL0N3qfSc8list=UURzsq7k4-kT-h3TDUBQ82-w

 On Mon, Nov 17, 2014 at 9:43 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Hi,
 Is there any way to know which of my functions perform better in Spark?
 In other words, say I have achieved same thing using two different
 implementations. How do I judge as to which implementation is better than
 the other. Is processing time the only metric that we can use to claim the
 goodness of one implementation to the other?
 Can anyone please share some thoughts on this?

 Thank You





-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*