Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Cody Koeninger
Post the actual stacktrace you're getting

On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora 
wrote:

> Executors in Spark Streaming 1.3 fetch messages from Kafka in batches. What
> happens when an executor takes a long time to complete a fetched batch?
>
> Say, in:
>
>
> directKafkaStream.foreachRDD(new Function, Void>() {
>
> @Override
> public Void call(JavaRDD v1) throws Exception {
> v1.foreachPartition(new  VoidFunction>{
> @Override
> public void call(Iterator t) throws Exception {
> //long running task
> }});}});
>
> Will this long-running task drop the executor's connection to the Kafka
> brokers? If so, how do I handle that? I am getting a connection timeout in
> my code.
>
>
>
>


Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Shushant Arora
My bad. I got that exception in the driver code of the same job, not in an
executor.

But it only reports a socket-closed exception.

org.apache.spark.SparkException: ArrayBuffer(java.io.EOFException: Received
-1 when reading from channel, socket has likely been closed.,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([topicname,51], [topicname,201], [topicname,54], [topicname,93],
[topicname,297], [topicname,123], [topicname,147], [topicname,126],
[topicname,189], [topicname,111], [topicname,159], [topicname,33],
[topicname,36], [topicname,60], [topicname,216], [topicname,9],
[topicname,12], [topicname,282], [topicname,39], [topicname,63],
[topicname,231], [topicname,279], [topicname,18], [topicname,30],
[topicname,276], [topicname,228], [topicname,84], [topicname,252],
[topicname,48], [topicname,150], [topicname,132], [topicname,57],
[topicname,72], [topicname,291], [topicname,234], [topicname,204],
[topicname,186], [topicname,264], [topicname,288], [topicname,87],
[topicname,78], [topicname,249], [topicname,102], [topicname,108],
[topicname,237], [topicname,24], [topicname,96], [topicname,135],
[topicname,198], [topicname,162], [topicname,42], [topicname,258],
[topicname,0], [topicname,174], [topicname,207], [topicname,210],
[topicname,246], [topicname,225], [topicname,270], [topicname,156],
[topicname,183], [topicname,144], [topicname,117], [topicname,69],
[topicname,45], [topicname,219], [topicname,177], [topicname,105],
[topicname,171], [topicname,141], [topicname,285], [topicname,27],
[topicname,168], [topicname,267], [topicname,213], [topicname,153],
[topicname,138], [topicname,255], [topicname,222], [topicname,243],
[topicname,261], [topicname,90], [topicname,114], [topicname,3],
[topicname,81], [topicname,180], [topicname,21], [topicname,6],
[topicname,195], [topicname,129], [topicname,192], [topicname,99],
[topicname,294], [topicname,165], [topicname,240], [topicname,66],
[topicname,75], [topicname,15], [topicname,273], [topicname,120]))
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/09/10 02:36:02 ERROR yarn.ApplicationMaster: User class threw exception:
ArrayBuffer(java.io.EOFException: Received -1 when reading from channel,
socket has likely 

Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Cody Koeninger
Again, that looks like you lost a kafka broker.  Executors will retry
failed tasks automatically up to the max failures.
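
Note that this retry only kicks in when a task actually fails, that is, when
the exception propagates out of the function passed to foreachPartition. A
catch block that logs and swallows the exception, like the one in the
KafkaStreamTransformations.call posted in this thread, lets the task finish as
if it had succeeded. Below is a minimal sketch of the difference in plain Java
with no Spark dependency. PartitionHandler, processSwallowing,
processRethrowing, and handle are hypothetical names, and the "bad" record
merely simulates a failed fetch:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a foreachPartition body; this is not Spark API.
class PartitionHandler {

    // Swallows failures: the caller (Spark, in the real job) never learns
    // that the partition was only partly processed.
    static int processSwallowing(Iterator<String> records) {
        int done = 0;
        try {
            while (records.hasNext()) {
                handle(records.next());
                done++;
            }
        } catch (Exception e) {
            System.err.println("Error while consuming messages: " + e);
        }
        return done;
    }

    // Logs, then rethrows, so the failure reaches the caller and the task
    // can be counted as failed and retried.
    static int processRethrowing(Iterator<String> records) throws Exception {
        int done = 0;
        try {
            while (records.hasNext()) {
                handle(records.next());
                done++;
            }
        } catch (Exception e) {
            System.err.println("Error while consuming messages: " + e);
            throw e;
        }
        return done;
    }

    // Placeholder for the long-running work; fails on a marker record.
    static void handle(String record) throws Exception {
        if (record.equals("bad")) {
            throw new Exception("simulated fetch failure");
        }
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "bad", "c");
        System.out.println("swallowing processed "
                + processSwallowing(records.iterator()) + " record(s)");
        try {
            processRethrowing(records.iterator());
        } catch (Exception e) {
            System.out.println("rethrowing surfaced: " + e.getMessage());
        }
    }
}
```

In the swallowing variant the remaining records of the partition are silently
skipped, which in a streaming job would mean that batch is treated as done.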

spark.streaming.kafka.maxRetries controls the number of times the driver
will retry when attempting to get offsets.

If your broker isn't up, or the rebalance hasn't finished, after N retries,
you've got operational problems you need to deal with.
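
As a rough illustration of the two retry limits mentioned above, the sketch
below collects them and renders them as spark-submit arguments.
spark.streaming.kafka.maxRetries (driver side, offset lookup) and
spark.task.maxFailures (executor side, per-task failures) are Spark
configuration properties; the values 5 and 8 and the RetrySettings helper are
assumptions made only for this example, not recommendations:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; only the two property names come from Spark.
class RetrySettings {

    static Map<String, String> retryConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        // Driver side: retries when looking up the latest leader offsets.
        conf.put("spark.streaming.kafka.maxRetries", "5");   // illustrative value
        // Executor side: failures allowed per task before the job is aborted.
        conf.put("spark.task.maxFailures", "8");             // illustrative value
        return conf;
    }

    // Renders the settings as "--conf key=value" pairs for spark-submit.
    static List<String> toSubmitArgs(Map<String, String> conf) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            args.add("--conf");
            args.add(e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" ", toSubmitArgs(retryConf())));
    }
}
```

Raising these limits only buys time for a leader election to finish; it does
not help if the broker stays down.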




Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Shushant Arora
Stack trace is
15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname partition 99,  sleeping for 200ms
kafka.common.NotLeaderForPartitionException
at sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
at com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while consuming messages from kafka




The actual code is:

In the driver:
final KafkaStreamTransformations transformations = new KafkaStreamTransformations(...);

directKafkaStream.foreachRDD(new Function, Void>() {

@Override
public Void call(JavaRDD v1) throws Exception {
v1.foreachPartition(transformations);
return null;
}
});


In KafkaStreamTransformations:


@Override
public void call(Iterator t) throws Exception {
    try {
        while (t.hasNext()) {
            // ... long running task
        }
    } catch (Exception e) {
        e.printStackTrace();
        logger.error("Error while consuming messages from kafka");
    }
}








Re: spark streaming 1.3 with kafka connection timeout

2015-09-10 Thread Cody Koeninger
NotLeaderForPartitionException means you lost a Kafka broker or had a
rebalance. Why did you say "I am getting a connection timeout in my code"?

You've asked questions about this exact same situation before; the answer
remains the same.
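
For context, the "sleeping for 200ms" in the posted log appears to correspond
to the Kafka consumer setting refresh.leader.backoff.ms (default 200), which
the direct stream reads from the kafkaParams map supplied when the stream is
created. Below is a minimal sketch of such a map. The broker addresses, the
1000 ms value, and the KafkaParamsSketch class are placeholders assumed for
illustration, and the stream-creation call itself is left out:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the kafkaParams map handed to the direct stream.
class KafkaParamsSketch {

    static Map<String, String> kafkaParams() {
        Map<String, String> params = new HashMap<>();
        // Placeholder broker list; replace with the real brokers.
        params.put("metadata.broker.list", "broker1:9092,broker2:9092");
        // How long to wait after losing a partition leader before the fetch
        // fails and the task is retried (illustrative value).
        params.put("refresh.leader.backoff.ms", "1000");
        return params;
    }

    public static void main(String[] args) {
        System.out.println(kafkaParams());
    }
}
```

A longer backoff gives a leader election more time to complete between
attempts, but it is no substitute for finding out why the broker went away.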



spark streaming 1.3 with kafka connection timeout

2015-09-09 Thread Shushant Arora
Executors in Spark Streaming 1.3 fetch messages from Kafka in batches. What
happens when an executor takes a long time to complete a fetched batch?

Say, in:


directKafkaStream.foreachRDD(new Function, Void>() {

@Override
public Void call(JavaRDD v1) throws Exception {
v1.foreachPartition(new  VoidFunction>{
@Override
public void call(Iterator t) throws Exception {
//long running task
}});}});

Will this long-running task drop the executor's connection to the Kafka
brokers? If so, how do I handle that? I am getting a connection timeout in my
code.