Again, that looks like you lost a Kafka broker. Executors will retry
failed tasks automatically, up to spark.task.maxFailures.
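
One caveat worth illustrating: a retry can only happen if the failure actually reaches the scheduler. The sketch below is a toy, Spark-free simulation of that idea, not Spark's scheduler. All names in it (TaskRetrySketch, runWithRetries, flaky, swallowing) are made up for illustration. It compares a task that lets its exception propagate with one that catches and logs it, as the call method quoted further down does.

```java
import java.util.concurrent.Callable;

class TaskRetrySketch {

    /**
     * Toy stand-in for a scheduler: reruns the task until it returns normally
     * or has thrown maxFailures times. Returns the number of attempts used.
     */
    static int runWithRetries(Callable<Void> task, int maxFailures) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxFailures; attempt++) {
            try {
                task.call();
                return attempt;          // success: no further attempts
            } catch (Exception e) {
                last = e;                // failure: try again if attempts remain
            }
        }
        // Attempts exhausted: surface the last error to the caller.
        throw new IllegalStateException("task failed " + maxFailures + " times", last);
    }

    /** Hypothetical flaky fetch: throws on the first failuresBeforeSuccess calls. */
    static Callable<Void> flaky(int failuresBeforeSuccess, int[] calls) {
        return () -> {
            calls[0]++;
            if (calls[0] <= failuresBeforeSuccess) {
                throw new IllegalStateException("lost leader (simulated)");
            }
            return null;
        };
    }

    /** Wraps a task so that every exception is logged and swallowed. */
    static Callable<Void> swallowing(Callable<Void> task) {
        return () -> {
            try {
                task.call();
            } catch (Exception e) {
                System.err.println("Error while consuming messages: " + e.getMessage());
            }
            return null;
        };
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int attempts = runWithRetries(flaky(2, calls), 4);
        System.out.println("propagating: attempts=" + attempts + ", calls=" + calls[0]);

        int[] swallowedCalls = {0};
        int swallowedAttempts = runWithRetries(swallowing(flaky(2, swallowedCalls)), 4);
        System.out.println("swallowing: attempts=" + swallowedAttempts
                + ", calls=" + swallowedCalls[0]);
    }
}
```

In the propagating case the toy scheduler keeps going until the third attempt succeeds. In the swallowing case the first attempt looks like a success, so nothing is retried and the failed work is silently skipped.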

spark.streaming.kafka.maxRetries controls the number of times the driver
will retry when attempting to get offsets.

If your broker isn't back up, or the rebalance hasn't finished, after N
retries, you've got operational problems you need to deal with.
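
As a rough picture of what a bounded retry on the driver side amounts to, here is a minimal, Spark-free sketch. It assumes the setting counts retries after the first attempt, so N retries means at most N + 1 lookups. The names (OffsetLookupRetrySketch, withRetries, backoffMs) and the simulated lookup are hypothetical and are not Spark's API.

```java
import java.util.function.Supplier;

class OffsetLookupRetrySketch {

    /**
     * Calls lookup once, then up to maxRetries more times, sleeping backoffMs
     * between attempts. Rethrows the last failure once retries run out.
     */
    static <T> T withRetries(Supplier<T> lookup, int maxRetries, long backoffMs) {
        int retriesLeft = maxRetries;
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                if (retriesLeft <= 0) {
                    throw e;             // out of retries: let the failure surface
                }
                retriesLeft--;
                sleep(backoffMs);        // give the cluster time to elect a leader
            }
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical lookup: fails twice (leader missing), then returns an offset.
        Supplier<Long> lookup = () -> {
            if (++calls[0] <= 2) {
                throw new IllegalStateException("Couldn't find leader offsets (simulated)");
            }
            return 42L;
        };
        long offset = withRetries(lookup, 3, 10L);
        System.out.println("offset=" + offset + " after " + calls[0] + " calls");
    }
}
```

Raising the retry count or the backoff only buys time. If the lookup still fails once the retries are used up, the exception reaches the caller, which is the point where the problem is operational rather than a tuning issue.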



On Thu, Sep 10, 2015 at 9:58 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> My bad. I got that exception in the driver code of the same job, not in
> the executor.
>
> But it only reports a socket-closed exception.
>
> org.apache.spark.SparkException: ArrayBuffer(java.io.EOFException:
> Received -1 when reading from channel, socket has likely been closed.,
> org.apache.spark.SparkException: Couldn't find leader offsets for
> Set([topicname,51], [topicname,201], [topicname,54], [topicname,93],
> [topicname,297], [topicname,123], [topicname,147], [topicname,126],
> [topicname,189], [topicname,111], [topicname,159], [topicname,33],
> [topicname,36], [topicname,60], [topicname,216], [topicname,9],
> [topicname,12], [topicname,282], [topicname,39], [topicname,63],
> [topicname,231], [topicname,279], [topicname,18], [topicname,30],
> [topicname,276], [topicname,228], [topicname,84], [topicname,252],
> [topicname,48], [topicname,150], [topicname,132], [topicname,57],
> [topicname,72], [topicname,291], [topicname,234], [topicname,204],
> [topicname,186], [topicname,264], [topicname,288], [topicname,87],
> [topicname,78], [topicname,249], [topicname,102], [topicname,108],
> [topicname,237], [topicname,24], [topicname,96], [topicname,135],
> [topicname,198], [topicname,162], [topicname,42], [topicname,258],
> [topicname,0], [topicname,174], [topicname,207], [topicname,210],
> [topicname,246], [topicname,225], [topicname,270], [topicname,156],
> [topicname,183], [topicname,144], [topicname,117], [topicname,69],
> [topicname,45], [topicname,219], [topicname,177], [topicname,105],
> [topicname,171], [topicname,141], [topicname,285], [topicname,27],
> [topicname,168], [topicname,267], [topicname,213], [topicname,153],
> [topicname,138], [topicname,255], [topicname,222], [topicname,243],
> [topicname,261], [topicname,90], [topicname,114], [topicname,3],
> [topicname,81], [topicname,180], [topicname,21], [topicname,6],
> [topicname,195], [topicname,129], [topicname,192], [topicname,99],
> [topicname,294], [topicname,165], [topicname,240], [topicname,66],
> [topicname,75], [topicname,15], [topicname,273], [topicname,120]))
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
> at
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:243)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:241)
> at scala.util.Try$.apply(Try.scala:161)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:241)
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:177)
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:86)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 15/09/10 02:36:02 ERROR yarn.ApplicationMaster: User class threw
> exception: ArrayBuffer(java.io.EOFException: Received -1 when reading from
> channel, socket has likely been closed., org.apache.spark.SparkException:
> Couldn't find leader offsets for Set([topicname,51], [topicname,201],
> [topicname,54], [topicname,93], [topicname,297], [topicname,123],
> [topicname,147], [topicname,126], [topicname,189], [topicname,111],
> [topicname,159], [topicname,33], [topicname,36], [topicname,60],
> [topicname,216], [topicname,9], [topicname,12], [topicname,282],
> [topicname,39], [topicname,63], [topicname,231], [topicname,279],
> [topicname,18], [topicname,30], [topicname,276], [topicname,228],
> [topicname,84], [topicname,252], [topicname,48], [topicname,150],
> [topicname,132], [topicname,57], [topicname,72], [topicname,291],
> [topicname,234], [topicname,204], [topicname,186], [topicname,264],
> [topicname,288], [topicname,87], [topicname,78], [topicname,249],
> [topicname,102], [topicname,108], [topicname,237], [topicname,24],
> [topicname,96], [topicname,135], [topicname,198], [topicname,162],
> [topicname,42], [topicname,258], [topicname,0], [topicname,174],
> [topicname,207], [topicname,210], [topicname,246], [topicname,225],
> [topicname,270], [topicname,156], [topicname,183], [topicname,144],
> [topicname,117], [topicname,69], [topicname,45], [topicname,219],
> [topicname,177], [topicname,105], [topicname,171], [topicname,141],
> [topicname,285], [topicname,27], [topicname,168], [topicname,267],
> [topicname,213], [topicname,153], [topicname,138], [topicname,255],
> [topicname,222], [topicname,243], [topicname,261], [topicname,90],
> [topicname,114], [topicname,3], [topicname,81], [topicname,180],
> [topicname,21], [topicname,6], [topicname,195], [topicname,129],
> [topicname,192], [topicname,99], [topicname,294], [topicname,165],
> [topicname,240], [topicname,66], [topicname,75], [topicname,15],
> [topicname,273], [topicname,120]))
>
>
> On Thu, Sep 10, 2015 at 8:18 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> NotLeaderForPartitionException means you lost a Kafka broker or had a
>> rebalance... why did you say "I am getting Connection timeout in my
>> code"?
>>
>> You've asked questions about this exact same situation before; the answer
>> remains the same.
>>
>> On Thu, Sep 10, 2015 at 9:44 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Stack trace is
>>> 15/09/09 22:49:52 ERROR kafka.KafkaRDD: Lost leader for topic topicname
>>> partition 99,  sleeping for 200ms
>>> kafka.common.NotLeaderForPartitionException
>>>         at
>>> sun.reflect.GeneratedConstructorAccessor26.newInstance(Unknown Source)
>>>         at
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>         at
>>> java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>>         at java.lang.Class.newInstance(Class.java:374)
>>>         at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:142)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:151)
>>>         at
>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>         at
>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>         at
>>> scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
>>>         at
>>> com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:147)
>>>         at
>>> com.example.hadoop.spark.consumer.KafkaStreamTransformations.call(KafkaStreamTransformations.java:35)
>>>         at
>>> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
>>>         at
>>> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:198)
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
>>>         at
>>> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
>>>         at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>>>         at
>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
>>>         at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>         at java.lang.Thread.run(Thread.java:745)
>>> 15/09/09 22:49:52 ERROR consumer.KafkaStreamTransformations: Error while
>>> consuming messages from kafka
>>>
>>>
>>>
>>>
>>> Actual code is :
>>>
>>> In driver :
>>> final KafkaStreamTransformations transformations =
>>>     new KafkaStreamTransformations(...);
>>>
>>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>>     @Override
>>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>>         v1.foreachPartition(transformations);
>>>         return null;
>>>     }
>>> });
>>> --------------------------------------------
>>>
>>> In KafkaStreamTransformations :
>>>
>>>
>>> @Override
>>> public void call(Iterator<byte[][]> t) throws Exception {
>>>     try {
>>>         while (t.hasNext()) {
>>>             // ... long running task
>>>         }
>>>     } catch (Exception e) {
>>>         e.printStackTrace();
>>>         logger.error("Error while consuming messages from kafka");
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Sep 10, 2015 at 6:58 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> Post the actual stacktrace you're getting
>>>>
>>>> On Thu, Sep 10, 2015 at 12:21 AM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Executors in Spark Streaming 1.3 fetch messages from Kafka in batches.
>>>>> What happens when an executor takes a long time to process a fetched
>>>>> batch,
>>>>>
>>>>> say in
>>>>>
>>>>>
>>>>> directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
>>>>>     @Override
>>>>>     public Void call(JavaRDD<byte[][]> v1) throws Exception {
>>>>>         v1.foreachPartition(new VoidFunction<Iterator<byte[][]>>() {
>>>>>             @Override
>>>>>             public void call(Iterator<byte[][]> t) throws Exception {
>>>>>                 // long running task
>>>>>             }
>>>>>         });
>>>>>         return null;
>>>>>     }
>>>>> });
>>>>>
>>>>> Will this long-running task drop the executor's connection to the Kafka
>>>>> brokers? If so, how should I handle that? I am getting Connection timeout
>>>>> in my code.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
