Repository: spark Updated Branches: refs/heads/master c20916a5d -> 8bb9414aa
[SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false` ## What changes were proposed in this pull request? When there are missing offsets, Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip missing offsets. This PR fixes the issue and also adds regression tests for all Kafka readers. ## How was this patch tested? New tests. Closes #22207 from zsxwing/SPARK-25214. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb9414a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb9414a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb9414a Branch: refs/heads/master Commit: 8bb9414aaff4a147db2d921dccdbd04c8eb4e5db Parents: c20916a Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Fri Aug 24 12:00:34 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Aug 24 12:00:34 2018 -0700 ---------------------------------------------------------------------- .../kafka010/KafkaMicroBatchReadSupport.scala | 2 +- .../spark/sql/kafka010/KafkaSourceRDD.scala | 38 --- .../kafka010/KafkaDontFailOnDataLossSuite.scala | 272 +++++++++++++++++++ .../kafka010/KafkaMicroBatchSourceSuite.scala | 139 +--------- 4 files changed, 276 insertions(+), 175 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala index c31af60..70f37e3 100644 
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala @@ -341,6 +341,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader( val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) if (record != null) { nextRow = converter.toUnsafeRow(record) + nextOffset = record.offset + 1 true } else { false @@ -352,7 +353,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader( override def get(): UnsafeRow = { assert(nextRow != null) - nextOffset += 1 nextRow } http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala index 8b4494d..f8b9005 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala @@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD( offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray } - override def count(): Long = offsetRanges.map(_.size).sum - - override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = { - val c = count - new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } - - override def isEmpty(): Boolean = count == 0L - - override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] = { - val nonEmptyPartitions = - this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size > 0) - - if (num < 1 || 
nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0) - } - - // Determine in advance how many messages need to be taken from each partition - val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => - val remain = num - result.values.sum - if (remain > 0) { - val taken = Math.min(remain, part.offsetRange.size) - result + (part.index -> taken.toInt) - } else { - result - } - } - - val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] - val res = context.runJob( - this, - (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]) => - it.take(parts(tc.partitionId)).toArray, parts.keys.toArray - ) - res.foreach(buf ++= _) - buf.toArray - } - override def getPreferredLocations(split: Partition): Seq[String] = { val part = split.asInstanceOf[KafkaSourceRDDPartition] part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty) http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala new file mode 100644 index 0000000..0ff341c --- /dev/null +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable +import scala.util.Random + +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +/** + * This is a basic test trait which will set up a Kafka cluster that keeps only several records in + * a topic and ages out records very quickly. This is a helper trait to test + * "failOnDataLoss=false" case with missing offsets. + * + * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up + * records. Hence each class extending this trait needs to wait at least 30 seconds (or even longer + * when running on a slow Jenkins machine) before records start to be removed. To make sure a test + * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's + * not 0 rather than sleeping a hard-coded duration. 
+ */ +trait KafkaMissingOffsetsTest extends SharedSQLContext { + + protected var testUtils: KafkaTestUtils = _ + + override def createSparkSession(): TestSparkSession = { + // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic + new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) + } + + override def beforeAll(): Unit = { + super.beforeAll() + testUtils = new KafkaTestUtils { + override def brokerConfiguration: Properties = { + val props = super.brokerConfiguration + // Try to make Kafka clean up messages as fast as possible. However, there is a hard-code + // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at + // least 30 seconds. + props.put("log.cleaner.backoff.ms", "100") + // The size of RecordBatch V2 increases to support transactional write. + props.put("log.segment.bytes", "70") + props.put("log.retention.bytes", "40") + props.put("log.retention.check.interval.ms", "100") + props.put("delete.retention.ms", "10") + props.put("log.flush.scheduler.interval.ms", "10") + props + } + } + testUtils.setup() + } + + override def afterAll(): Unit = { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + } + super.afterAll() + } +} + +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + /** + * @param testStreamingQuery whether to test a streaming query or a batch query. + * @param writeToTable the function to write the specified [[DataFrame]] to the given table. 
+ */ + private def verifyMissingOffsetsDontCauseDuplicatedRecords( + testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 1) + testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray) + + eventually(timeout(60.seconds)) { + assert( + testUtils.getEarliestOffsets(Set(topic)).head._2 > 0, + "Kafka didn't delete records after 1 minute") + } + + val table = "DontFailOnDataLoss" + withTable(table) { + val kafkaOptions = Map( + "kafka.bootstrap.servers" -> testUtils.brokerAddress, + "kafka.metadata.max.age.ms" -> "1", + "subscribe" -> topic, + "startingOffsets" -> s"""{"$topic":{"0":0}}""", + "failOnDataLoss" -> "false", + "kafkaConsumer.pollTimeoutMs" -> "1000") + val df = + if (testStreamingQuery) { + val reader = spark.readStream.format("kafka") + kafkaOptions.foreach(kv => reader.option(kv._1, kv._2)) + reader.load() + } else { + val reader = spark.read.format("kafka") + kafkaOptions.foreach(kv => reader.option(kv._1, kv._2)) + reader.load() + } + writeToTable(df.selectExpr("CAST(value AS STRING)"), table) + val result = spark.table(table).as[String].collect().toList + assert(result.distinct.size === result.size, s"$result contains duplicated records") + // Make sure Kafka did remove some records so that this test is valid. 
+ assert(result.size > 0 && result.size < 50) + } + } + + test("failOnDataLoss=false should not return duplicated records: v1") { + withSQLConf( + "spark.sql.streaming.disabledV2MicroBatchReaders" -> + classOf[KafkaSourceProvider].getCanonicalName) { + verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) => + val query = df.writeStream.format("memory").queryName(table).start() + try { + query.processAllAvailable() + } finally { + query.stop() + } + } + } + } + + test("failOnDataLoss=false should not return duplicated records: v2") { + verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) => + val query = df.writeStream.format("memory").queryName(table).start() + try { + query.processAllAvailable() + } finally { + query.stop() + } + } + } + + test("failOnDataLoss=false should not return duplicated records: continuous processing") { + verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) => + val query = df.writeStream + .format("memory") + .queryName(table) + .trigger(Trigger.Continuous(100)) + .start() + try { + query.processAllAvailable() + } finally { + query.stop() + } + } + } + + test("failOnDataLoss=false should not return duplicated records: batch") { + verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = false) { (df, table) => + df.write.saveAsTable(table) + } + } +} + +class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest { + + import testImplicits._ + + private val topicId = new AtomicInteger(0) + + private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" + + protected def startStream(ds: Dataset[Int]) = { + ds.writeStream.foreach(new ForeachWriter[Int] { + + override def open(partitionId: Long, version: Long): Boolean = true + + override def process(value: Int): Unit = { + // Slow down the processing speed so that messages may be aged out. 
+ Thread.sleep(Random.nextInt(500)) + } + + override def close(errorOrNull: Throwable): Unit = {} + }).start() + } + + test("stress test for failOnDataLoss=false") { + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") + .option("subscribePattern", "failOnDataLoss.*") + .option("startingOffsets", "earliest") + .option("failOnDataLoss", "false") + .option("fetchOffset.retryIntervalMs", "3000") + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val query = startStream(kafka.map(kv => kv._2.toInt)) + + val testTime = 1.minutes + val startTime = System.currentTimeMillis() + // Track the current existing topics + val topics = mutable.ArrayBuffer[String]() + // Track topics that have been deleted + val deletedTopics = mutable.Set[String]() + while (System.currentTimeMillis() - testTime.toMillis < startTime) { + Random.nextInt(10) match { + case 0 => // Create a new topic + val topic = newTopic() + topics += topic + // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small + // chance that a topic will be recreated after deletion due to the asynchronous update. + // Hence, always overwrite to handle this race condition. + testUtils.createTopic(topic, partitions = 1, overwrite = true) + logInfo(s"Create topic $topic") + case 1 if topics.nonEmpty => // Delete an existing topic + val topic = topics.remove(Random.nextInt(topics.size)) + testUtils.deleteTopic(topic) + logInfo(s"Delete topic $topic") + deletedTopics += topic + case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted. 
+ val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size)) + deletedTopics -= topic + topics += topic + // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small + // chance that a topic will be recreated after deletion due to the asynchronous update. + // Hence, always overwrite to handle this race condition. + testUtils.createTopic(topic, partitions = 1, overwrite = true) + logInfo(s"Create topic $topic") + case 3 => + Thread.sleep(1000) + case _ => // Push random messages + for (topic <- topics) { + val size = Random.nextInt(10) + for (_ <- 0 until size) { + testUtils.sendMessages(topic, Array(Random.nextInt(10).toString)) + } + } + } + // `failOnDataLoss` is `false`, we should not fail the query + if (query.exception.nonEmpty) { + throw query.exception.get + } + } + + query.stop() + // `failOnDataLoss` is `false`, we should not fail the query + if (query.exception.nonEmpty) { + throw query.exception.get + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index c9c5250..1d14550 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -20,12 +20,11 @@ package org.apache.spark.sql.kafka010 import java.io._ import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.{Files, Paths} -import java.util.{Locale, Properties} +import java.util.Locale import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger import 
scala.collection.JavaConverters._ -import scala.collection.mutable import scala.io.Source import scala.util.Random @@ -36,8 +35,7 @@ import org.json4s.jackson.JsonMethods._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession} +import org.apache.spark.sql.{ForeachWriter, SparkSession} import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution @@ -46,7 +44,7 @@ import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock -import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} +import org.apache.spark.sql.test.SharedSQLContext abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest { @@ -1187,134 +1185,3 @@ class KafkaSourceStressSuite extends KafkaSourceTest { iterations = 50) } } - -class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext { - - import testImplicits._ - - private var testUtils: KafkaTestUtils = _ - - private val topicId = new AtomicInteger(0) - - private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" - - override def createSparkSession(): TestSparkSession = { - // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic - new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) - } - - override def beforeAll(): Unit = { - super.beforeAll() - testUtils = new KafkaTestUtils { - override def brokerConfiguration: Properties = { - val props = super.brokerConfiguration - // Try to make Kafka clean up messages as fast as possible. 
However, there is a hard-code - // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at - // least 30 seconds. - props.put("log.cleaner.backoff.ms", "100") - // The size of RecordBatch V2 increases to support transactional write. - props.put("log.segment.bytes", "70") - props.put("log.retention.bytes", "40") - props.put("log.retention.check.interval.ms", "100") - props.put("delete.retention.ms", "10") - props.put("log.flush.scheduler.interval.ms", "10") - props - } - } - testUtils.setup() - } - - override def afterAll(): Unit = { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - super.afterAll() - } - } - - protected def startStream(ds: Dataset[Int]) = { - ds.writeStream.foreach(new ForeachWriter[Int] { - - override def open(partitionId: Long, version: Long): Boolean = { - true - } - - override def process(value: Int): Unit = { - // Slow down the processing speed so that messages may be aged out. - Thread.sleep(Random.nextInt(500)) - } - - override def close(errorOrNull: Throwable): Unit = { - } - }).start() - } - - test("stress test for failOnDataLoss=false") { - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("kafka.default.api.timeout.ms", "3000") - .option("subscribePattern", "failOnDataLoss.*") - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - .option("fetchOffset.retryIntervalMs", "3000") - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val query = startStream(kafka.map(kv => kv._2.toInt)) - - val testTime = 1.minutes - val startTime = System.currentTimeMillis() - // Track the current existing topics - val topics = mutable.ArrayBuffer[String]() - // Track topics that have been deleted - val deletedTopics = mutable.Set[String]() - while (System.currentTimeMillis() - testTime.toMillis < 
startTime) { - Random.nextInt(10) match { - case 0 => // Create a new topic - val topic = newTopic() - topics += topic - // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small - // chance that a topic will be recreated after deletion due to the asynchronous update. - // Hence, always overwrite to handle this race condition. - testUtils.createTopic(topic, partitions = 1, overwrite = true) - logInfo(s"Create topic $topic") - case 1 if topics.nonEmpty => // Delete an existing topic - val topic = topics.remove(Random.nextInt(topics.size)) - testUtils.deleteTopic(topic) - logInfo(s"Delete topic $topic") - deletedTopics += topic - case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted. - val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size)) - deletedTopics -= topic - topics += topic - // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small - // chance that a topic will be recreated after deletion due to the asynchronous update. - // Hence, always overwrite to handle this race condition. - testUtils.createTopic(topic, partitions = 1, overwrite = true) - logInfo(s"Create topic $topic") - case 3 => - Thread.sleep(1000) - case _ => // Push random messages - for (topic <- topics) { - val size = Random.nextInt(10) - for (_ <- 0 until size) { - testUtils.sendMessages(topic, Array(Random.nextInt(10).toString)) - } - } - } - // `failOnDataLoss` is `false`, we should not fail the query - if (query.exception.nonEmpty) { - throw query.exception.get - } - } - - query.stop() - // `failOnDataLoss` is `false`, we should not fail the query - if (query.exception.nonEmpty) { - throw query.exception.get - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org