[jira] [Commented] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16726973#comment-16726973 ]

ASF GitHub Bot commented on SPARK-26267:

zsxwing opened a new pull request #23365: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4)
URL: https://github.com/apache/spark/pull/23365

## What changes were proposed in this pull request?

Backport #23324 to branch-2.4.

## How was this patch tested?

Jenkins

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Kafka source may reprocess data
> ---
>
> Key: SPARK-26267
> URL: https://issues.apache.org/jira/browse/SPARK-26267
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Assignee: Shixiong Zhu
> Priority: Blocker
> Labels: correctness
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it
> may get an earliest offset, and then it will reprocess messages that have
> been processed when it gets the correct latest offset in the next batch.
> This usually happens when restarting a streaming query.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
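To make the failure mode in the issue description concrete, here is a small worked example for a single partition. Everything in it is an illustrative assumption rather than Spark code or observed data: the `Batch` type, the `reprocessed` helper, and the offsets 100, 0 and 150 are made up for the sketch.

```scala
// Hypothetical single-partition illustration; offsets are plain Longs.
object ReprocessExample {
  // Half-open range [from, until) of offsets read by one micro-batch.
  final case class Batch(from: Long, until: Long) {
    // An inverted range (until < from) reads nothing.
    def records: Long = math.max(0L, until - from)
  }

  // Number of offsets in `next` that lie below the already-processed mark.
  def reprocessed(processedUpTo: Long, next: Batch): Long =
    math.max(0L, math.min(processedUpTo, next.until) - next.from)

  def main(args: Array[String]): Unit = {
    val processedUpTo = 100L // offsets [0, 100) were handled before the restart
    val wrongLatest = 0L     // KAFKA-7703: "latest" comes back as the earliest offset
    val correctLatest = 150L // what the following fetch returns

    // The restarted query records the wrong value as its end offset ...
    val afterRestart = Batch(from = processedUpTo, until = wrongLatest)
    // ... so the next batch starts there and runs to the correct latest offset.
    val nextBatch = Batch(from = afterRestart.until, until = correctLatest)

    println(s"records in batch after restart: ${afterRestart.records}")
    println(s"records in next batch: ${nextBatch.records}")
    println(s"of which already processed: ${reprocessed(processedUpTo, nextBatch)}")
  }
}
```

With these assumed numbers, the batch right after the restart is empty, and the following batch covers 150 offsets, 100 of which had already been processed.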
[jira] [Commented] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16726970#comment-16726970 ]

ASF GitHub Bot commented on SPARK-26267:

zsxwing closed pull request #23324: [SPARK-26267][SS]Retry when detecting incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 1753a28fba2fb..02dfb9ca2b95a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
     val offsets = initialOffsets match {
       case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
       case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
     }
     logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(

   override def needsReconfiguration(config: ScanConfig): Boolean = {
     val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+    offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }

   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index bb4de674c3c72..b4f042e93a5da 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(

   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
     }.toSeq
     logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

+    val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+    val untilOffsets = endPartitionOffsets
+    untilOffsets.foreach { case (tp, untilOffset) =>
+      fromOffsets.get(tp).foreach { fromOffset =>
+        if (untilOffset < fromOffset) {
+          reportDataLoss(s"Partition $tp's offset was changed from " +
+            s"$fromOffset to $untilOffset, some data may have been missed")
+        }
+      }
+    }
+
     // Calculate offset ranges
     val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
+      fromOffsets = fromOffsets,
+      untilOffsets = untilOffsets,
       executorLocations = getSortedExecutorList())

     // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions,
@@ -186,7 +197,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
         case EarliestOffsetRangeLimit =>
           KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) =>
           kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index fb209c724afba..6008794924052 100644
---
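The offset sanity check that the diff adds to `KafkaMicroBatchReadSupport` can be tried in isolation with a sketch like the one below. It is a simplified stand-in, not the Spark code: `TopicPartition` is a local case class rather than Kafka's class, and `findRegressions` is a hypothetical helper that returns the messages instead of passing them to `reportDataLoss`.

```scala
object OffsetRegressionCheck {
  // Local stand-in for Kafka's TopicPartition so the sketch has no dependencies.
  final case class TopicPartition(topic: String, partition: Int)

  // Returns one message per partition whose end offset is below its start offset.
  // Partitions with no known start offset are skipped, as in the diff.
  def findRegressions(
      fromOffsets: Map[TopicPartition, Long],
      untilOffsets: Map[TopicPartition, Long]): Seq[String] =
    untilOffsets.toSeq.flatMap { case (tp, untilOffset) =>
      fromOffsets.get(tp).collect {
        case fromOffset if untilOffset < fromOffset =>
          s"Partition $tp's offset was changed from " +
            s"$fromOffset to $untilOffset, some data may have been missed"
      }
    }

  def main(args: Array[String]): Unit = {
    val p0 = TopicPartition("events", 0)
    val p1 = TopicPartition("events", 1)
    val from = Map(p0 -> 100L, p1 -> 40L)
    val until = Map(p0 -> 0L, p1 -> 55L) // p0 went backwards, p1 advanced
    findRegressions(from, until).foreach(println)
  }
}
```

Returning the messages keeps the helper easy to test; the patched code instead reports each regression as it is found.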
[jira] [Commented] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721869#comment-16721869 ]

ASF GitHub Bot commented on SPARK-26267:

zsxwing opened a new pull request #23324: [SPARK-26267][SS]Retry when detecting incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324

## What changes were proposed in this pull request?

Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we request the latest offset. This will cause Spark to reprocess data.

To reduce the impact of KAFKA-7703, this PR uses the offsets we fetched previously to audit the result from Kafka. If we find any incorrect offset, we will retry at most `maxOffsetFetchAttempts` times. For the first batch of a new query, there are no previous offsets to compare against, so we simply fetch the offsets twice. This should greatly reduce the chance of hitting KAFKA-7703.

## How was this patch tested?

Jenkins
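The audit-and-retry idea described in the pull request can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the PR: `fetch` is a hypothetical function standing in for the call to Kafka, `TopicPartition` is a local case class, "incorrect" is taken to mean "lower than the previously known offset", and `maxOffsetFetchAttempts` is treated as the total number of fetches allowed. The `Option` parameter mirrors the `fetchLatestOffsets(None)` / `fetchLatestOffsets(Some(...))` calls visible in the diff.

```scala
object AuditedOffsetFetch {
  // Local stand-in for Kafka's TopicPartition so the sketch has no dependencies.
  final case class TopicPartition(topic: String, partition: Int)
  type PartitionOffsets = Map[TopicPartition, Long]

  // Partitions whose freshly fetched offset is lower than the previously known one.
  def incorrectOffsets(known: PartitionOffsets, fetched: PartitionOffsets): Set[TopicPartition] =
    fetched.collect { case (tp, offset) if known.get(tp).exists(offset < _) => tp }.toSet

  def fetchLatestOffsets(
      knownOffsets: Option[PartitionOffsets],
      maxOffsetFetchAttempts: Int)(fetch: () => PartitionOffsets): PartitionOffsets =
    knownOffsets match {
      case None =>
        // First batch of a new query: nothing to audit against, so fetch twice
        // and keep the second result.
        fetch()
        fetch()
      case Some(known) =>
        // Fetch again while the result looks wrong and attempts remain.
        var attempt = 1
        var result = fetch()
        while (incorrectOffsets(known, result).nonEmpty && attempt < maxOffsetFetchAttempts) {
          attempt += 1
          result = fetch()
        }
        result
    }
}
```

If every attempt still looks wrong, this sketch simply returns the last result; how that case should be handled is left open here. A real implementation would probably also wait between attempts, which the sketch omits.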
[jira] [Commented] (SPARK-26267) Kafka source may reprocess data
[ https://issues.apache.org/jira/browse/SPARK-26267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16709169#comment-16709169 ]

Shixiong Zhu commented on SPARK-26267:

KAFKA-7703 exists only in Kafka 1.1.0 and above, so a possible workaround is to use an older Kafka version that doesn't have this issue. This doesn't impact Spark 2.3.x and below, as we use Kafka 0.10.0.1 by default there.

> Kafka source may reprocess data
> ---
>
> Key: SPARK-26267
> URL: https://issues.apache.org/jira/browse/SPARK-26267
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Priority: Major
>
> Due to KAFKA-7703, when the Kafka source tries to get the latest offset, it
> may get an earliest offset, and then it will reprocess messages that have
> been processed when it gets the correct latest offset in the next batch.
> This usually happens when restarting a streaming query.