This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 02f32ee358c  [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
02f32ee358c is described below

commit 02f32ee358cc0a398aa7321bc5613cb92b306f6f
Author: wecharyu <yuwq1...@gmail.com>
AuthorDate: Thu Dec 8 17:12:30 2022 +0900

    [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset

    ### What changes were proposed in this pull request?
    Add an empty-offset filter in `latestOffset()` for the Kafka source, so that the offset remains unchanged if Kafka provides no topic partitions during a fetch. (A standalone sketch of this guard follows the diff below.)

    ### Why are the changes needed?
    KafkaOffsetReader may fetch an empty set of partitions in some extreme cases, such as querying partitions while the Kafka cluster is reassigning them. This produces an empty `PartitionOffsetMap` (even though the topic partitions are actually unchanged), which gets stored in `committedOffsets` after `runBatch()`. In the next batch the partitions are fetched normally and the actual offsets are obtained, but when the data for that batch is fetched in `KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()`, every partition in endOffsets is treated as a new partition because startOffsets is empty. These "new partitions" then start from the earliest offsets, which causes data duplication.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Add a unit test.

    Closes #38898 from wecharyu/SPARK-41375.

    Authored-by: wecharyu <yuwq1...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 043475a87844f11c252fb0ebab469148ae6985d7)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  7 ++--
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  4 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 39 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 77bc658a1ef..a371d25899d 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -85,8 +85,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private var endPartitionOffsets: KafkaSourceOffset = _
-
   private var latestPartitionOffsets: PartitionOffsetMap = _
 
   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
@@ -114,7 +112,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   override def reportLatestOffset(): Offset = {
-    KafkaSourceOffset(latestPartitionOffsets)
+    Option(KafkaSourceOffset(latestPartitionOffsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   override def latestOffset(): Offset = {
@@ -163,8 +161,7 @@ private[kafka010] class KafkaMicroBatchStream(
       }.getOrElse(latestPartitionOffsets)
     }
 
-    endPartitionOffsets = KafkaSourceOffset(offsets)
-    endPartitionOffsets
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index c82fda85eb4..b84643533f8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -177,7 +177,7 @@ private[kafka010] class KafkaSource(
       kafkaReader.fetchLatestOffsets(currentOffsets)
     }
 
-    latestPartitionOffsets = Some(latest)
+    latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)
 
     val limits: Seq[ReadLimit] = limit match {
       case rows: CompositeReadLimit => rows.getReadLimits
@@ -213,7 +213,7 @@ private[kafka010] class KafkaSource(
     }
     currentPartitionOffsets = Some(offsets)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    KafkaSourceOffset(offsets)
+    Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & maxTriggerDelay */
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index db71f0fd918..e033f13ebf6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -624,6 +624,45 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     )
   }
 
+  test("SPARK-41375: empty partitions should not record to latest offset") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.request.timeout.ms", "3000")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("subscribePattern", s"$topicPrefix-.*")
+      .option("failOnDataLoss", "false")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Assert {
+        testUtils.deleteTopic(topic)
+        true
+      },
+      AssertOnQuery { q =>
+        val latestOffset: Option[(Long, OffsetSeq)] = q.offsetLog.getLatest
+        latestOffset.exists { offset =>
+          !offset._2.offsets.exists(_.exists(_.json == "{}"))
+        }
+      }
+    )
+  }
+
   test("subscribe topic by pattern with topic recreation between batches") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-good"
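
The heart of the change is the same guard applied at the three call sites above: an offset object is returned only when its partition-to-offset map is non-empty, and null otherwise, so the previously recorded offset stays in place instead of being replaced by an empty one. What follows is a minimal, self-contained sketch of that pattern, not the Spark code itself; SourceOffset, toOffsetOrNull, and the simplified Map[String, Long] are illustrative stand-ins for the real KafkaSourceOffset and PartitionOffsetMap types.

object EmptyOffsetGuardSketch {
  // Simplified stand-in for the PartitionOffsetMap alias used in the diff above.
  type PartitionOffsetMap = Map[String, Long]

  // Stand-in for KafkaSourceOffset: wraps the partition -> offset map.
  final case class SourceOffset(partitionToOffsets: PartitionOffsetMap)

  // Mirrors Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull:
  // an empty fetch yields null rather than an empty offset, so the caller keeps the last
  // non-empty offset it recorded.
  def toOffsetOrNull(fetched: PartitionOffsetMap): SourceOffset =
    Option(SourceOffset(fetched)).filterNot(_.partitionToOffsets.isEmpty).orNull

  def main(args: Array[String]): Unit = {
    println(toOffsetOrNull(Map("topic-0" -> 42L))) // SourceOffset(Map(topic-0 -> 42))
    println(toOffsetOrNull(Map.empty))             // null: latest offset stays unchanged
  }
}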