Hi,

We are seeing the exception below when using Spark Kafka streaming 0.10 on a
normal Kafka topic. I'm not sure why the offset is missing in ZooKeeper, but
since Spark Streaming overrides the offset reset policy to none in its code,
I cannot set the reset policy to latest (I don't really care about data loss
right now).

Is there any quick way to fix the missing offset or work around this?
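One workaround I'm considering is committing a fresh offset for the affected
partition with a plain KafkaConsumer (same group id as the streaming job)
before restarting the job, so the direct stream finds a valid starting
position. A rough sketch of the idea -- the bootstrap servers and group id
below are placeholders for our real values:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Placeholder config -- substitute the real brokers and the streaming job's group id.
val props = new Properties()
props.put("bootstrap.servers", "broker1:9092")
props.put("group.id", "my-streaming-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("enable.auto.commit", "false")
// Let this standalone consumer reset to the latest offset on its own.
props.put("auto.offset.reset", "latest")

val consumer = new KafkaConsumer[String, String](props)
try {
  val tp = new TopicPartition("elasticsearchtopicrealtimereports", 97)
  consumer.assign(java.util.Arrays.asList(tp))
  // position() resolves the starting offset (latest, per auto.offset.reset above).
  val latest = consumer.position(tp)
  // Commit that offset for the group so the Spark job sees a committed position.
  consumer.commitSync(Map(tp -> new OffsetAndMetadata(latest)).asJava)
  println(s"Committed offset $latest for $tp")
} finally {
  consumer.close()
}

Alternatively, I believe ConsumerStrategies.Subscribe has an overload that
takes an explicit offsets map, so passing the latest offsets for the affected
partitions when creating the direct stream might also work.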

Thanks,
Martin

1/06/2018 17:11:02: ERROR:the type of error is
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition:
elasticsearchtopicrealtimereports-97
01/06/2018 17:11:02: ERROR:Undefined offset with no reset policy for
partition: elasticsearchtopicrealtimereports-97
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:248)
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1601)
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1034)
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:165)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:184)
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:211)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.immutable.List.foreach(List.scala:381)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.immutable.List.map(List.scala:285)
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
scala.Option.orElse(Option.scala:289)
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:34
