Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-20 Thread Surendra Manchikanti
Hi,

Can you check the Kafka topic replication factor and leader information?
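For example, with the kafka-topics tool that ships with Kafka (the host
zkhost:2181 and topic name mytopic below are placeholders; substitute your
own):

  kafka-topics.sh --describe --zookeeper zkhost:2181 --topic mytopic

The output lists the replication factor plus the leader and ISR for each
partition, so you can see whether any partition is currently without a
leader.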

Regards,
Surendra M



-- Surendra Manchikanti

On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss  wrote:

> Hi,
>
> I have a Spark Streaming (with Kafka) job; after running for several
> days, it failed with the following errors:
> ERROR DirectKafkaInputDStream:
> ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> Any idea what could be wrong? Could it be a Spark Streaming buffer
> overflow issue?
>
>
>
> Regards
>
>
>
>
>
>
> *** from the log ***
>
> 16/03/18 09:15:18 INFO VerifiableProperties: Property zookeeper.connect is overridden to
>
> 16/03/17 12:13:51 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException)
>
> 16/03/17 12:13:52 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
>
> 16/03/17 12:13:52 ERROR JobScheduler: Error generating jobs for time 1458188031800 ms
>
> org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>   at scala.Option.orElse(Option.scala:257)
>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
>   at scala.util.Try$.apply(Try.scala:161)
>   at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
>   at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>   at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
>   at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>   at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>   [remaining frames truncated in the archive]

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Cody Koeninger
It sounds like you're using one of the KafkaUtils.createDirectStream
overloads that has to talk to the brokers just to construct the stream,
because you aren't providing explicit topic partitions and starting
offsets. Just wrap your construction attempt in a try / catch and retry
in whatever way makes sense for you.
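If it helps, here's a minimal sketch of that pattern against the Spark 1.x
direct stream API (the helper name, attempt count, and five-second backoff
are placeholders I made up, not anything from your job):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkException
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Retry stream construction a few times before giving up. This overload
// of createDirectStream contacts the brokers at construction time to
// look up the latest offsets, so a transient ClosedChannelException can
// surface right here rather than during a batch.
def createStreamWithRetry(
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String],
    attemptsLeft: Int = 5): InputDStream[(String, String)] = {
  try {
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
  } catch {
    case _: SparkException if attemptsLeft > 1 =>
      Thread.sleep(5000) // arbitrary backoff before the next attempt
      createStreamWithRetry(ssc, kafkaParams, topics, attemptsLeft - 1)
    // once attemptsLeft is exhausted the exception propagates as before
  }
}

Note this is only needed at construction; once the stream exists, the
per-batch offset lookups are retried according to
spark.streaming.kafka.maxRetries.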

I'd also look into why it's failing to communicate with the brokers, though.

On Fri, Mar 18, 2016 at 1:03 PM, Bryan Jeffrey  wrote:
> Cody et al.,
>
> I am seeing a similar error. I've increased the number of retries, and once
> I've got a job up and running I see it retry correctly. However, I am
> having trouble getting the job started; the number of retries does not seem
> to help with startup behavior.
>
> Thoughts?
>
> Regards,
>
> Bryan Jeffrey
>
> On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger  wrote:
>>
>> That's a networking error when the driver is attempting to contact
>> leaders to get the latest available offsets.
>>
>> If it's a transient error, you can look at increasing the value of
>> spark.streaming.kafka.maxRetries, see
>>
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> If it's not a transient error, you need to look at your brokers + your
>> network environment.
>>
>> On Thu, Mar 17, 2016 at 10:59 PM, Surendra Manchikanti  wrote:
>> > Hi,
>> >
>> > Can you check the Kafka topic replication factor and leader information?
>> >
>> > Regards,
>> > Surendra M
>> >
>> >
>> >
>> > -- Surendra Manchikanti
>> >
>> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss  wrote:
>> >>
>> >> Hi,
>> >>
>> >> I have a Spark Streaming (with Kafka) job; after running for several
>> >> days, it failed with the following errors:
>> >> ERROR DirectKafkaInputDStream:
>> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
>> >>
>> >> Any idea what could be wrong? Could it be a Spark Streaming buffer
>> >> overflow issue?
>> >>
>> >>
>> >>
>> >> Regards
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> *** from the log ***
>> >>
>> >> [log and stack trace snipped; identical to the trace quoted in the
>> >> first message of this thread]

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Bryan Jeffrey
Cody et al.,

I am seeing a similar error. I've increased the number of retries, and once
I've got a job up and running I see it retry correctly. However, I am
having trouble getting the job started; the number of retries does not seem
to help with startup behavior.

Thoughts?

Regards,

Bryan Jeffrey

On Fri, Mar 18, 2016 at 10:44 AM, Cody Koeninger  wrote:

> That's a networking error when the driver is attempting to contact
> leaders to get the latest available offsets.
>
> If it's a transient error, you can look at increasing the value of
> spark.streaming.kafka.maxRetries, see
>
> http://spark.apache.org/docs/latest/configuration.html
>
> If it's not a transient error, you need to look at your brokers + your
> network environment.
>
> On Thu, Mar 17, 2016 at 10:59 PM, Surendra Manchikanti  wrote:
> > Hi,
> >
> > Can you check the Kafka topic replication factor and leader information?
> >
> > Regards,
> > Surendra M
> >
> >
> >
> > -- Surendra Manchikanti
> >
> > On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss  wrote:
> >>
> >> Hi,
> >>
> >> I have a Spark Streaming (with Kafka) job; after running for several
> >> days, it failed with the following errors:
> >> ERROR DirectKafkaInputDStream:
> >> ArrayBuffer(java.nio.channels.ClosedChannelException)
> >>
> >> Any idea what could be wrong? Could it be a Spark Streaming buffer
> >> overflow issue?
> >>
> >>
> >>
> >> Regards
> >>
> >>
> >>
> >>
> >>
> >>
> >> *** from the log ***
> >>
> >> [log and stack trace snipped; identical to the trace quoted in the
> >> first message of this thread]

Re: ERROR ArrayBuffer(java.nio.channels.ClosedChannelException

2016-03-19 Thread Cody Koeninger
That's a networking error when the driver is attempting to contact
leaders to get the latest available offsets.

If it's a transient error, you can look at increasing the value of
spark.streaming.kafka.maxRetries, see

http://spark.apache.org/docs/latest/configuration.html
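For example, a sketch of setting it when building the context (the app name
and the value 10 are arbitrary placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("my-streaming-job") // hypothetical app name
  // Let the driver retry its offset lookups against the Kafka leaders
  // more than the default single attempt before failing the batch.
  .set("spark.streaming.kafka.maxRetries", "10")

val ssc = new StreamingContext(conf, Seconds(5))

The same property can be passed with --conf on spark-submit.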

If it's not a transient error, you need to look at your brokers + your
network environment.

On Thu, Mar 17, 2016 at 10:59 PM, Surendra Manchikanti  wrote:
> Hi,
>
> Can you check the Kafka topic replication factor and leader information?
>
> Regards,
> Surendra M
>
>
>
> -- Surendra Manchikanti
>
> On Thu, Mar 17, 2016 at 7:28 PM, Ascot Moss  wrote:
>>
>> Hi,
>>
>> I have a Spark Streaming (with Kafka) job; after running for several
>> days, it failed with the following errors:
>> ERROR DirectKafkaInputDStream:
>> ArrayBuffer(java.nio.channels.ClosedChannelException)
>>
>> Any idea what could be wrong? Could it be a Spark Streaming buffer
>> overflow issue?
>>
>>
>>
>> Regards
>>
>>
>>
>>
>>
>>
>> *** from the log ***
>>
>> [log and stack trace snipped; identical to the trace quoted in the first
>> message of this thread]