This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bf2fc24967ba [SPARK-48383][SS] Throw better error for mismatched partitions in startOffset option in Kafka bf2fc24967ba is described below commit bf2fc24967bac8e87899be00815979b2f9968d4c Author: Siying Dong <siying.d...@databricks.com> AuthorDate: Mon Jun 3 12:54:17 2024 +0900 [SPARK-48383][SS] Throw better error for mismatched partitions in startOffset option in Kafka ### What changes were proposed in this pull request? Create a new error class START_OFFSET_DOES_NOT_MATCH_ASSIGNED. When a partition mismatch is found in KafkaOffsetReader between start offsets and assigned partitions, throw this exception. ### Why are the changes needed? In KafkaOffsetReader, we assert that startOffsets have the same topic partition list as the assigned partitions. However, if the user changes topic partitions while the query is running, they will see an assertion failure. Instead, they should see an exception. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add a new unit test in KafkaOffsetReaderSuite to make sure the exception is thrown correctly in this case. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46692 from siying/kafka_ef. 
Authored-by: Siying Dong <siying.d...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../resources/error/kafka-error-conditions.json | 7 ++++++ .../spark/sql/kafka010/KafkaExceptions.scala | 10 +++++++++ .../sql/kafka010/KafkaOffsetReaderAdmin.scala | 7 +++--- .../sql/kafka010/KafkaOffsetReaderConsumer.scala | 7 +++--- .../sql/kafka010/KafkaOffsetReaderSuite.scala | 25 ++++++++++++++++++++++ 5 files changed, 48 insertions(+), 8 deletions(-) diff --git a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json index a7b22e1370fd..2fa44d7bd66a 100644 --- a/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json +++ b/connector/kafka-0-10-sql/src/main/resources/error/kafka-error-conditions.json @@ -23,6 +23,13 @@ "latest offset: <latestOffset>, end offset: <endOffset>" ] }, + "KAFKA_START_OFFSET_DOES_NOT_MATCH_ASSIGNED" : { + "message" : [ + "Partitions specified for Kafka start offsets don't match what are assigned. Maybe topic partitions are created ", + "or deleted while the query is running. 
Use -1 for latest, -2 for earliest.", + "Specified: <specifiedPartitions> Assigned: <assignedPartitions>" + ] + }, "KAFKA_DATA_LOSS" : { "message" : [ "Some data may have been lost because they are not available in Kafka any more;", diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala index 8dc4e543060d..13a68e72269f 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala @@ -155,6 +155,16 @@ object KafkaExceptions { "prevOffset" -> prevOffset.toString, "newOffset" -> newOffset.toString)) } + + def startOffsetDoesNotMatchAssigned( + specifiedPartitions: Set[TopicPartition], + assignedPartitions: Set[TopicPartition]): KafkaIllegalStateException = { + new KafkaIllegalStateException( + errorClass = "KAFKA_START_OFFSET_DOES_NOT_MATCH_ASSIGNED", + messageParameters = Map( + "specifiedPartitions" -> specifiedPartitions.toString, + "assignedPartitions" -> assignedPartitions.toString)) + } } /** diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala index 9ac06a41a068..bb4f14686f97 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala @@ -120,10 +120,9 @@ private[kafka010] class KafkaOffsetReaderAdmin( isStartingOffsets: Boolean): Map[TopicPartition, Long] = { def validateTopicPartitions(partitions: Set[TopicPartition], partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { - assert(partitions == partitionOffsets.keySet, - "If startingOffsets 
contains specific offsets, you must specify all TopicPartitions.\n" + - "Use -1 for latest, -2 for earliest.\n" + - s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}") + if (partitions != partitionOffsets.keySet) { + throw KafkaExceptions.startOffsetDoesNotMatchAssigned(partitionOffsets.keySet, partitions) + } logDebug(s"Assigned partitions: $partitions. Seeking to $partitionOffsets") partitionOffsets } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala index eceedbee1541..fa53d6373176 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala @@ -142,10 +142,9 @@ private[kafka010] class KafkaOffsetReaderConsumer( isStartingOffsets: Boolean): Map[TopicPartition, Long] = { def validateTopicPartitions(partitions: Set[TopicPartition], partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = { - assert(partitions == partitionOffsets.keySet, - "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + - "Use -1 for latest, -2 for earliest.\n" + - s"Specified: ${partitionOffsets.keySet} Assigned: ${partitions}") + if (partitions != partitionOffsets.keySet) { + throw KafkaExceptions.startOffsetDoesNotMatchAssigned(partitionOffsets.keySet, partitions) + } logDebug(s"Partitions assigned to consumer: $partitions. 
Seeking to $partitionOffsets") partitionOffsets } diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala index 691e81f02a8c..320485a79e59 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala @@ -135,6 +135,31 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk KafkaOffsetRange(tp, 2, LATEST, None)).sortBy(_.topicPartition.toString)) } + testWithAllOffsetFetchingSQLConf( + "SPARK-48383: START_OFFSET_DOES_NOT_MATCH_ASSIGNED error class" + ) { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + val reader = createKafkaReader(topic, minPartitions = Some(4)) + + // There are three topic partitions, but we only include two in offsets. + val tp1 = new TopicPartition(topic, 0) + val tp2 = new TopicPartition(topic, 1) + val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> EARLIEST)) + val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3)) + + val ex = intercept[KafkaIllegalStateException] { + reader.getOffsetRangesFromUnresolvedOffsets(startingOffsets, endingOffsets) + } + checkError( + exception = ex, + errorClass = "KAFKA_START_OFFSET_DOES_NOT_MATCH_ASSIGNED", + parameters = Map( + "specifiedPartitions" -> "Set\\(.*,.*\\)", + "assignedPartitions" -> "Set\\(.*,.*,.*\\)"), + matchPVals = true) + } + testWithAllOffsetFetchingSQLConf("SPARK-30656: getOffsetRangesFromUnresolvedOffsets - " + "multiple topic partitions") { val topic = newTopic() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org