[ https://issues.apache.org/jira/browse/SPARK-41375 ]
Apache Spark reassigned SPARK-41375:
------------------------------------

    Assignee: Apache Spark

> Avoid empty latest KafkaSourceOffset
> ------------------------------------
>
>                 Key: SPARK-41375
>                 URL: https://issues.apache.org/jira/browse/SPARK-41375
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.1
>            Reporter: Wechar
>            Assignee: Apache Spark
>            Priority: Major
>         Attachments: image-2022-12-04-03-11-11-428.png
>
>
> We found that the offsetLog recorded an empty offset `{}` for the KafkaSource:
> !image-2022-12-04-03-11-11-428.png!
> It occurred only once, but this empty offset causes data duplication.
> {*}Root Cause{*}:
> The Kafka consumer may return an empty partition assignment in extreme cases, e.g. when fetching partitions while the Kafka cluster is reassigning them.
> {code:scala}
> // org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
> private def partitionsAssignedToConsumer(
>     body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
>     fetchingEarliestOffset: Boolean = false): Map[TopicPartition, Long] =
>   uninterruptibleThreadRunner.runUninterruptibly {
>     withRetriesWithoutInterrupt {
>       // Poll to get the latest assigned partitions
>       consumer.poll(0)
>       val partitions = consumer.assignment()  // partitions may be empty
>       if (!fetchingEarliestOffset) {
>         // Call `position` to wait until the potential offset request triggered by
>         // `poll(0)` is done. This is a workaround for KAFKA-7703, in which an async
>         // `seekToBeginning` triggered by `poll(0)` may reset offsets that should
>         // have been set by another request.
>         partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
>       }
>       consumer.pause(partitions)
>       logDebug(s"Partitions assigned to consumer: $partitions.")
>       body(partitions)
>     }
>   }
> {code}
> *Solution:*
> Add an offset filter for latestOffset, as sketched below.
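>
> A minimal sketch of the idea (not the actual SPARK-41375 patch; the names `latestNonEmpty`, `fetch`, and the retry/fallback policy here are illustrative assumptions): never let an empty fetch result reach the offset log, retrying first and falling back to the previously committed offsets.
> {code:scala}
> // Hypothetical sketch, NOT the SPARK-41375 patch: guard the "latest offset"
> // path against a transiently empty partition assignment.
> import scala.annotation.tailrec
>
> object LatestOffsetGuard {
>   // Simplified: topic-partition name -> offset.
>   type PartitionOffsets = Map[String, Long]
>
>   @tailrec
>   def latestNonEmpty(
>       fetch: () => PartitionOffsets,  // e.g. a call into the offset reader
>       previous: PartitionOffsets,     // offsets of the last committed batch
>       retries: Int = 3): PartitionOffsets = {
>     val latest = fetch()
>     if (latest.nonEmpty) {
>       latest
>     } else if (retries > 0) {
>       // An empty assignment is usually transient (e.g. a broker-side
>       // reassignment in progress), so retry before giving up.
>       latestNonEmpty(fetch, previous, retries - 1)
>     } else {
>       // Never record an empty offset `{}` in the offset log; reuse the
>       // previous offsets so the next batch plans from a known-good position.
>       previous
>     }
>   }
> }
> {code}
> With a filter of this shape in place, a transiently empty consumer.assignment() can no longer produce the empty `{}` entry shown in the attached offsetLog screenshot.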