That should be socket.timeout.ms in the map of Kafka config parameters you
pass to createDirectStream.  The lack of retry is probably due to the
differences between running Spark in local mode vs standalone / mesos /
yarn: plain local mode only allows one task attempt, whereas the cluster
managers retry failed tasks up to spark.task.maxFailures (default 4).
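
For reference, a minimal sketch of both knobs against the
spark-streaming-kafka 0.8 direct stream API your stack trace shows; the
broker list, topic name, and batch interval are placeholders, not
recommendations:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // local[4, 3] = 4 worker threads and up to 3 task failures per stage.
    // Plain local[4] runs with maxFailures = 1, which is why a single
    // timeout aborts the job with no retry; standalone / mesos / yarn
    // retry failed tasks up to spark.task.maxFailures instead.
    val conf = new SparkConf()
      .setMaster("local[4, 3]")
      .setAppName("kafka-direct-timeout")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "broker1:9092,broker2:9092", // placeholder
      // socket timeout used by the underlying SimpleConsumer; raise it
      // above the 30000 ms default if your brokers respond slowly
      "socket.timeout.ms" -> "60000"
    )

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic")) // placeholder topic

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()

The local[4, 3] master string is just the local-mode equivalent of task
retries; it won't fix a flaky broker connection, but it keeps one transient
timeout from failing the whole job.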



On Mon, Jan 25, 2016 at 1:19 PM, Supreeth <supreeth....@gmail.com> wrote:

> We are running a Kafka Consumer in local mode using Spark Streaming,
> KafkaUtils.createDirectStream.
>
> The job runs as expected; however, once in a very long while, I see the
> following exception.
>
> Wanted to check if others have faced a similar issue, and which timeout
> parameters are the right ones to change to avoid it.
>
> Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
> most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
> java.net.ConnectException: Connection timed out
>         at sun.nio.ch.Net.connect0(Native Method)
>         at sun.nio.ch.Net.connect(Net.java:457)
>         at sun.nio.ch.Net.connect(Net.java:449)
>         at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
>         at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>         at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>         at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
>         at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> When this failure happens, the task is not retried either.
> Any help appreciated.
>
> -S
>
> ps: My last attempt to post did not succeed; apologies if this happens to
> be a re-post of my earlier message from about 40 minutes ago.
>
>
>
