This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bf2fc24967ba [SPARK-48383][SS] Throw better error for mismatched 
partitions in startOffset option in Kafka
bf2fc24967ba is described below

commit bf2fc24967bac8e87899be00815979b2f9968d4c
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Mon Jun 3 12:54:17 2024 +0900

    [SPARK-48383][SS] Throw better error for mismatched partitions in 
startOffset option in Kafka
    
    ### What changes were proposed in this pull request?
    Create a new error class START_OFFSET_DOES_NOT_MATCH_ASSIGNED. When 
partition mismatch is found in KafkaOffsetReader between start offsets and 
assigned partitions, throw this exception.
    
    ### Why are the changes needed?
    In KafkaOffsetReader, we assert startOffsets have the same topic partition 
list as assigned. However, if the user changes topic partition while the query 
is running, they will see the assertion. Instead, they should see an exception.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Add a new unit test in KafkaOffsetReaderSuite to make sure the exception 
case is thrown correctly.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46692 from siying/kafka_ef.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../resources/error/kafka-error-conditions.json    |  7 ++++++
 .../spark/sql/kafka010/KafkaExceptions.scala       | 10 +++++++++
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  7 +++---
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  7 +++---
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      | 25 ++++++++++++++++++++++
 5 files changed, 48 insertions(+), 8 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json 
b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
index a7b22e1370fd..2fa44d7bd66a 100644
--- 
a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
+++ 
b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json
@@ -23,6 +23,13 @@
       "latest offset: <latestOffset>, end offset: <endOffset>"
     ]
   },
+  "KAFKA_START_OFFSET_DOES_NOT_MATCH_ASSIGNED" : {
+    "message" : [
+      "Partitions specified for Kafka start offsets don't match what are 
assigned. Maybe topic partitions are created ",
+      "or deleted while the query is running. Use -1 for latest, -2 for 
earliest.",
+      "Specified: <specifiedPartitions> Assigned: <assignedPartitions>"
+    ]
+  },
   "KAFKA_DATA_LOSS" : {
     "message" : [
       "Some data may have been lost because they are not available in Kafka 
any more;",
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index 8dc4e543060d..13a68e72269f 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -155,6 +155,16 @@ object KafkaExceptions {
         "prevOffset" -> prevOffset.toString,
         "newOffset" -> newOffset.toString))
   }
+
+  def startOffsetDoesNotMatchAssigned(
+      specifiedPartitions: Set[TopicPartition],
+      assignedPartitions: Set[TopicPartition]): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_START_OFFSET_DOES_NOT_MATCH_ASSIGNED",
+      messageParameters = Map(
+        "specifiedPartitions" -> specifiedPartitions.toString,
+        "assignedPartitions" -> assignedPartitions.toString))
+  }
 }
 
 /**
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 9ac06a41a068..bb4f14686f97 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -120,10 +120,9 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       isStartingOffsets: Boolean): Map[TopicPartition, Long] = {
     def validateTopicPartitions(partitions: Set[TopicPartition],
       partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] 
= {
-      assert(partitions == partitionOffsets.keySet,
-        "If startingOffsets contains specific offsets, you must specify all 
TopicPartitions.\n" +
-          "Use -1 for latest, -2 for earliest.\n" +
-          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}")
+      if (partitions != partitionOffsets.keySet) {
+        throw 
KafkaExceptions.startOffsetDoesNotMatchAssigned(partitionOffsets.keySet, 
partitions)
+      }
       logDebug(s"Assigned partitions: $partitions. Seeking to 
$partitionOffsets")
       partitionOffsets
     }
diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index eceedbee1541..fa53d6373176 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -142,10 +142,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       isStartingOffsets: Boolean): Map[TopicPartition, Long] = {
     def validateTopicPartitions(partitions: Set[TopicPartition],
       partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] 
= {
-      assert(partitions == partitionOffsets.keySet,
-        "If startingOffsets contains specific offsets, you must specify all 
TopicPartitions.\n" +
-          "Use -1 for latest, -2 for earliest.\n" +
-          s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}")
+      if (partitions != partitionOffsets.keySet) {
+        throw 
KafkaExceptions.startOffsetDoesNotMatchAssigned(partitionOffsets.keySet, 
partitions)
+      }
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to 
$partitionOffsets")
       partitionOffsets
     }
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index 691e81f02a8c..320485a79e59 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -135,6 +135,31 @@ class KafkaOffsetReaderSuite extends QueryTest with 
SharedSparkSession with Kafk
       KafkaOffsetRange(tp, 2, LATEST, None)).sortBy(_.topicPartition.toString))
   }
 
+  testWithAllOffsetFetchingSQLConf(
+    "SPARK-48383: START_OFFSET_DOES_NOT_MATCH_ASSIGNED error class"
+  ) {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    val reader = createKafkaReader(topic, minPartitions = Some(4))
+
+    // There are three topic partitions, but we only include two in offsets.
+    val tp1 = new TopicPartition(topic, 0)
+    val tp2 = new TopicPartition(topic, 1)
+    val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> 
EARLIEST))
+    val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3))
+
+    val ex = intercept[KafkaIllegalStateException] {
+      reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, 
endingOffsets)
+    }
+    checkError(
+      exception = ex,
+      errorClass = "KAFKA_START_OFFSET_DOES_NOT_MATCH_ASSIGNED",
+      parameters = Map(
+        "specifiedPartitions" -> "Set\\(.*,.*\\)",
+        "assignedPartitions" -> "Set\\(.*,.*,.*\\)"),
+      matchPVals = true)
+  }
+
   testWithAllOffsetFetchingSQLConf("SPARK-30656: 
getOffsetRangesFromUnresolvedOffsets - " +
     "multiple topic partitions") {
     val topic = newTopic()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to