This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 36df0a63a139 [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions
36df0a63a139 is described below

commit 36df0a63a139704ccd2a344d057e430430b11ad8
Author: micheal-o <micheal.okut...@gmail.com>
AuthorDate: Thu Feb 29 09:26:47 2024 +0900

    [SPARK-47135][SS] Implement error classes for Kafka data loss exceptions

    ### What changes were proposed in this pull request?

    In the Kafka connector code, several code paths throw the plain Java **IllegalStateException** to report data loss while reading from Kafka. This change classifies those exceptions properly using the new error framework: it adds a new exception type, `KafkaIllegalStateException`, that carries an error class, and introduces new error classes for the Kafka data loss errors.

    ### Why are the changes needed?

    Adopting the new error framework gives better, classified error messages.

    ### Does this PR introduce _any_ user-facing change?

    Yes; these errors now carry an error class and an improved message.

    ### How was this patch tested?

    Updated existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #45221 from micheal-o/bmo/IllegalStateEx.

    Lead-authored-by: micheal-o <micheal.okut...@gmail.com>
    Co-authored-by: micheal-o <micheal.okut...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../main/resources/error/kafka-error-classes.json  |  56 +++++++++++
 .../spark/sql/kafka010/KafkaContinuousStream.scala |  26 ++---
 .../spark/sql/kafka010/KafkaExceptions.scala       | 109 +++++++++++++++++++--
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala |   6 +-
 .../spark/sql/kafka010/KafkaOffsetReader.scala     |   4 +-
 .../sql/kafka010/KafkaOffsetReaderAdmin.scala      |  42 +++++---
 .../sql/kafka010/KafkaOffsetReaderConsumer.scala   |  46 ++++++---
 .../apache/spark/sql/kafka010/KafkaSource.scala    |   6 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |   8 --
 .../sql/kafka010/consumer/KafkaDataConsumer.scala  |  85 ++++++++--------
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |   2 +-
 .../sql/kafka010/KafkaOffsetReaderSuite.scala      |   2 +-
 .../kafka010/consumer/KafkaDataConsumerSuite.scala |  11 ++-
 13 files changed, 290 insertions(+), 113 deletions(-)
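Before the patch itself, a minimal sketch of the pattern this change applies at each throw site. It assumes the `KafkaExceptions` factory methods added in this diff; since the new exception type is `private[kafka010]`, this only compiles inside that package, and the topic name is illustrative.

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.KafkaExceptions

val tp = new TopicPartition("events", 0) // illustrative topic/partition

// Before: throw new IllegalStateException(s"Added partition $tp starts from 5 instead of 0 ...")
// After: a classified exception that still extends IllegalStateException,
// so existing callers that catch the old type keep working.
val e = KafkaExceptions.addedPartitionDoesNotStartFromZero(tp, startOffset = 5L)
assert(e.isInstanceOf[IllegalStateException])
assert(e.getErrorClass == "KAFKA_DATA_LOSS.ADDED_PARTITION_DOES_NOT_START_FROM_OFFSET_ZERO")
```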
+ ] + }, + "COULD_NOT_READ_OFFSET_RANGE" : { + "message" : [ + "Could not read records in offset [<startOffset>, <endOffset>) for topic partition <topicPartition>", + "with consumer group <groupId>." + ] + }, + "INITIAL_OFFSET_NOT_FOUND_FOR_PARTITIONS" : { + "message" : [ + "Cannot find initial offsets for partitions <partitions>. They may have been deleted." + ] + }, + "PARTITIONS_DELETED" : { + "message" : [ + "Partitions <partitions> have been deleted." + ] + }, + "PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT" : { + "message" : [ + "Partitions <partitions> have been deleted.", + "Kafka option 'kafka.<groupIdConfig>' has been set on this query, it is", + "not recommended to set this option. This option is unsafe to use since multiple concurrent", + "queries or sources using the same group id will interfere with each other as they are part", + "of the same consumer group. Restarted queries may also suffer interference from the", + "previous run having the same group id. The user should have only one query per group id,", + "and/or set the option 'kafka.session.timeout.ms' to be very small so that the Kafka", + "consumers from the previous query are marked dead by the Kafka group coordinator before the", + "restarted query starts running." + ] + }, + "PARTITION_OFFSET_CHANGED" : { + "message" : [ + "Partition <topicPartition> offset was changed from <prevOffset> to <newOffset>." + ] + }, + "START_OFFSET_RESET" : { + "message" : [ + "Starting offset for <topicPartition> was <offset> but consumer reset to <fetchedOffset>." + ] + } + }, + "sqlState" : "22000" } } diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index a86acd971a1c..9b7f52585545 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -92,13 +92,17 @@ class KafkaContinuousStream( val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) if (deletedPartitions.nonEmpty) { - val message = if ( - offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { - s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}" - } else { - s"$deletedPartitions are gone. Some data may have been missed." - } - reportDataLoss(message) + val (message, config) = + if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) { + (s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}", + Some(ConsumerConfig.GROUP_ID_CONFIG)) + } else { + (s"$deletedPartitions are gone. Some data may have been missed.", None) + } + + reportDataLoss( + message, + () => KafkaExceptions.partitionsDeleted(deletedPartitions, config)) } val startOffsets = newPartitionOffsets ++ @@ -137,12 +141,12 @@ class KafkaContinuousStream( override def toString(): String = s"KafkaSource[$offsetReader]" /** - * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`. + * If `failOnDataLoss` is true, this method will throw the exception. * Otherwise, just log a warning. */ - private def reportDataLoss(message: String): Unit = { + private def reportDataLoss(message: String, getException: () => Throwable): Unit = { if (failOnDataLoss) { - throw new IllegalStateException(message + s". 
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index a86acd971a1c..9b7f52585545 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -92,13 +92,17 @@ class KafkaContinuousStream(
 
     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (
-        offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () => KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     val startOffsets = newPartitionOffsets ++
@@ -137,12 +141,12 @@ class KafkaContinuousStream(
   override def toString(): String = s"KafkaSource[$offsetReader]"
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+      throw getException()
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
@@ -221,7 +225,7 @@ class KafkaContinuousPartitionReader(
 
         // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
         // or if it's the endpoint of the data range (i.e. the "true" next offset).
-        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+        case e: KafkaIllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
           val range = consumer.getAvailableOffsetRange()
           if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
             // retry
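The signature change above is the core mechanical trick: `reportDataLoss` now takes a thunk, so the classified exception (and its message rendering) is only constructed on the `failOnDataLoss = true` path. A standalone sketch of the same pattern, with `println` standing in for `logWarning`:

```scala
def reportDataLoss(
    failOnDataLoss: Boolean,
    message: String,
    getException: () => Throwable): Unit = {
  if (failOnDataLoss) {
    throw getException() // exception built lazily, only when it will actually be thrown
  } else {
    println(s"WARN: $message")
  }
}

reportDataLoss(failOnDataLoss = false, "partition gone",
  () => new IllegalStateException("never constructed on this path"))
```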
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
index b0e30f37af51..300d288507a3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaExceptions.scala
@@ -17,19 +17,23 @@
 
 package org.apache.spark.sql.kafka010
 
+import scala.jdk.CollectionConverters._
+
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{ErrorClassesJsonReader, SparkException}
+import org.apache.spark.{ErrorClassesJsonReader, SparkException, SparkThrowable}
 
-object KafkaExceptions {
-  private val errorClassesJsonReader: ErrorClassesJsonReader =
+private object KafkaExceptionsHelper {
+  val errorClassesJsonReader: ErrorClassesJsonReader =
     new ErrorClassesJsonReader(
       Seq(getClass.getClassLoader.getResource("error/kafka-error-classes.json")))
+}
 
+object KafkaExceptions {
   def mismatchedTopicPartitionsBetweenEndOffsetAndPrefetched(
       tpsForPrefetched: Set[TopicPartition],
       tpsForEndOffset: Set[TopicPartition]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "MISMATCHED_TOPIC_PARTITIONS_BETWEEN_END_OFFSET_AND_PREFETCHED",
       Map(
         "tpsForPrefetched" -> tpsForPrefetched.toString(),
@@ -42,7 +46,7 @@ object KafkaExceptions {
   def endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
       prefetchedOffset: Map[TopicPartition, Long],
       endOffset: Map[TopicPartition, Long]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_PREFETCHED",
       Map(
         "prefetchedOffset" -> prefetchedOffset.toString(),
@@ -55,7 +59,7 @@ object KafkaExceptions {
   def lostTopicPartitionsInEndOffsetWithTriggerAvailableNow(
       tpsForLatestOffset: Set[TopicPartition],
       tpsForEndOffset: Set[TopicPartition]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "LOST_TOPIC_PARTITIONS_IN_END_OFFSET_WITH_TRIGGER_AVAILABLENOW",
       Map(
         "tpsForLatestOffset" -> tpsForLatestOffset.toString(),
@@ -68,7 +72,7 @@ object KafkaExceptions {
   def endOffsetHasGreaterOffsetForTopicPartitionThanLatestWithTriggerAvailableNow(
       latestOffset: Map[TopicPartition, Long],
       endOffset: Map[TopicPartition, Long]): SparkException = {
-    val errMsg = errorClassesJsonReader.getErrorMessage(
+    val errMsg = KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
       "END_OFFSET_HAS_GREATER_OFFSET_FOR_TOPIC_PARTITION_THAN_LATEST_WITH_TRIGGER_AVAILABLENOW",
       Map(
         "latestOffset" -> latestOffset.toString(),
@@ -77,4 +81,95 @@ object KafkaExceptions {
     )
     new SparkException(errMsg)
   }
+
+  def couldNotReadOffsetRange(
+      startOffset: Long,
+      endOffset: Long,
+      topicPartition: TopicPartition,
+      groupId: String,
+      cause: Throwable): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.COULD_NOT_READ_OFFSET_RANGE",
+      messageParameters = Map(
+        "startOffset" -> startOffset.toString,
+        "endOffset" -> endOffset.toString,
+        "topicPartition" -> topicPartition.toString,
+        "groupId" -> groupId),
+      cause = cause)
+  }
+
+  def startOffsetReset(
+      topicPartition: TopicPartition,
+      offset: Long,
+      fetchedOffset: Long): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.START_OFFSET_RESET",
+      messageParameters = Map(
+        "topicPartition" -> topicPartition.toString,
+        "offset" -> offset.toString,
+        "fetchedOffset" -> fetchedOffset.toString))
+  }
+
+  def initialOffsetNotFoundForPartitions(
+      partitions: Set[TopicPartition]): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.INITIAL_OFFSET_NOT_FOUND_FOR_PARTITIONS",
+      messageParameters = Map("partitions" -> partitions.toString))
+  }
+
+  def addedPartitionDoesNotStartFromZero(
+      topicPartition: TopicPartition,
+      startOffset: Long): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.ADDED_PARTITION_DOES_NOT_START_FROM_OFFSET_ZERO",
+      messageParameters =
+        Map("topicPartition" -> topicPartition.toString, "startOffset" -> startOffset.toString))
+  }
+
+  def partitionsDeleted(
+      partitions: Set[TopicPartition],
+      groupIdConfigName: Option[String]): KafkaIllegalStateException = {
+    groupIdConfigName match {
+      case Some(config) =>
+        new KafkaIllegalStateException(
+          errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT",
+          messageParameters = Map("partitions" -> partitions.toString, "groupIdConfig" -> config))
+      case None =>
+        new KafkaIllegalStateException(
+          errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED",
+          messageParameters = Map("partitions" -> partitions.toString))
+    }
+  }
+
+  def partitionOffsetChanged(
+      topicPartition: TopicPartition,
+      prevOffset: Long,
+      newOffset: Long): KafkaIllegalStateException = {
+    new KafkaIllegalStateException(
+      errorClass = "KAFKA_DATA_LOSS.PARTITION_OFFSET_CHANGED",
+      messageParameters = Map(
+        "topicPartition" -> topicPartition.toString,
+        "prevOffset" -> prevOffset.toString,
+        "newOffset" -> newOffset.toString))
+  }
+}
+
+/**
+ * Illegal state exception thrown with an error class.
+ */
+private[kafka010] class KafkaIllegalStateException(
+    errorClass: String,
+    messageParameters: Map[String, String],
+    cause: Throwable = null)
+  extends IllegalStateException(
+    KafkaExceptionsHelper.errorClassesJsonReader.getErrorMessage(
+      errorClass, messageParameters), cause)
+  with SparkThrowable {
+
+  override def getSqlState: String =
+    KafkaExceptionsHelper.errorClassesJsonReader.getSqlState(errorClass)
+
+  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
+
+  override def getErrorClass: String = errorClass
 }
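What a caller gains from the new class at the end of this file: the `catch` below still matches generic `IllegalStateException` handlers, but error-reporting tooling can now read the structured `SparkThrowable` fields. Again package-internal, and the values are illustrative:

```scala
import org.apache.kafka.common.TopicPartition

try {
  throw KafkaExceptions.startOffsetReset(new TopicPartition("events", 0), 42L, 7L)
} catch {
  case e: KafkaIllegalStateException =>
    println(e.getErrorClass)        // KAFKA_DATA_LOSS.START_OFFSET_RESET
    println(e.getSqlState)          // 22000
    println(e.getMessageParameters) // {topicPartition=events-0, offset=42, fetchedOffset=7}
}
```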
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index e92ebecfce08..fefa3efcc353 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
   }
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+      throw getException()
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index b1370e10501f..df0c7e9c0425 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -62,7 +62,7 @@ private[kafka010] trait KafkaOffsetReader {
    */
   def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
-      reportDataLoss: String => Unit): KafkaSourceOffset
+      reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset
 
   /**
    * Resolves the specific offsets based on timestamp per topic-partition.
@@ -147,7 +147,7 @@ private[kafka010] trait KafkaOffsetReader {
   def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange]
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange]
 }
 
 private[kafka010] object KafkaOffsetReader extends Logging {
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
index 9206dfe9b3f2..27adccf6f902 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala
@@ -148,7 +148,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
 
   override def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
-      reportDataLoss: String => Unit): KafkaSourceOffset = {
+      reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset = {
     val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -404,8 +404,10 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
 
     // No need to report data loss here
-    val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
-    val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+    val resolvedFromOffsets =
+      fetchSpecificOffsets(fromOffsetsMap, (_, _) => ()).partitionToOffsets
+    val resolvedUntilOffsets =
+      fetchSpecificOffsets(untilOffsetsMap, (_, _) => ()).partitionToOffsets
     val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
       KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
     }
@@ -444,7 +446,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
   override def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
     val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -452,22 +454,31 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () =>
+          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
     }
     logInfo(s"Partitions added: $newPartitionInitialOffsets")
     newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
     }
 
     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () =>
+          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     // Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -484,8 +495,11 @@ private[kafka010] class KafkaOffsetReaderAdmin(
       val fromOffset = fromOffsets(tp)
       val untilOffset = untilOffsets(tp)
       if (untilOffset < fromOffset) {
-        reportDataLoss(s"Partition $tp's offset was changed from " +
-          s"$fromOffset to $untilOffset, some data may have been missed")
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed",
+          () =>
+            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
       }
       KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
     }
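Both offset readers rely on the same set arithmetic to classify topology changes between two resolved offset maps; a self-contained sketch of that logic:

```scala
import org.apache.kafka.common.TopicPartition

val from  = Map(new TopicPartition("t", 0) -> 10L, new TopicPartition("t", 1) -> 5L)
val until = Map(new TopicPartition("t", 0) -> 20L, new TopicPartition("t", 2) -> 0L)

// In `until` but not `from`: partitions added while the batch was planned.
val newPartitions = until.keySet.diff(from.keySet)      // Set(t-2)
// In `from` but not `until`: partitions deleted -> reportDataLoss fires.
val deletedPartitions = from.keySet.diff(until.keySet)  // Set(t-1)
```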
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
index a859cd3d55a9..d4953a4a65e3 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala
@@ -170,7 +170,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
 
   override def fetchSpecificOffsets(
       partitionOffsets: Map[TopicPartition, Long],
-      reportDataLoss: String => Unit): KafkaSourceOffset = {
+      reportDataLoss: (String, () => Throwable) => Unit): KafkaSourceOffset = {
     val fnAssertParametersWithPartitions: ju.Set[TopicPartition] => Unit = { partitions =>
       assert(partitions.asScala == partitionOffsets.keySet,
         "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
@@ -189,7 +189,9 @@ private[kafka010] class KafkaOffsetReaderConsumer(
           off != KafkaOffsetRangeLimit.EARLIEST =>
         if (fetched(tp) != off) {
           reportDataLoss(
-            s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+            s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}",
+            () =>
+              KafkaExceptions.startOffsetReset(tp, off, fetched(tp)))
         }
       case _ =>
         // no real way to check that beginning or end is reasonable
@@ -451,8 +453,10 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       offsetRangesBase.map(range => (range.topicPartition, range.untilOffset)).toMap
 
     // No need to report data loss here
-    val resolvedFromOffsets = fetchSpecificOffsets(fromOffsetsMap, _ => ()).partitionToOffsets
-    val resolvedUntilOffsets = fetchSpecificOffsets(untilOffsetsMap, _ => ()).partitionToOffsets
+    val resolvedFromOffsets =
+      fetchSpecificOffsets(fromOffsetsMap, (_, _) => ()).partitionToOffsets
+    val resolvedUntilOffsets =
+      fetchSpecificOffsets(untilOffsetsMap, (_, _) => ()).partitionToOffsets
     val ranges = offsetRangesBase.map(_.topicPartition).map { tp =>
       KafkaOffsetRange(tp, resolvedFromOffsets(tp), resolvedUntilOffsets(tp), preferredLoc = None)
     }
@@ -491,7 +495,7 @@ private[kafka010] class KafkaOffsetReaderConsumer(
   override def getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets: PartitionOffsetMap,
       untilPartitionOffsets: PartitionOffsetMap,
-      reportDataLoss: String => Unit): Seq[KafkaOffsetRange] = {
+      reportDataLoss: (String, () => Throwable) => Unit): Seq[KafkaOffsetRange] = {
     // Find the new partitions, and get their earliest offsets
     val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
     val newPartitionInitialOffsets = fetchEarliestOffsets(newPartitions.toSeq)
@@ -499,22 +503,31 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
       reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed",
+        () =>
+          KafkaExceptions.initialOffsetNotFoundForPartitions(deletedPartitions))
     }
     logInfo(s"Partitions added: $newPartitionInitialOffsets")
     newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
       reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
+        s"Added partition $p starts from $o instead of 0. Some data may have been missed",
+        () => KafkaExceptions.addedPartitionDoesNotStartFromZero(p, o))
     }
 
     val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
-      } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
-      }
-      reportDataLoss(message)
+      val (message, config) =
+        if (driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          (s"$deletedPartitions are gone.${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            Some(ConsumerConfig.GROUP_ID_CONFIG))
+        } else {
+          (s"$deletedPartitions are gone. Some data may have been missed.", None)
+        }
+
+      reportDataLoss(
+        message,
+        () =>
+          KafkaExceptions.partitionsDeleted(deletedPartitions, config))
     }
 
     // Use the until partitions to calculate offset ranges to ignore partitions that have
@@ -531,8 +544,11 @@ private[kafka010] class KafkaOffsetReaderConsumer(
       val fromOffset = fromOffsets(tp)
       val untilOffset = untilOffsets(tp)
       if (untilOffset < fromOffset) {
-        reportDataLoss(s"Partition $tp's offset was changed from " +
-          s"$fromOffset to $untilOffset, some data may have been missed")
+        reportDataLoss(
+          s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed",
+          () =>
+            KafkaExceptions.partitionOffsetChanged(tp, fromOffset, untilOffset))
       }
       KafkaOffsetRange(tp, fromOffset, untilOffset, preferredLoc = None)
     }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 6f6d6319cd6f..83ed7fff23fc 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -344,12 +344,12 @@ private[kafka010] class KafkaSource(
   override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
 
   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
     if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+      throw getException()
     } else {
       logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
     }
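From the user's side, the behavior switch is still the `failOnDataLoss` source option: `true` now surfaces a `KAFKA_DATA_LOSS.*` error, `false` logs a warning and continues. An illustrative reader configuration, given a `SparkSession` named `spark` (broker address and topic are placeholders):

```scala
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder
  .option("subscribe", "events")                    // placeholder
  .option("failOnDataLoss", "false") // log KAFKA_DATA_LOSS reasons instead of failing
  .load()
```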
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 73446eddd25f..a5fc00ff29ff 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -585,14 +585,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
       | option "failOnDataLoss" to "true".
     """.stripMargin
 
-  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE =
-    """
-      |Some data may have been lost because they are not available in Kafka any more; either the
-      | data was aged out by Kafka or the topic may have been deleted before all the data in the
-      | topic was processed. If you don't want your streaming query to fail on such cases, set the
-      | source option "failOnDataLoss" to "false".
-    """.stripMargin
-
   val CUSTOM_GROUP_ID_ERROR_MESSAGE =
     s"""Kafka option 'kafka.${ConsumerConfig.GROUP_ID_CONFIG}' has been set on this query, it is
       | not recommended to set this option. This option is unsafe to use since multiple concurrent
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 2bd883a3cd51..fbc4a500322e 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -32,6 +32,7 @@ import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaTokenUtil}
+import org.apache.spark.sql.kafka010.KafkaExceptions
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
@@ -340,8 +341,11 @@ private[kafka010] class KafkaDataConsumer(
           releaseConsumer()
           fetchedData.reset()
 
-          reportDataLoss(topicPartition, groupId, failOnDataLoss,
-            s"Cannot fetch offset $toFetchOffset", e)
+          if (failOnDataLoss) {
+            throwOnDataLoss(toFetchOffset, untilOffset, topicPartition, groupId, e)
+          } else {
+            logOnDataLoss(topicPartition, groupId, s"Cannot fetch offset $toFetchOffset", e)
+          }
 
           val oldToFetchOffsetd = toFetchOffset
           toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
@@ -443,7 +447,7 @@ private[kafka010] class KafkaDataConsumer(
         s"""
            |The current available offset range is $range.
            | Offset $offset is out of range, and records in [$offset, $untilOffset) will be
-           | skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
+           | skipped ${additionalWarningMessage(topicPartition, groupId)}
          """.stripMargin
       logWarning(warningMessage)
       UNKNOWN_OFFSET
@@ -457,7 +461,7 @@ private[kafka010] class KafkaDataConsumer(
       // then we will see `offset` disappears first then appears again. Although the parameters
       // are same, the state in Kafka cluster is changed, so the outer loop won't be endless.
       logWarning(s"Found a disappeared offset $offset. Some data may be lost " +
-        s"${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}")
+        s"${additionalWarningMessage(topicPartition, groupId)}")
       offset
     } else {
       // ------------------------------------------------------------------------------
@@ -468,7 +472,7 @@ private[kafka010] class KafkaDataConsumer(
         s"""
            |The current available offset range is $range.
            | Offset ${offset} is out of range, and records in [$offset, ${range.earliest}) will be
-           | skipped ${additionalMessage(topicPartition, groupId, failOnDataLoss = false)}
+           | skipped ${additionalWarningMessage(topicPartition, groupId)}
          """.stripMargin
       logWarning(warningMessage)
       range.earliest
@@ -535,18 +539,17 @@ private[kafka010] class KafkaDataConsumer(
       }
       // This may happen when some records aged out but their offsets already got verified
       if (failOnDataLoss) {
-        reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = true,
-          s"Cannot fetch records in [$offset, ${record.offset})")
+        throwOnDataLoss(offset, record.offset, consumer.topicPartition, consumer.groupId)
         // Never happen as "reportDataLoss" will throw an exception
         throw new IllegalStateException(
           "reportDataLoss didn't throw an exception when 'failOnDataLoss' is true")
       } else if (record.offset >= untilOffset) {
-        reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
+        logOnDataLoss(consumer.topicPartition, consumer.groupId,
           s"Skip missing records in [$offset, $untilOffset)")
         // Set `nextOffsetToFetch` to `untilOffset` to finish the current batch.
         fetchedRecord.withRecord(null, untilOffset)
       } else {
-        reportDataLoss(consumer.topicPartition, consumer.groupId, failOnDataLoss = false,
+        logOnDataLoss(consumer.topicPartition, consumer.groupId,
          s"Skip missing records in [$offset, ${record.offset})")
         fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData)
       }
@@ -624,31 +627,47 @@ private[kafka010] class KafkaDataConsumer(
   /**
    * Return an addition message including useful message and instruction.
    */
-  private def additionalMessage(
+  private def additionalWarningMessage(
+      topicPartition: TopicPartition,
+      groupId: String): String = {
+    s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
+      s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
+  }
+
+  /**
+   * Throw an exception when data loss is detected.
+   */
+  private def throwOnDataLoss(
+      startOffset: Long,
+      endOffset: Long,
       topicPartition: TopicPartition,
       groupId: String,
-      failOnDataLoss: Boolean): String = {
-    if (failOnDataLoss) {
-      s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-        s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE"
-    } else {
-      s"(GroupId: $groupId, TopicPartition: $topicPartition). " +
-        s"$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE"
-    }
+      cause: Throwable = null): Unit = {
+    dataLoss += 1
+    throw KafkaExceptions.couldNotReadOffsetRange(
+      startOffset,
+      endOffset,
+      topicPartition,
+      groupId,
+      cause)
   }
 
   /**
-   * Throw an exception or log a warning as per `failOnDataLoss`.
+   * Log a warning when data loss is detected.
    */
-  private def reportDataLoss(
+  private def logOnDataLoss(
       topicPartition: TopicPartition,
       groupId: String,
-      failOnDataLoss: Boolean,
       message: String,
       cause: Throwable = null): Unit = {
-    val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
+    val finalMessage = s"$message ${additionalWarningMessage(topicPartition, groupId)}"
+
     dataLoss += 1
-    reportDataLoss0(failOnDataLoss, finalMessage, cause)
+    if (cause != null) {
+      logWarning(finalMessage, cause)
+    } else {
+      logWarning(finalMessage)
+    }
   }
 
   private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
@@ -714,24 +733,4 @@ private[kafka010] object KafkaDataConsumer extends Logging {
 
     new KafkaDataConsumer(topicPartition, kafkaParams, consumerPool, fetchedDataPool)
   }
-
-  private def reportDataLoss0(
-      failOnDataLoss: Boolean,
-      finalMessage: String,
-      cause: Throwable = null): Unit = {
-    if (failOnDataLoss) {
-      if (cause != null) {
-        throw new IllegalStateException(finalMessage, cause)
-      } else {
-        throw new IllegalStateException(finalMessage)
-      }
-    } else {
-      if (cause != null) {
-        logWarning(finalMessage, cause)
-      } else {
-        logWarning(finalMessage)
-      }
-    }
-  }
-
 }
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index fb5e71a1e7b8..9ae6a9290f80 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -904,7 +904,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[KafkaIllegalStateException](e => {
         // The offset of `topic2` should be changed from 2 to 1
         assert(e.getMessage.contains("was changed from 2 to 1"))
       })
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index 332db5483b83..691e81f02a8c 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -170,7 +170,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
     val offsetRanges = reader.getOffsetRangesFromResolvedOffsets(
       fromPartitionOffsets,
       untilPartitionOffsets,
-      _ => {})
+      (_, _) => {})
     assert(offsetRanges.sortBy(_.topicPartition.toString) === Seq(
       KafkaOffsetRange(tp1, 0, 33, None), KafkaOffsetRange(tp1, 33, 66, None),
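The suite updates above switch the expected type from `IllegalStateException` to `KafkaIllegalStateException`; a ScalaTest-style check written against the new class could also pin the error class itself, roughly like this (the topic/offsets are made up to match the assertion):

```scala
val tp = new org.apache.kafka.common.TopicPartition("topic2", 0)
val e = intercept[KafkaIllegalStateException] {
  throw KafkaExceptions.partitionOffsetChanged(tp, prevOffset = 2L, newOffset = 1L)
}
assert(e.getErrorClass === "KAFKA_DATA_LOSS.PARTITION_OFFSET_CHANGED")
assert(e.getMessage.contains("was changed from 2 to 1"))
```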
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
index b6748d0f261e..dc3319499982 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumerSuite.scala
@@ -32,7 +32,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.{TaskContext, TaskContextImpl}
 import org.apache.spark.kafka010.KafkaDelegationTokenTest
-import org.apache.spark.sql.kafka010.{KafkaTestUtils, RecordBuilder}
+import org.apache.spark.sql.kafka010.{KafkaIllegalStateException, KafkaTestUtils, RecordBuilder}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.CacheKey
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.util.ResetSystemProperties
@@ -91,11 +91,12 @@ class KafkaDataConsumerSuite
     consumerPool.reset()
   }
 
-  test("SPARK-19886: Report error cause correctly in reportDataLoss") {
+  test("SPARK-19886: Report error cause correctly in throwOnDataLoss") {
     val cause = new Exception("D'oh!")
-    val reportDataLoss = PrivateMethod[Unit](Symbol("reportDataLoss0"))
-    val e = intercept[IllegalStateException] {
-      KafkaDataConsumer.invokePrivate(reportDataLoss(true, "message", cause))
+    val throwOnDataLoss = PrivateMethod[Unit](Symbol("throwOnDataLoss"))
+    val consumer = KafkaDataConsumer.acquire(topicPartition, getKafkaParams())
+    val e = intercept[KafkaIllegalStateException] {
+      consumer.invokePrivate(throwOnDataLoss(0L, 1L, topicPartition, groupId, cause))
     }
     assert(e.getCause === cause)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org