You could also try this consumer: http://spark-packages.org/package/dibbhatt/kafka-spark-consumer in your environment. It has been running fine for topics with a large number of Kafka partitions (> 200), like yours, without any issues. There are no connection issues, since this consumer reuses its Kafka connection, and it can also recover from failures (network loss, a Kafka leader going down, ZK going down, etc.).
Regards,
Dibyendu

On Sat, Aug 22, 2015 at 7:35 PM, Shushant Arora <shushantaror...@gmail.com> wrote:

> On trying the consumer without external connections or with low number of
> external conections its working fine -
>
> so doubt is how socket got closed -
>
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
>
> On Sat, Aug 22, 2015 at 7:24 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> Can you try some other consumer and see if the issue still exists?
>> On Aug 22, 2015 12:47 AM, "Shushant Arora" <shushantaror...@gmail.com> wrote:
>>
>>> Exception comes when client has so many connections to some another
>>> external server also.
>>> So I think Exception is coming because of client side issue only- server
>>> side there is no issue.
>>>
>>> Want to understand is executor(simple consumer) not making new
>>> connection to kafka broker at start of each task ? Or is it created once
>>> only and that is getting closed somehow ?
>>>
>>> On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> it comes at start of each tasks when there is new data inserted in
>>>> kafka.( data inserted is very few)
>>>> kafka topic has 300 partitions - data inserted is ~10 MB.
>>>>
>>>> Tasks gets failed and it retries which succeed and after certain no of
>>>> fail tasks it kills the job.
>>>>
>>>> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>
>>>>> That looks like you are choking your kafka machine. Do a top on the
>>>>> kafka machines and see the workload, it may happen that you are spending
>>>>> too much time on disk io etc.
>>>>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>>>
>>>>>> Sounds like that's happening consistently, not an occasional network
>>>>>> problem?
>>>>>>
>>>>>> Look at the Kafka broker logs
>>>>>>
>>>>>> Make sure you've configured the correct kafka broker hosts / ports
>>>>>> (note that direct stream does not use zookeeper host / port).
>>>>>>
>>>>>> Make sure that host / port is reachable from your driver and worker
>>>>>> nodes, ie telnet or netcat to it. It looks like your driver can reach it
>>>>>> (since there's partition info in the logs), but that doesn't mean the
>>>>>> worker can.
>>>>>>
>>>>>> Use lsof / netstat to see what's going on with those ports while the
>>>>>> job is running, or tcpdump if you need to.
>>>>>>
>>>>>> If you can't figure out what's going on from a networking point of
>>>>>> view, post a minimal reproducible code sample that demonstrates the
>>>>>> issue, so it can be tested in a different environment.
>>>>>>
>>>>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>>
>>>>>>> Getting below error in spark streaming 1.3 while consuming from kafka
>>>>>>> using directkafka stream. Few of tasks are getting failed in each run.
>>>>>>>
>>>>>>> What is the reason /solution of this error?
>>>>>>>
>>>>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in stage 130.0 (TID 16332)
>>>>>>> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>>>>>>>     at kafka.utils.Utils$.read(Utils.scala:376)
>>>>>>>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>>>>     at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>>>>     at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>>>>     at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>>>>     at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>>>>     at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>>     at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>>>>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>>>>     at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>>>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>>>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>>>>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>>>>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>>>>     at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>>>>     at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 16348
>>>>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 130.0 (TID 16348)
>>>>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>>>>
>>>>>>> Thanks
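
As a rough companion to the suggestion in the thread to confirm that each Kafka broker host / port is reachable from the worker nodes (the telnet / netcat check), here is a minimal Java sketch that attempts a plain TCP connect to every entry of a comma-separated broker list. The class name BrokerReachability, the placeholder hosts broker1:9092 and broker2:9092, and the 3-second timeout are assumptions for illustration and do not come from the thread. A successful connect only shows that the port accepts connections; it does not prove that fetch requests will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spark or Kafka): checks TCP reachability of brokers.
public class BrokerReachability {

    /** Splits a comma-separated "host:port,host:port" list into unresolved socket addresses. */
    public static List<InetSocketAddress> parseBrokerList(String brokerList) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : brokerList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    /** Returns true if a TCP connection to the broker can be opened within timeoutMs. */
    public static boolean isReachable(InetSocketAddress broker, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so DNS failures are reported as "unreachable".
            socket.connect(new InetSocketAddress(broker.getHostString(), broker.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder broker list; pass the real one as the first argument.
        String brokerList = args.length > 0 ? args[0] : "broker1:9092,broker2:9092";
        for (InetSocketAddress broker : parseBrokerList(brokerList)) {
            System.out.println(broker.getHostString() + ":" + broker.getPort()
                    + " reachable=" + isReachable(broker, 3000));
        }
    }
}
```

Running it on a worker node rather than only on the driver matters here, because the thread notes that the driver reaching the brokers does not mean the workers can.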