That sounds like a networking issue to me. Some things to try:
- make sure every executor node can talk to every kafka broker on the
relevant ports (see the sketch after this list)
- look at firewalls / network config. Even if you can make the initial
connection, something may be happening after a while (we've seen ...
"interesting"... issues with aws networking, for instance)
- look at the kafka error logs
- use lsof or even tcpdump to see what's happening with the relevant
ports when this occurs
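
For the first point, here's a rough spark-shell sketch (the broker
hosts/ports and the task count are placeholders, adjust them for your
cluster) that attempts a plain TCP connect to each broker from the
executors, so you can see whether a particular node can't reach a
particular broker:

import java.net.{InetAddress, InetSocketAddress, Socket}

// Placeholder broker list -- substitute your own hosts/ports.
val brokers = Seq(("broker1.example.com", 9092), ("broker2.example.com", 9092))

// Spread enough tasks across the cluster to land on every executor,
// then attempt a TCP connect to each broker with a short timeout.
val results = sc.parallelize(1 to 200, 200).mapPartitions { _ =>
  val executorHost = InetAddress.getLocalHost.getHostName
  brokers.iterator.map { case (host, port) =>
    val reachable =
      try {
        val socket = new Socket()
        socket.connect(new InetSocketAddress(host, port), 5000) // 5s timeout
        socket.close()
        true
      } catch {
        case _: Exception => false
      }
    (executorHost, s"$host:$port", reachable)
  }
}.distinct().collect()

results.foreach(println)

Any (executorHost, broker, false) row tells you which node/broker pair
to dig into with lsof or tcpdump.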



On Thu, Oct 22, 2015 at 9:00 AM, Conor Fennell <conorapa...@gmail.com>
wrote:

> Hi,
>
> Firstly, I want to say a big thanks to Cody for contributing the
> kafka direct stream.
>
> I have been using the receiver based approach for months but the
> direct stream is a much better solution for my use case.
>
> The job in question is now ported over to the direct stream, doing
> idempotent outputs to Cassandra and outputting to kafka.
> I am also saving the offsets to Cassandra.
>
> But unfortunately I am sporadically getting the error below.
> It recovers and continues, but it causes a large spike in the
> processing delay, and it can happen every 3 or 4 batches.
> I still have other receiver jobs running and they never throw these
> exceptions.
>
> I would be very appreciative of any direction, and I can happily
> provide more detail.
>
> Thanks,
> Conor
>
> 15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not
> found, computing it
> 15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events,
> partition 0 offsets 13630747 -> 13633001
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id
> is overridden to
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property
> zookeeper.connect is overridden to
> 15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to
> socket error: java.nio.channels.ClosedChannelException
> 15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in
> stage 654.0 (TID 5242)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got
> assigned task 5243
> 15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage
> 654.0 (TID 5243)
>
>
