Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException
Hi,

Can you check Kafka topic replication? And leader information?

Regards,
Surendra M

-- Surendra Manchikanti

On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss wrote:
> Hi,
>
> I have a Spark Streaming (with Kafka) job; after running for several days, it
> failed with the following errors:
>
> ERROR DirectKafkaInputDStream:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> Any idea what could be wrong? Could it be a Spark Streaming buffer overflow
> issue?
>
> Regards
>
> *** from the log ***
>
> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is
> overridden to
>
> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error:
> java.nio.channels.ClosedChannelException
>
> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time
> 1458188031800 ms
>
> org.apache.spark.SparkException:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>   at scala.Option.orElse(Option.scala:257)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>   at scala.util.Try$.apply(Try.scala:161)
>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Exception in thread "main" org.apache.spark.SparkException:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>   ... (same stack trace as above; truncated in the original message)
Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException
Sounds like you're using one of the KafkaUtils.createDirectStream overloads that needs to do some broker communication in order to even construct the stream, because you aren't providing topic partitions?

Just wrap your construction attempt in a try / catch and retry in whatever way makes sense for you. I'd also look into why it's failing to communicate with brokers, though.

On Fri, Mar 18, 2016 at 1:03 PM, Bryan Jeffrey wrote:
> Cody et al.,
>
> I am seeing a similar error. I've increased the number of retries. Once
> I've got a job up and running, I'm seeing it retry correctly. However, I am
> having trouble getting the job started - the number of retries does not seem
> to help with startup behavior.
>
> Thoughts?
>
> Regards,
> Bryan Jeffrey
>
> On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger wrote:
>> That's a networking error when the driver is attempting to contact
>> leaders to get the latest available offsets.
>>
>> ... (rest of quoted thread trimmed; the full messages appear below)
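Cody's try / catch suggestion above can be sketched as a small generic retry helper in plain Scala. This is illustrative only: `retry`, its parameters, and `StreamConstructionRetry` are not part of any Spark or Kafka API, and the placeholder block stands in for the real `KafkaUtils.createDirectStream` call.

```scala
import scala.util.{Failure, Success, Try}

object StreamConstructionRetry {
  // Try `op` up to `maxAttempts` times, sleeping `delayMs` between failures;
  // rethrow the last exception if every attempt fails.
  @annotation.tailrec
  def retry[T](maxAttempts: Int, delayMs: Long)(op: => T): T =
    Try(op) match {
      case Success(result) => result
      case Failure(_) if maxAttempts > 1 =>
        Thread.sleep(delayMs)
        retry(maxAttempts - 1, delayMs)(op)
      case Failure(e) => throw e
    }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Placeholder for KafkaUtils.createDirectStream(...): fails twice with the
    // error from this thread, then succeeds on the third attempt.
    val stream = retry(maxAttempts = 5, delayMs = 100L) {
      attempts += 1
      if (attempts < 3) throw new java.nio.channels.ClosedChannelException()
      "stream created"
    }
    println(stream)
  }
}
```

Retrying the construction call itself covers the startup case Bryan describes, where raising spark.streaming.kafka.maxRetries only helps after the stream exists.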
Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException
Cody et al.,

I am seeing a similar error. I've increased the number of retries. Once I've got a job up and running, I'm seeing it retry correctly. However, I am having trouble getting the job started - the number of retries does not seem to help with startup behavior.

Thoughts?

Regards,
Bryan Jeffrey

On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger wrote:
> That's a networking error when the driver is attempting to contact
> leaders to get the latest available offsets.
>
> If it's a transient error, you can look at increasing the value of
> spark.streaming.kafka.maxRetries; see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> If it's not a transient error, you need to look at your brokers + your
> network environment.
>
> ... (rest of quoted thread trimmed; the full messages appear below)
Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException
That's a networking error when the driver is attempting to contact leaders to get the latest available offsets.

If it's a transient error, you can look at increasing the value of spark.streaming.kafka.maxRetries; see

http://spark.apache.org/docs/latest/configuration.html

If it's not a transient error, you need to look at your brokers + your network environment.

On Thu, Mar 17, 2016 at 10:59 PM, Surendra Manchikanti wrote:
> Hi,
>
> Can you check Kafka topic replication? And leader information?
>
> Regards,
> Surendra M
>
> On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss wrote:
>> Hi,
>>
>> I have a Spark Streaming (with Kafka) job; after running for several days, it
>> failed with the following errors:
>> ERROR DirectKafkaInputDStream:
>> ArrayBuffer(java.nio.channels.ClosedChannelException)
>>
>> Any idea what could be wrong? Could it be a Spark Streaming buffer
>> overflow issue?
>>
>> ... (quoted log and stack trace trimmed; the full trace appears in the
>> first message of the thread)
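For reference, a minimal sketch of raising the retry setting Cody mentions, assuming Spark 1.x with the direct Kafka stream integration; the app name and the value 5 are arbitrary examples, and this fragment needs spark-core on the classpath:

```scala
import org.apache.spark.SparkConf

// spark.streaming.kafka.maxRetries controls how many times the driver retries
// fetching the latest offsets from the Kafka leaders before failing batch
// generation (default: 1).
val conf = new SparkConf()
  .setAppName("my-streaming-job")                // hypothetical app name
  .set("spark.streaming.kafka.maxRetries", "5")  // example value
```

The same setting can also be passed at submit time with `--conf spark.streaming.kafka.maxRetries=5`.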