[ https://issues.apache.org/jira/browse/SPARK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826072#comment-16826072 ]
Dmitry Goldenberg commented on SPARK-27529:
-------------------------------------------
Hi Hyukjin, I could check, although this behavior seems to have been present for a while and in all likelihood has not changed. Checking against a higher version will take some effort because a lot has changed in both Kafka and Spark. I was hoping to get a more precise answer from someone who wrote the Spark Streaming code or is familiar with it at a detailed level. Is this a bug or expected behavior? If it's expected behavior, what's causing it and how could we work around it?

> Spark Streaming consumer dies with kafka.common.OffsetOutOfRangeException
> -------------------------------------------------------------------------
>
>                 Key: SPARK-27529
>                 URL: https://issues.apache.org/jira/browse/SPARK-27529
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 1.5.0
>            Reporter: Dmitry Goldenberg
>            Priority: Major
>
> We have a Spark Streaming consumer which at a certain point started consistently failing upon a restart with the below error.
> Some details:
> * Spark version is 1.5.0.
> * Kafka version is 0.8.2.1 (2.10-0.8.2.1).
> * The topic is configured with: retention.ms=1471228928, max.message.bytes=100000000.
> * The consumer runs with auto.offset.reset=smallest.
> * No checkpointing is currently enabled.
> I don't see anything in the Spark or Kafka documentation that explains why this is happening. From googling around:
> {noformat}
> https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
> Finally, I’ll repeat that any semantics beyond at-most-once require that you have sufficient log retention in Kafka. If you’re seeing things like OffsetOutOfRangeException, it’s probably because you underprovisioned Kafka storage, not because something’s wrong with Spark or Kafka.
> {noformat}
> Also looking at SPARK-12693 and SPARK-11693, I don't understand the possible causes.
> {noformat}
> You've under-provisioned Kafka storage and / or Spark compute capacity.
> The result is that data is being deleted before it has been processed.
> {noformat}
> All we're trying to do is start the consumer and consume from the topic from the earliest available offset. Why would we not be able to do that? How can the offsets be out of range if we're saying, just read from the earliest available?
> Since we have retention.ms set to 1 year and we created the topic just a few weeks ago, I'd not expect any deletion being done by Kafka as we're consuming.
> I'd like to understand the actual cause of this error. Any recommendations on a workaround would be appreciated.
> Stack traces:
> {noformat}
> 2019-04-19 11:35:17,147 ERROR org.apache.spark.scheduler.TaskSetManager: Task 10 in stage 147.0 failed 4 times; aborting job
> 2019-04-19 11:35:17,160 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1555682554000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 147.0 failed 4 times, most recent failure: Lost task 10.3 in stage 147.0 (TID 2368, 10.150.0.58): kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69)
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1280) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1268) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1267) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.Option.foreach(Option.scala:236) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1493) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1455) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1444) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) [spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:896) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:47) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:218) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver$3.call(KafkaSparkStreamingDriver.java:207) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:315) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:631) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.Try$.apply(Try.scala:161) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_201]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_201]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_201]
> Caused by: kafka.common.OffsetOutOfRangeException
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_201]
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_201]
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_201]
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_201]
> at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_201]
> at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:69) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at com.acme.consumer.kafka.spark.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24) ~[acme-ingest-kafka-spark-2.0.0-SNAPSHOT.jar:?]
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:222) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.scheduler.Task.run(Task.scala:88) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-assembly-1.5.0-hadoop2.4.0.jar:1.5.0]
> ... 3 more
> {noformat}
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
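Editorial note on the workaround question above: with the direct Kafka stream, auto.offset.reset is only consulted when no starting offsets exist; once a batch's offset ranges are planned, a fetch for an offset that Kafka has since deleted throws OffsetOutOfRangeException. A workaround commonly suggested for this failure mode (not taken from this ticket; all names below are hypothetical) is to query each partition's earliest and latest offsets from the broker at startup and clamp the desired starting offsets into that range before constructing the stream. A minimal sketch of just the clamping step, with the broker queries left out:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: given the offsets we want to resume from and the
// broker's current earliest/latest offsets per partition, produce starting
// offsets guaranteed to be in range. Offsets below the log-start offset
// (data already deleted by retention) are bumped up to the earliest
// available; in-range offsets pass through unchanged.
public class OffsetClamp {
    public static Map<Integer, Long> clamp(Map<Integer, Long> requested,
                                           Map<Integer, Long> earliest,
                                           Map<Integer, Long> latest) {
        Map<Integer, Long> fromOffsets = new HashMap<>();
        for (Map.Entry<Integer, Long> e : requested.entrySet()) {
            long lo = earliest.get(e.getKey());  // broker's log-start offset
            long hi = latest.get(e.getKey());    // broker's log-end offset
            // clamp into [lo, hi]
            fromOffsets.put(e.getKey(), Math.min(Math.max(e.getValue(), lo), hi));
        }
        return fromOffsets;
    }
}
```

In Kafka 0.8 the earliest/latest maps would come from an OffsetRequest against each partition leader, and the clamped map would be passed as the fromOffsets argument of KafkaUtils.createDirectStream. This avoids the crash at startup, though data between the requested and clamped offsets is already gone, so it also makes any silent loss explicit and loggable.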