[ https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitry Goldenberg updated SPARK-27529:
--------------------------------------
    Description: 
We have a Spark Streaming consumer that at some point started failing 
consistently on every restart with the error below.

Some details:
 * Spark version is 1.5.0.
 * Kafka version is 0.8.2.1 (2.10-0.8.2.1).
 * The topic is configured with: retention.ms=1471228928, 
max.message.bytes=100000000.
 * The consumer runs with auto.offset.reset=smallest.
 * No checkpointing is currently enabled.
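For context, a consumer with the settings above would typically hand a Kafka parameter map like the following to the direct-stream API. This is a minimal sketch, not code from the ticket: the broker address is a hypothetical placeholder, and only auto.offset.reset=smallest comes from the bullet list.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the Kafka parameter map implied by the settings listed above.
// "broker1:9092" is a placeholder broker address, not a value from this report.
public class ConsumerParams {
    public static Map<String, String> kafkaParams() {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", "broker1:9092"); // placeholder
        params.put("auto.offset.reset", "smallest");        // start from the earliest available offset
        return params;
    }

    public static void main(String[] args) {
        System.out.println(kafkaParams());
    }
}
```

In spark-streaming-kafka 1.5.0, a map like this would be passed to KafkaUtils.createDirectStream together with the set of topics to consume.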

I don't see anything in the Spark or Kafka documentation that explains why 
this happens. The closest explanation I found from searching around is:
{noformat}
https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

Finally, I’ll repeat that any semantics beyond at-most-once require that you 
have sufficient log retention in Kafka. If you’re seeing things like 
OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka 
storage, not because something’s wrong with Spark or Kafka.{noformat}
I also looked at SPARK-12693 and SPARK-11693, but I don't see how the cause 
suggested there applies here:
{noformat}
You've under-provisioned Kafka storage and / or Spark compute capacity.
The result is that data is being deleted before it has been processed.{noformat}
All we're trying to do is start the consumer and consume the topic from the 
earliest available offset. Why would we not be able to do that? How can the 
offsets be out of range if we're explicitly asking to read from the earliest 
available one?
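For what it's worth, here is one way the exception can occur even when asking for the earliest offset: the direct stream fixes each batch's offset range when the batch is planned, and if Kafka's earliest retained offset advances past the planned starting offset (because retention deleted old segments) before the fetch runs, the fetch is out of range. A toy model with hypothetical numbers, not taken from this ticket:

```java
// Toy model of Kafka's offset-range check. A fetch succeeds only if the
// requested offset still lies between the log-start offset (which advances
// as retention deletes old segments) and the log-end offset.
public class OffsetRangeModel {
    static boolean inRange(long requested, long logStart, long logEnd) {
        return requested >= logStart && requested < logEnd;
    }

    public static void main(String[] args) {
        long plannedFrom = 100;   // "earliest" offset at the time the batch was planned
        long logStartNow = 250;   // earliest retained offset after segment deletion
        long logEndNow = 10_000;
        // The planned offset now falls below the log start, so the fetch fails
        // with OffsetOutOfRangeException even though we asked for "earliest".
        System.out.println(inRange(plannedFrom, logStartNow, logEndNow)); // prints false
    }
}
```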

Since we set retention.ms intending a retention of 1 year, and we created the 
topic just a few weeks ago, I would not expect Kafka to be deleting anything 
as we consume. (Note, though, that the configured value retention.ms=1471228928 
works out to roughly 17 days, not a year.)
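It may be worth double-checking that assumption: converting the configured retention.ms from the topic settings above into days (plain arithmetic, no Kafka involved) gives roughly 17 days rather than a year, which would make retention-based deletion on a weeks-old topic plausible.

```java
// Convert the topic's configured retention.ms (from the settings listed
// above) into days, to check the "1 year" assumption.
public class RetentionCheck {
    public static void main(String[] args) {
        long retentionMs = 1_471_228_928L;        // configured retention.ms
        double days = retentionMs / 86_400_000.0; // milliseconds per day
        System.out.printf("retention.ms = %d ms ~ %.2f days%n", retentionMs, days);
    }
}
```

A 1-year retention would instead be retention.ms=31536000000 (365 * 86,400,000).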

The behavior we're seeing on the consumer side does not seem intuitive or 
consistent to me. If it is the expected behavior, I'd like to know how to work 
around it.

Stack traces:
{noformat}

2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler.TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job
2019-04-19 11:35:17,160 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1555682554000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 147.0 failed 4 times, most recent failure: Lost task 10.3 in stage 147.0 (TID 2368, 10.150.0.58): kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69)
at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.Option.foreach(Option.scala:236) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.util.Try$.apply(Try.scala:161) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
Caused by: kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_201]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_201]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_201]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_201]
at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_201]
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
... 3 more{noformat}
 

 



> Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27529
>                 URL: https://issues.apache.org/jira/browse/SPARK-27529
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.5.0
>            Reporter: Dmitry Goldenberg
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
