This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new faa4c28 [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4) faa4c28 is described below commit faa4c2823b69c1643d7678ee1cb0b7295c611334 Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Mon Jan 7 16:53:07 2019 -0800 [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka (2.4) ## What changes were proposed in this pull request? Backport #23324 to branch-2.4. ## How was this patch tested? Jenkins Closes #23365 from zsxwing/SPARK-26267-2.4. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../spark/sql/kafka010/KafkaContinuousReader.scala | 4 +- .../spark/sql/kafka010/KafkaMicroBatchReader.scala | 20 ++++-- .../sql/kafka010/KafkaOffsetRangeCalculator.scala | 2 + .../spark/sql/kafka010/KafkaOffsetReader.scala | 80 ++++++++++++++++++++-- .../apache/spark/sql/kafka010/KafkaSource.scala | 5 +- .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 48 +++++++++++++ 6 files changed, 146 insertions(+), 13 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala index 8ce56a2..561d501 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala @@ -73,7 +73,7 @@ class KafkaContinuousReader( offset = start.orElse { val offsets = initialOffsets match { case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) - case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets()) + case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) } logInfo(s"Initial offsets: $offsets") @@ -128,7 +128,7 @@ class KafkaContinuousReader( } override def needsReconfiguration(): Boolean = { - knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions + knownPartitions != null && offsetReader.fetchLatestOffsets(None).keySet != knownPartitions } override def toString(): String = s"KafkaSource[$offsetReader]" diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala index 8cc989f..b6c8035 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala @@ -93,7 +93,8 @@ private[kafka010] class KafkaMicroBatchReader( endPartitionOffsets = Option(end.orElse(null)) .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets) .getOrElse { - val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets() + val latestPartitionOffsets = + kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) maxOffsetsPerTrigger.map { maxOffsets => rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets) }.getOrElse { @@ -132,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReader( }.toSeq logDebug("TopicPartitions: " + topicPartitions.mkString(", ")) + val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets + val untilOffsets = endPartitionOffsets + untilOffsets.foreach { case (tp, untilOffset) => + fromOffsets.get(tp).foreach { fromOffset => + if (untilOffset < fromOffset) { + reportDataLoss(s"Partition $tp's offset was changed from " + + s"$fromOffset to $untilOffset, some data may have been missed") + } + } + } + // Calculate offset ranges val offsetRanges = rangeCalculator.getRanges( - fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets, - untilOffsets = endPartitionOffsets, + fromOffsets = fromOffsets, + untilOffsets = untilOffsets, executorLocations = getSortedExecutorList()) // Reuse Kafka consumers only when all the offset ranges have distinct TopicPartitions, @@ -192,7 +204,7 @@ private[kafka010] class KafkaMicroBatchReader( case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => - KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets()) + KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index fb209c7..6008794 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -37,6 +37,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int * the read tasks of the skewed partitions to multiple Spark tasks. * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more * depending on rounding errors or Kafka partitions that didn't receive any new data. + * + * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped. */ def getRanges( fromOffsets: PartitionOffsetMap, diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 8206669..fc443d2 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -21,6 +21,7 @@ import java.{util => ju} import java.util.concurrent.{Executors, ThreadFactory} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import scala.util.control.NonFatal @@ -137,6 +138,12 @@ private[kafka010] class KafkaOffsetReader( // Poll to get the latest assigned partitions consumer.poll(0) val partitions = consumer.assignment() + + // Call `position` to wait until the potential offset request triggered by `poll(0)` is + // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by + // `poll(0)` may reset offsets that should have been set by another request. + partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {}) + consumer.pause(partitions) assert(partitions.asScala == partitionOffsets.keySet, "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" + @@ -192,19 +199,82 @@ private[kafka010] class KafkaOffsetReader( /** * Fetch the latest offsets for the topic partitions that are indicated * in the [[ConsumerStrategy]]. + * + * Kafka may return earliest offsets when we are requesting latest offsets if `poll` is called + * right before `seekToEnd` (KAFKA-7703). As a workaround, we will call `position` right after + * `poll` to wait until the potential offset request triggered by `poll(0)` is done. + * + * In addition, to avoid other unknown issues, we also use the given `knownOffsets` to audit the + * latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset is less + * than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. When + * a topic is recreated, the latest offsets may be less than offsets in `knownOffsets`. We cannot + * distinguish this with KAFKA-7703, so we just return whatever we get from Kafka after retrying. */ - def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly { + def fetchLatestOffsets( + knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions consumer.poll(0) val partitions = consumer.assignment() + + // Call `position` to wait until the potential offset request triggered by `poll(0)` is + // done. This is a workaround for KAFKA-7703, which an async `seekToBeginning` triggered by + // `poll(0)` may reset offsets that should have been set by another request. + partitions.asScala.map(p => p -> consumer.position(p)).foreach(_ => {}) + consumer.pause(partitions) logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the end.") - consumer.seekToEnd(partitions) - val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap - logDebug(s"Got latest offsets for partition : $partitionOffsets") - partitionOffsets + if (knownOffsets.isEmpty) { + consumer.seekToEnd(partitions) + partitions.asScala.map(p => p -> consumer.position(p)).toMap + } else { + var partitionOffsets: PartitionOffsetMap = Map.empty + + /** + * Compare `knownOffsets` and `partitionOffsets`. Returns all partitions that have incorrect + * latest offset (offset in `knownOffsets` is great than the one in `partitionOffsets`). + */ + def findIncorrectOffsets(): Seq[(TopicPartition, Long, Long)] = { + var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]() + partitionOffsets.foreach { case (tp, offset) => + knownOffsets.foreach(_.get(tp).foreach { knownOffset => + if (knownOffset > offset) { + val incorrectOffset = (tp, knownOffset, offset) + incorrectOffsets += incorrectOffset + } + }) + } + incorrectOffsets + } + + // Retry to fetch latest offsets when detecting incorrect offsets. We don't use + // `withRetriesWithoutInterrupt` to retry because: + // + // - `withRetriesWithoutInterrupt` will reset the consumer for each attempt but a fresh + // consumer has a much bigger chance to hit KAFKA-7703. + // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703. + var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil + var attempt = 0 + do { + consumer.seekToEnd(partitions) + partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap + attempt += 1 + + incorrectOffsets = findIncorrectOffsets() + if (incorrectOffsets.nonEmpty) { + logWarning("Found incorrect offsets in some partitions " + + s"(partition, previous offset, fetched offset): $incorrectOffsets") + if (attempt < maxOffsetFetchAttempts) { + logWarning("Retrying to fetch latest offsets because of incorrect offsets") + Thread.sleep(offsetFetchAttemptIntervalMs) + } + } + } while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) + + logDebug(s"Got latest offsets for partition : $partitionOffsets") + partitionOffsets + } } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 66ec7e0..d65b3ce 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -130,7 +130,7 @@ private[kafka010] class KafkaSource( metadataLog.get(0).getOrElse { val offsets = startingOffsets match { case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) - case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets()) + case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss) } metadataLog.add(0, offsets) @@ -148,7 +148,8 @@ private[kafka010] class KafkaSource( // Make sure initialPartitionOffsets is initialized initialPartitionOffsets - val latest = kafkaReader.fetchLatestOffsets() + val latest = kafkaReader.fetchLatestOffsets( + currentPartitionOffsets.orElse(Some(initialPartitionOffsets))) val offsets = maxOffsetsPerTrigger match { case None => latest diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index d89e45e..5f05833 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -327,6 +327,54 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) } + test("subscribe topic by pattern with topic recreation between batches") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-good" + val topic2 = topicPrefix + "-bad" + testUtils.createTopic(topic, partitions = 1) + testUtils.sendMessages(topic, Array("1", "3")) + testUtils.createTopic(topic2, partitions = 1) + testUtils.sendMessages(topic2, Array("2", "4")) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") + .option("startingOffsets", "earliest") + .option("subscribePattern", s"$topicPrefix-.*") + + val ds = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + .map(kv => kv._2.toInt) + + testStream(ds)( + StartStream(), + AssertOnQuery { q => + q.processAllAvailable() + true + }, + CheckAnswer(1, 2, 3, 4), + // Restart the stream in this test to make the test stable. When recreating a topic when a + // consumer is alive, it may not be able to see the recreated topic even if a fresh consumer + // has seen it. + StopStream, + // Recreate `topic2` and wait until it's available + WithOffsetSync(new TopicPartition(topic2, 0), expectedOffset = 1) { () => + testUtils.deleteTopic(topic2) + testUtils.createTopic(topic2) + testUtils.sendMessages(topic2, Array("6")) + }, + StartStream(), + ExpectFailure[IllegalStateException](e => { + // The offset of `topic2` should be changed from 2 to 1 + assert(e.getMessage.contains("was changed from 2 to 1")) + }) + ) + } + test("ensure that initial offset are written with an extra byte in the beginning (SPARK-19517)") { withTempDir { metadataPath => val topic = "kafka-initial-offset-current" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org