[ https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159294#comment-17159294 ]
Ed Mitchell commented on SPARK-32151:
-------------------------------------

I could do that if I wanted to start back at the beginning or the end of the topic, but in this case I would like it to restart at the offsets defined by my datastore.

> Kafka does not allow Partition Rebalance Handling
> -------------------------------------------------
>
>                 Key: SPARK-32151
>                 URL: https://issues.apache.org/jira/browse/SPARK-32151
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.4.5
>            Reporter: Ed Mitchell
>            Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the
> Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are
> cleared when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers
> joining the group (though it could), it does happen when the partition leader
> is re-elected because a Kafka node is stopped or decommissioned.
> This seems to occur only when users specify their own offsets and do not use
> Kafka as the persistent store of offsets (they use their own database, and
> possibly also when using checkpointing).
> This could probably affect Structured Streaming.
> This presents itself as a "NoOffsetForPartitionException":
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 1589333820000 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
> 	at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> 	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
> 	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
> 	at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
> 	at scala.Option.orElse(Option.scala:289)
> 	at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
> 	at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
> 	at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> 	at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> 	at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> {code}
> This can be fixed by allowing the user to specify a
> {code:java}org.apache.kafka.clients.consumer.ConsumerRebalanceListener{code}
> in the KafkaConsumer#subscribe method.
> The documentation for ConsumerRebalanceListener states that you can use
> KafkaConsumer#seek with the fetched offsets.
> I'm suggesting adding a new ConsumerStrategy that allows users to supply a
> function that fetches offsets for a Collection of TopicPartitions. The reason
> for this is to keep the Spark user from having to interact with the Kafka API
> directly.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
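The suggested offset-fetching rebalance handling could be sketched as below. This is a hypothetical illustration, not Spark or Kafka code: `TopicPartition` and `SeekableConsumer` are minimal stand-ins mirroring the kafka-clients API so the sketch compiles without that dependency, and the `DatastoreSeekingRebalanceListener` name and the offset-fetching function signature are assumptions about what the proposed ConsumerStrategy might accept.

{code:java}
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof TopicPartition
                && partition == ((TopicPartition) o).partition
                && topic.equals(((TopicPartition) o).topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for the seek() portion of KafkaConsumer.
interface SeekableConsumer {
    void seek(TopicPartition tp, long offset);
}

// Sketch of the proposed behavior: when partitions are (re)assigned after a
// rebalance, look up their offsets with a user-supplied function (backed by
// the user's own datastore) and seek() to them, so the consumer does not
// fall through to the auto.offset.reset policy and throw
// NoOffsetForPartitionException.
class DatastoreSeekingRebalanceListener {
    private final SeekableConsumer consumer;
    private final Function<Collection<TopicPartition>, Map<TopicPartition, Long>> fetchOffsets;

    DatastoreSeekingRebalanceListener(
            SeekableConsumer consumer,
            Function<Collection<TopicPartition>, Map<TopicPartition, Long>> fetchOffsets) {
        this.consumer = consumer;
        this.fetchOffsets = fetchOffsets;
    }

    // Mirrors ConsumerRebalanceListener#onPartitionsAssigned(Collection<TopicPartition>).
    void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        Map<TopicPartition, Long> offsets = fetchOffsets.apply(assigned);
        for (TopicPartition tp : assigned) {
            Long offset = offsets.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset); // resume exactly where the datastore says
            }
            // Partitions with no stored offset are left to the consumer's
            // normal position logic.
        }
    }
}
{code}

Under this sketch, a new ConsumerStrategy would wrap the consumer it creates in such a listener and pass it to KafkaConsumer#subscribe, so the only piece the Spark user writes is the offset-fetching function.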