The parameter you want is socket.timeout.ms in the map of Kafka config parameters. The lack of retry is probably due to the differences between running Spark in local mode vs. standalone / Mesos / YARN — in local mode each task gets only a single attempt by default, unless you use a master URL of the form local[N, maxFailures].
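For reference, a minimal sketch of how this could look (assuming the Spark 1.x createDirectStream API from the stack trace; the broker address, topic name, and the specific timeout/backoff values below are illustrative, not recommendations):

```scala
// Hedged sketch: Kafka 0.8-style consumer settings passed through the
// kafkaParams map that KafkaUtils.createDirectStream hands to SimpleConsumer.
val kafkaParams = Map[String, String](
  "metadata.broker.list"      -> "broker1:9092", // placeholder broker address
  "socket.timeout.ms"         -> "60000",        // example: raise the socket timeout
  "refresh.leader.backoff.ms" -> "2000"          // example: back off before re-fetching a leader
)

// To get task retries in local mode, encode max task failures in the master
// URL, e.g. local[4, 3] = 4 worker threads, tasks retried up to 3 times:
// val conf   = new SparkConf().setMaster("local[4, 3]").setAppName("kafka-consumer")
// val ssc    = new StreamingContext(conf, Seconds(5))
// val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
//   ssc, kafkaParams, Set("my-topic"))
```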
On Mon, Jan 25, 2016 at 1:19 PM, Supreeth <supreeth....@gmail.com> wrote:
> We are running a Kafka consumer in local mode using Spark Streaming
> (KafkaUtils.createDirectStream).
>
> The job runs as expected; however, once in a very long while, I see the
> following exception.
>
> Wanted to check if others have faced a similar issue, and which timeout
> parameters are the right ones to change to avoid it.
>
> Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
> most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
> java.net.ConnectException: Connection timed out
>     at sun.nio.ch.Net.connect0(Native Method)
>     at sun.nio.ch.Net.connect(Net.java:457)
>     at sun.nio.ch.Net.connect(Net.java:449)
>     at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
>     at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>     at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>     at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> When this failure happens, the executor is not retried either.
> Any help appreciated.
>
> -S
>
> ps: My last attempt to post did not succeed; apologies if this happens to
> be a re-post of my earlier post from 40 mins ago.
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-kafka-consumer-in-local-mode-error-connection-timed-out-tp26063.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org