Exception comes when client has so many connections to some another
external server also.
So I think Exception is coming because of client side issue only- server
side there is no issue.


Want to understand is executor(simple consumer) not making new connection
to kafka broker at start of each task ? Or is it created once only and that
is getting closed somehow ?

On Sat, Aug 22, 2015 at 9:41 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> it comes at start of each tasks when there is new data inserted in kafka.(
> data inserted is very few)
> kafka topic has 300 partitions - data inserted is ~10 MB.
>
> Tasks gets failed and it retries which succeed and after certain no of
> fail tasks it kills the job.
>
>
>
>
> On Sat, Aug 22, 2015 at 2:08 AM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> That looks like you are choking your kafka machine. Do a top on the kafka
>> machines and see the workload, it may happen that you are spending too much
>> time on disk io etc.
>> On Aug 21, 2015 7:32 AM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Sounds like that's happening consistently, not an occasional network
>>> problem?
>>>
>>> Look at the Kafka broker logs
>>>
>>> Make sure you've configured the correct kafka broker hosts / ports (note
>>> that direct stream does not use zookeeper host / port).
>>>
>>> Make sure that host / port is reachable from your driver and worker
>>> nodes, ie telnet or netcat to it.  It looks like your driver can reach it
>>> (since there's partition info in the logs), but that doesn't mean the
>>> worker can.
>>>
>>> Use lsof / netstat to see what's going on with those ports while the job
>>> is running, or tcpdump if you need to.
>>>
>>> If you can't figure out what's going on from a networking point of view,
>>> post a minimal reproducible code sample that demonstrates the issue, so it
>>> can be tested in a different environment.
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Aug 21, 2015 at 4:06 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>>
>>>> Getting below error in spark streaming 1.3 while consuming from kafka 
>>>> using directkafka stream. Few of tasks are getting failed in each run.
>>>>
>>>>
>>>> What is the reason /solution of this error?
>>>>
>>>>
>>>> 15/08/21 08:54:54 ERROR executor.Executor: Exception in task 262.0 in 
>>>> stage 130.0 (TID 16332)
>>>> java.io.EOFException: Received -1 when reading from channel, socket has 
>>>> likely been closed.
>>>>    at kafka.utils.Utils$.read(Utils.scala:376)
>>>>    at 
>>>> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>    at 
>>>> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>>>>    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>    at 
>>>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>>>>    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>>>>    at 
>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
>>>>    at 
>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:162)
>>>>    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>    at 
>>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:210)
>>>>    at 
>>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
>>>>    at 
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>    at 
>>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>    at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>    at 
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>    at java.lang.Thread.run(Thread.java:745)
>>>> 15/08/21 08:54:54 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
>>>> task 16348
>>>> 15/08/21 08:54:54 INFO executor.Executor: Running task 260.1 in stage 
>>>> 130.0 (TID 16348)
>>>> 15/08/21 08:54:54 INFO kafka.KafkaRDD: Computing topic 
>>>> test_hbrealtimeevents, partition 75 offsets 4701 -> 4718
>>>> 15/08/21 08:54:54 INFO utils.VerifiableProperties: Verifying properties
>>>>
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>

Reply via email to