Wechar created SPARK-41375:
------------------------------

             Summary: Avoid empty latest KafkaSourceOffset
                 Key: SPARK-41375
                 URL: https://issues.apache.org/jira/browse/SPARK-41375
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.3.1
            Reporter: Wechar

We found that the offsetLog recorded an empty offset `{}` for the KafkaSource:

!image-2022-12-04-02-43-35-018.png!

It occurred only once, but this empty offset causes data duplication.

{*}Root Cause{*}:

The Kafka consumer may return an empty set of partitions in extreme cases, for example when it fetches its assignment while the Kafka cluster is reassigning partitions.

{code:scala}
// org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer
private def partitionsAssignedToConsumer(
    body: ju.Set[TopicPartition] => Map[TopicPartition, Long],
    fetchingEarliestOffset: Boolean = false)
  : Map[TopicPartition, Long] = uninterruptibleThreadRunner.runUninterruptibly {

  withRetriesWithoutInterrupt {
    // Poll to get the latest assigned partitions
    consumer.poll(0)
    val partitions = consumer.assignment() // partitions may be empty

    if (!fetchingEarliestOffset) {
      // Call `position` to wait until the potential offset request triggered by `poll(0)` is
      // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by
      // `poll(0)` may reset offsets that should have been set by another request.
      partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {})
    }

    consumer.pause(partitions)
    logDebug(s"Partitions assigned to consumer: $partitions.")
    body(partitions)
  }
}
{code}

*Solution:*

Add an offset filter to latestOffset so that an empty offset is never recorded as the latest KafkaSourceOffset.
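
The filter proposed in this issue (do not accept an empty latest offset) could look roughly like the sketch below. It is a minimal illustration, not the actual patch: `TopicPartition` is a local stand-in for the Kafka class so the snippet has no dependencies, `LatestOffsetFilter` and `chooseLatest` are hypothetical names rather than Spark APIs, and falling back to the previously known offsets is an assumption about how an empty result might be handled.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// defined locally so the sketch compiles without Kafka on the classpath.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical helper, not part of Spark.
object LatestOffsetFilter {
  type PartitionOffsets = Map[TopicPartition, Long]

  /**
   * Decide which offsets to record as "latest".
   *
   * A non-empty fetch result is used as-is. An empty result is treated as a
   * transient condition (assumption) and replaced by the previously known
   * offsets, if any. None means there is nothing valid to record yet.
   */
  def chooseLatest(
      fetched: PartitionOffsets,
      previous: Option[PartitionOffsets]): Option[PartitionOffsets] = {
    if (fetched.nonEmpty) Some(fetched)
    else previous.filter(_.nonEmpty)
  }
}
```

With this shape, an empty map returned while partitions are being reassigned would not overwrite the last known offsets, so `{}` would not reach the offsetLog.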