Hi,

I have a Spark Streaming job (with Kafka). After running for several days, it
failed with the following error:
ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)

Any idea what could be wrong? Could it be a Spark Streaming buffer overflow
issue?



Regards






*** from the log ***

16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is overridden to

16/03/17 12:13:51 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)

16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException

16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time 1458188031800 ms

org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

16/03/17 12:13:52 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook

16/03/17 12:13:52 INFO JobGenerator: Stopping JobGenerator immediately

16/03/17 12:13:52 INFO RecurringTimer: Stopped timer for JobGenerator after time 1458188032000

16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException

16/03/17 12:13:52 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)

16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException

16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time 1458188031900 ms

org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([Allergy,0]))
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException

16/03/17 12:13:52 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)

16/03/17 12:13:52 INFO JobGenerator: Stopped JobGenerator

16/03/17 12:13:52 INFO JobScheduler: Stopped JobScheduler

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/streaming,null}

16/03/17 12:13:52 INFO StreamingContext: StreamingContext stopped successfully

16/03/17 12:13:52 INFO SparkContext: Invoking stop() from shutdown hook

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}

16/03/17 12:13:52 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}

16/03/17 12:13:52 INFO SparkUI: Stopped Spark web UI at http://192.168.10.31:4042

16/03/17 12:13:52 INFO DAGScheduler: Stopping DAGScheduler

16/03/17 12:13:52 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/03/17 12:13:52 INFO MemoryStore: MemoryStore cleared

16/03/17 12:13:52 INFO BlockManager: BlockManager stopped

16/03/17 12:13:52 INFO BlockManagerMaster: BlockManagerMaster stopped

16/03/17 12:13:52 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/03/17 12:13:52 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/03/17 12:13:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

16/03/17 12:13:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.

16/03/17 12:13:52 INFO SparkContext: Successfully stopped SparkContext

16/03/17 12:13:52 INFO ShutdownHookManager: Shutdown hook called

16/03/17 12:13:52 INFO ShutdownHookManager: Deleting directory /tmp/spark-c6376f77-9f5a-4f76-bbaf-6fa8eb37870a
