This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 02f32ee358c [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
02f32ee358c is described below

commit 02f32ee358cc0a398aa7321bc5613cb92b306f6f
Author: wecharyu <yuwq1...@gmail.com>
AuthorDate: Thu Dec 8 17:12:30 2022 +0900

    [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset
    
    ### What changes were proposed in this pull request?
    
    Add the empty offset filter in `latestOffset()` for Kafka Source, so that 
offset remains unchanged if Kafka provides no topic partition during fetch.
    
    ### Why are the changes needed?
    
    KafkaOffsetReader may fetch empty partitions in some extreme cases like 
getting partitions while Kafka cluster is reassigning partitions, this will 
produce an empty `PartitionOffsetMap` (although there are topic-partitions 
being unchanged) and stored in `committedOffsets` after `runBatch()`.
    
    Then in the next batch, we fetch partitions normally and get the actual 
offsets, but when fetching data of this batch in 
`KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()` all partitions in 
endOffsets will be considered as new partitions since the startOffsets is 
empty, then these "new partitions" will fetch earliest offsets, which will 
cause the data duplication.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add an unit test.
    
    Closes #38898 from wecharyu/SPARK-41375.
    
    Authored-by: wecharyu <yuwq1...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 043475a87844f11c252fb0ebab469148ae6985d7)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |  7 ++--
 .../apache/spark/sql/kafka010/KafkaSource.scala    |  4 +--
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 39 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 77bc658a1ef..a371d25899d 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -85,8 +85,6 @@ private[kafka010] class KafkaMicroBatchStream(
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
 
-  private var endPartitionOffsets: KafkaSourceOffset = _
-
   private var latestPartitionOffsets: PartitionOffsetMap = _
 
   private var allDataForTriggerAvailableNow: PartitionOffsetMap = _
@@ -114,7 +112,7 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   override def reportLatestOffset(): Offset = {
-    KafkaSourceOffset(latestPartitionOffsets)
+    
Option(KafkaSourceOffset(latestPartitionOffsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   override def latestOffset(): Offset = {
@@ -163,8 +161,7 @@ private[kafka010] class KafkaMicroBatchStream(
       }.getOrElse(latestPartitionOffsets)
     }
 
-    endPartitionOffsets = KafkaSourceOffset(offsets)
-    endPartitionOffsets
+    
Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index c82fda85eb4..b84643533f8 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -177,7 +177,7 @@ private[kafka010] class KafkaSource(
       kafkaReader.fetchLatestOffsets(currentOffsets)
     }
 
-    latestPartitionOffsets = Some(latest)
+    latestPartitionOffsets = if (latest.isEmpty) None else Some(latest)
 
     val limits: Seq[ReadLimit] = limit match {
       case rows: CompositeReadLimit => rows.getReadLimits
@@ -213,7 +213,7 @@ private[kafka010] class KafkaSource(
     }
     currentPartitionOffsets = Some(offsets)
     logDebug(s"GetOffset: ${offsets.toSeq.map(_.toString).sorted}")
-    KafkaSourceOffset(offsets)
+    
Option(KafkaSourceOffset(offsets)).filterNot(_.partitionToOffsets.isEmpty).orNull
   }
 
   /** Checks if we need to skip this trigger based on minOffsetsPerTrigger & 
maxTriggerDelay */
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index db71f0fd918..e033f13ebf6 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -624,6 +624,45 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
     )
   }
 
+  test("SPARK-41375: empty partitions should not record to latest offset") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-good"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.request.timeout.ms", "3000")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("subscribePattern", s"$topicPrefix-.*")
+      .option("failOnDataLoss", "false")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Assert {
+        testUtils.deleteTopic(topic)
+        true
+      },
+      AssertOnQuery { q =>
+        val latestOffset: Option[(Long, OffsetSeq)] = q.offsetLog.getLatest
+        latestOffset.exists { offset =>
+          !offset._2.offsets.exists(_.exists(_.json == "{}"))
+        }
+      }
+    )
+  }
+
   test("subscribe topic by pattern with topic recreation between batches") {
     val topicPrefix = newTopic()
     val topic = topicPrefix + "-good"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to