[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling
[ https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159330#comment-17159330 ]

Ed Mitchell commented on SPARK-32151:
-------------------------------------

Yeah, I think you're right. It's going to be in the write-ahead logs or the checkpoint.

> Kafka does not allow Partition Rebalance Handling
> -------------------------------------------------
>
> Key: SPARK-32151
> URL: https://issues.apache.org/jira/browse/SPARK-32151
> Project: Spark
> Issue Type: Improvement
> Components: DStreams
> Affects Versions: 2.4.5
> Reporter: Ed Mitchell
> Priority: Minor
>
> When a consumer group rebalance occurs while the Spark driver is using the
> Subscribe or SubscribePattern ConsumerStrategy, the driver's offsets are
> cleared when partitions are revoked and then reassigned.
> While this doesn't happen in the normal rebalance scenario of more consumers
> joining the group (though it could), it does happen when a partition leader
> is re-elected because a Kafka node is stopped or decommissioned.
> This seems to occur only when users specify their own offsets and do not use
> Kafka as the persistent store of offsets (for example, when they use their own
> database, and possibly when using checkpointing).
> This could probably affect Structured Streaming.
> This presents itself as a "NoOffsetForPartitionException":
> {code:java}
> 20/05/13 01:37:00 ERROR JobScheduler: Error generating jobs for time 158933382 ms
> org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [production-ad-metrics-1, production-ad-metrics-2, production-ad-metrics-0, production-ad-metrics-5, production-ad-metrics-6, production-ad-metrics-3, production-ad-metrics-4, production-ad-metrics-7]
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetMissingPositions(SubscriptionState.java:391)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2185)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1222)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:172)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:191)
>     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:341)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:336)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:334)
>     at scala.Option.orElse(Option.scala:289)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:122)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:121)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
>     at scala.util.Try$.apply(Try.scala:192)
>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGener
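The mechanism in the description can be illustrated with a small, dependency-free Java model. `ToyConsumer` and `RebalanceFailureDemo` are hypothetical classes, not Kafka's `KafkaConsumer` or `SubscriptionState`. The model makes two simplifying assumptions taken from the description:

- Offsets are kept outside Kafka, so no committed offset is consulted.
- The user-supplied offsets are applied only once, when the stream starts.

Under those assumptions, a revoke-and-reassign cycle leaves the partition without a position. With reset policy `none`, the next position lookup then fails.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model, not Kafka's real consumer: when a rebalance revokes and then
// reassigns a partition, its in-memory position is gone, and with reset policy "none"
// the next position lookup fails, much like NoOffsetForPartitionException in the trace.
class RebalanceFailureDemo {
    public static void main(String[] args) {
        String tp = "production-ad-metrics-0";
        ToyConsumer consumer = new ToyConsumer("none", 0L, 1000L);
        consumer.assign(Set.of(tp));
        consumer.seek(tp, 42L);                    // offset loaded once, from an external store
        System.out.println(consumer.position(tp)); // prints 42

        // A partition-leader change triggers a rebalance: revoke, then reassign.
        consumer.revoke(Set.of(tp));
        consumer.assign(Set.of(tp));
        try {
            consumer.position(tp);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}

class ToyConsumer {
    private final String resetPolicy; // mirrors auto.offset.reset: "earliest", "latest" or "none"
    private final long logStart;      // first offset still retained in the toy log
    private final long logEnd;        // offset of the next record to be written
    private final Map<String, Long> positions = new HashMap<>();
    private final Set<String> assigned = new HashSet<>();

    ToyConsumer(String resetPolicy, long logStart, long logEnd) {
        this.resetPolicy = resetPolicy;
        this.logStart = logStart;
        this.logEnd = logEnd;
    }

    void assign(Set<String> partitions) {
        assigned.addAll(partitions);
    }

    // Revocation drops both the assignment and the in-memory position.
    void revoke(Set<String> partitions) {
        assigned.removeAll(partitions);
        for (String p : partitions) {
            positions.remove(p);
        }
    }

    void seek(String partition, long offset) {
        requireAssigned(partition);
        positions.put(partition, offset);
    }

    // Returns the current position, falling back to the reset policy when none is known.
    long position(String partition) {
        requireAssigned(partition);
        Long pos = positions.get(partition);
        if (pos != null) {
            return pos;
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException(
                        "Undefined offset with no reset policy for partition " + partition);
        }
    }

    private void requireAssigned(String partition) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("Partition not assigned: " + partition);
        }
    }
}
```

With `"earliest"` or `"latest"` instead of `"none"`, the same sequence returns the toy log's start or end offset. That avoids the error, but the position previously loaded from the external store is lost.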
[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling
[ https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159322#comment-17159322 ]

Gabor Somogyi commented on SPARK-32151:
---------------------------------------

I see. In that case, Structured Streaming is not affected.
[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling
[ https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159294#comment-17159294 ]

Ed Mitchell commented on SPARK-32151:
-------------------------------------

I could do that if I wanted to start back at the beginning or the end of the topic, but in this case I would like it to restart from the offsets stored in my datastore.
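The behaviour Ed wants here resembles the pattern the Kafka consumer documentation describes for keeping offsets outside Kafka: register a `ConsumerRebalanceListener` and, in `onPartitionsAssigned`, call `seek` with offsets read from the external store. The sketch below models that pattern without a Kafka dependency. `ListeningConsumer`, `setOnAssigned` and the in-memory `datastore` map are hypothetical stand-ins. The sketch does not show how such a callback would be wired into the Spark DStream driver.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of the "seek on assignment" pattern: every time partitions are
// (re)assigned, a callback restores their positions from an external offset store,
// so a rebalance does not fall back to auto.offset.reset.
class ReseekDemo {
    public static void main(String[] args) {
        String tp = "production-ad-metrics-0";
        Map<String, Long> datastore = new HashMap<>(); // stands in for the user's database
        datastore.put(tp, 42L);

        ListeningConsumer consumer = new ListeningConsumer();
        consumer.setOnAssigned(partitions -> {
            for (String p : partitions) {
                Long stored = datastore.get(p);
                if (stored != null) {
                    consumer.seek(p, stored);
                }
            }
        });

        consumer.assign(Set.of(tp));
        datastore.put(tp, 57L);      // a batch completes and its end offset is saved externally
        consumer.revoke(Set.of(tp)); // rebalance: the partition is revoked...
        consumer.assign(Set.of(tp)); // ...and reassigned; the callback seeks to 57
        System.out.println(consumer.position(tp)); // prints 57
    }
}

class ListeningConsumer {
    private final Map<String, Long> positions = new HashMap<>();
    private final Set<String> assigned = new HashSet<>();
    private Consumer<Set<String>> onAssigned = partitions -> { };

    void setOnAssigned(Consumer<Set<String>> callback) {
        this.onAssigned = callback;
    }

    // Records the assignment, then lets the callback restore positions.
    void assign(Set<String> partitions) {
        assigned.addAll(partitions);
        onAssigned.accept(partitions);
    }

    // Revocation drops both the assignment and the in-memory position.
    void revoke(Set<String> partitions) {
        assigned.removeAll(partitions);
        for (String p : partitions) {
            positions.remove(p);
        }
    }

    void seek(String partition, long offset) {
        if (!assigned.contains(partition)) {
            throw new IllegalStateException("Partition not assigned: " + partition);
        }
        positions.put(partition, offset);
    }

    long position(String partition) {
        Long pos = positions.get(partition);
        if (pos == null) {
            throw new IllegalStateException("No position for partition " + partition);
        }
        return pos;
    }
}
```

The key design choice is that the external store, not the consumer's in-memory state, is treated as the source of truth. Every assignment re-reads it, so the position survives any number of revoke and reassign cycles.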
[jira] [Commented] (SPARK-32151) Kafka does not allow Partition Rebalance Handling
[ https://issues.apache.org/jira/browse/SPARK-32151?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159278#comment-17159278 ]

Gabor Somogyi commented on SPARK-32151:
---------------------------------------

Have you set `auto.offset.reset`?
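For reference, `auto.offset.reset` is one of the entries in the `kafkaParams` map handed to the consumer strategy. The Java sketch below builds such a map; the broker address and group id are placeholders, and `KafkaParamsExample` is a hypothetical helper.

As Ed points out in his reply, each setting has a drawback here:

- `earliest` or `latest` would avoid the exception after a rebalance, but consumption would resume from the start or end of the topic rather than from the offsets kept in an external datastore.
- `none` surfaces the missing position as `NoOffsetForPartitionException`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of consumer parameters for the spark-streaming-kafka-0-10 integration.
// Broker address and group id are placeholders; resetPolicy is the value under discussion.
class KafkaParamsExample {
    static Map<String, Object> kafkaParams(String resetPolicy) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        params.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("group.id", "ad-metrics-consumer");     // hypothetical group id
        // Offsets live in an external datastore, so Kafka is not asked to commit them.
        params.put("enable.auto.commit", false);
        // "earliest" or "latest" avoids the exception after a rebalance but resumes from the
        // start or end of the log; "none" fails when a partition has no known position.
        params.put("auto.offset.reset", resetPolicy);
        return params;
    }

    public static void main(String[] args) {
        System.out.println(kafkaParams("latest").get("auto.offset.reset")); // prints latest
    }
}
```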