Repository: spark
Updated Branches:
  refs/heads/master c20916a5d -> 8bb9414aa


[SPARK-25214][SS] Fix the issue that Kafka v2 source may return duplicated records when `failOnDataLoss=false`

## What changes were proposed in this pull request?

When there are missing offsets, the Kafka v2 source may return duplicated records when `failOnDataLoss=false` because it doesn't skip over the missing offsets.

This PR fixes the issue and also adds regression tests for all Kafka readers.
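
For context, the gist of the fix can be sketched as follows. This is an illustrative loop only, not the actual reader: `Record`, `fetch` and `readRange` are made-up names standing in for the consumer call and the partition reader shown in the diff below.

```scala
object OffsetAdvanceSketch {
  // Hedged sketch: with failOnDataLoss=false a fetch may return a record at a
  // LATER offset than the one requested, because missing (aged-out) offsets are
  // skipped instead of failing the query.
  case class Record(offset: Long, value: Array[Byte])

  def readRange(from: Long, until: Long, fetch: Long => Option[Record]): Seq[Record] = {
    var nextOffset = from
    val out = Seq.newBuilder[Record]
    while (nextOffset < until) {
      fetch(nextOffset) match {
        case Some(record) =>
          out += record
          // Before this patch the reader did `nextOffset += 1`. If fetch(0) skipped
          // missing offsets and returned the record at offset 40, the next request
          // asked for offset 1 and received the record at offset 40 again -> duplicates.
          nextOffset = record.offset + 1 // the fix: resume right after the record actually read
        case None =>
          nextOffset = until // nothing more to read in this range
      }
    }
    out.result()
  }
}
```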

## How was this patch tested?

New tests.

Closes #22207 from zsxwing/SPARK-25214.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bb9414a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bb9414a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bb9414a

Branch: refs/heads/master
Commit: 8bb9414aaff4a147db2d921dccdbd04c8eb4e5db
Parents: c20916a
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Aug 24 12:00:34 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Aug 24 12:00:34 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaMicroBatchReadSupport.scala   |   2 +-
 .../spark/sql/kafka010/KafkaSourceRDD.scala     |  38 ---
 .../kafka010/KafkaDontFailOnDataLossSuite.scala | 272 +++++++++++++++++++
 .../kafka010/KafkaMicroBatchSourceSuite.scala   | 139 +---------
 4 files changed, 276 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index c31af60..70f37e3 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -341,6 +341,7 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
       val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
       if (record != null) {
         nextRow = converter.toUnsafeRow(record)
+        nextOffset = record.offset + 1
         true
       } else {
         false
@@ -352,7 +353,6 @@ private[kafka010] case class KafkaMicroBatchPartitionReader(
 
   override def get(): UnsafeRow = {
     assert(nextRow != null)
-    nextOffset += 1
     nextRow
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 8b4494d..f8b9005 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -77,44 +77,6 @@ private[kafka010] class KafkaSourceRDD(
     offsetRanges.zipWithIndex.map { case (o, i) => new KafkaSourceRDDPartition(i, o) }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.size).sum
-
-  override def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = {
-    val c = count
-    new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
-
-  override def isEmpty(): Boolean = count == 0L
-
-  override def take(num: Int): Array[ConsumerRecord[Array[Byte], Array[Byte]]] = {
-    val nonEmptyPartitions =
-      this.partitions.map(_.asInstanceOf[KafkaSourceRDDPartition]).filter(_.offsetRange.size > 0)
-
-    if (num < 1 || nonEmptyPartitions.isEmpty) {
-      return new Array[ConsumerRecord[Array[Byte], Array[Byte]]](0)
-    }
-
-    // Determine in advance how many messages need to be taken from each partition
-    val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) =>
-      val remain = num - result.values.sum
-      if (remain > 0) {
-        val taken = Math.min(remain, part.offsetRange.size)
-        result + (part.index -> taken.toInt)
-      } else {
-        result
-      }
-    }
-
-    val buf = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
-    val res = context.runJob(
-      this,
-      (tc: TaskContext, it: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]) =>
-      it.take(parts(tc.partitionId)).toArray, parts.keys.toArray
-    )
-    res.foreach(buf ++= _)
-    buf.toArray
-  }
-
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val part = split.asInstanceOf[KafkaSourceRDDPartition]
     part.offsetRange.preferredLoc.map(Seq(_)).getOrElse(Seq.empty)
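
An aside on the overrides removed above (the rationale below is an inference, not stated in the commit message): `count`, `countApprox`, `isEmpty` and `take` derived their results from offset range sizes alone, which assumes every offset in a range still holds a record. Once offsets can be missing, that assumption breaks. A hypothetical sketch, with `OffsetRange` standing in for the real `KafkaSourceRDDOffsetRange`:

```scala
object OffsetRangeCountSketch {
  // Illustrative stand-in: only the fields needed to show the size arithmetic.
  case class OffsetRange(fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  def main(args: Array[String]): Unit = {
    // A batch planned over offsets [0, 50) nominally covers 50 records...
    val ranges = Seq(OffsetRange(fromOffset = 0, untilOffset = 50))
    val shortcutCount = ranges.map(_.size).sum // what the removed count() reported: 50

    // ...but if the broker already aged out offsets 0-9, a scan with
    // failOnDataLoss=false returns only 40 records, so the shortcut overcounts.
    val recordsActuallyReadable = 40L
    assert(shortcutCount != recordsActuallyReadable)
  }
}
```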

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
new file mode 100644
index 0000000..0ff341c
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+/**
+ * This is a basic test trait which will set up a Kafka cluster that keeps only several records in
+ * a topic and ages out records very quickly. This is a helper trait to test
+ * the "failOnDataLoss=false" case with missing offsets.
+ *
+ * Note: there is a hard-coded 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) to clean up
+ * records. Hence each class extending this trait needs to wait at least 30 seconds (or even longer
+ * when running on a slow Jenkins machine) before records start to be removed. To make sure a test
+ * does see missing offsets, you can check the earliest offset in `eventually` and make sure it's
+ * not 0 rather than sleeping for a hard-coded duration.
+ */
+trait KafkaMissingOffsetsTest extends SharedSQLContext {
+
+  protected var testUtils: KafkaTestUtils = _
+
+  override def createSparkSession(): TestSparkSession = {
+    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
+    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils {
+      override def brokerConfiguration: Properties = {
+        val props = super.brokerConfiguration
+        // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
+        // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
+        // least 30 seconds.
+        props.put("log.cleaner.backoff.ms", "100")
+        // The size of RecordBatch V2 increases to support transactional write.
+        props.put("log.segment.bytes", "70")
+        props.put("log.retention.bytes", "40")
+        props.put("log.retention.check.interval.ms", "100")
+        props.put("delete.retention.ms", "10")
+        props.put("log.flush.scheduler.interval.ms", "10")
+        props
+      }
+    }
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+}
+
+class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  /**
+   * @param testStreamingQuery whether to test a streaming query or a batch query.
+   * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
+   */
+  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
+      testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 1)
+    testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
+
+    eventually(timeout(60.seconds)) {
+      assert(
+        testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
+        "Kafka didn't delete records after 1 minute")
+    }
+
+    val table = "DontFailOnDataLoss"
+    withTable(table) {
+      val kafkaOptions = Map(
+        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
+        "kafka.metadata.max.age.ms" -> "1",
+        "subscribe" -> topic,
+        "startingOffsets" -> s"""{"$topic":{"0":0}}""",
+        "failOnDataLoss" -> "false",
+        "kafkaConsumer.pollTimeoutMs" -> "1000")
+      val df =
+        if (testStreamingQuery) {
+          val reader = spark.readStream.format("kafka")
+          kafkaOptions.foreach(kv => reader.option(kv._1, kv._2))
+          reader.load()
+        } else {
+          val reader = spark.read.format("kafka")
+          kafkaOptions.foreach(kv => reader.option(kv._1, kv._2))
+          reader.load()
+        }
+      writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
+      val result = spark.table(table).as[String].collect().toList
+      assert(result.distinct.size === result.size, s"$result contains duplicated records")
+      // Make sure Kafka did remove some records so that this test is valid.
+      assert(result.size > 0 && result.size < 50)
+    }
+  }
+
+  test("failOnDataLoss=false should not return duplicated records: v1") {
+    withSQLConf(
+      "spark.sql.streaming.disabledV2MicroBatchReaders" ->
+        classOf[KafkaSourceProvider].getCanonicalName) {
+      verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+        val query = df.writeStream.format("memory").queryName(table).start()
+        try {
+          query.processAllAvailable()
+        } finally {
+          query.stop()
+        }
+      }
+    }
+  }
+
+  test("failOnDataLoss=false should not return duplicated records: v2") {
+    verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+      val query = df.writeStream.format("memory").queryName(table).start()
+      try {
+        query.processAllAvailable()
+      } finally {
+        query.stop()
+      }
+    }
+  }
+
+  test("failOnDataLoss=false should not return duplicated records: continuous processing") {
+    verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
+      val query = df.writeStream
+        .format("memory")
+        .queryName(table)
+        .trigger(Trigger.Continuous(100))
+        .start()
+      try {
+        query.processAllAvailable()
+      } finally {
+        query.stop()
+      }
+    }
+  }
+
+  test("failOnDataLoss=false should not return duplicated records: batch") {
+    verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = false) { (df, table) =>
+      df.write.saveAsTable(table)
+    }
+  }
+}
+
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTest {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+  protected def startStream(ds: Dataset[Int]) = {
+    ds.writeStream.foreach(new ForeachWriter[Int] {
+
+      override def open(partitionId: Long, version: Long): Boolean = true
+
+      override def process(value: Int): Unit = {
+        // Slow down the processing speed so that messages may be aged out.
+        Thread.sleep(Random.nextInt(500))
+      }
+
+      override def close(errorOrNull: Throwable): Unit = {}
+    }).start()
+  }
+
+  test("stress test for failOnDataLoss=false") {
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("kafka.default.api.timeout.ms", "3000")
+      .option("subscribePattern", "failOnDataLoss.*")
+      .option("startingOffsets", "earliest")
+      .option("failOnDataLoss", "false")
+      .option("fetchOffset.retryIntervalMs", "3000")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val query = startStream(kafka.map(kv => kv._2.toInt))
+
+    val testTime = 1.minutes
+    val startTime = System.currentTimeMillis()
+    // Track the current existing topics
+    val topics = mutable.ArrayBuffer[String]()
+    // Track topics that have been deleted
+    val deletedTopics = mutable.Set[String]()
+    while (System.currentTimeMillis() - testTime.toMillis < startTime) {
+      Random.nextInt(10) match {
+        case 0 => // Create a new topic
+          val topic = newTopic()
+          topics += topic
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, always overwrite to handle this race condition.
+          testUtils.createTopic(topic, partitions = 1, overwrite = true)
+          logInfo(s"Create topic $topic")
+        case 1 if topics.nonEmpty => // Delete an existing topic
+          val topic = topics.remove(Random.nextInt(topics.size))
+          testUtils.deleteTopic(topic)
+          logInfo(s"Delete topic $topic")
+          deletedTopics += topic
+        case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
+          val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
+          deletedTopics -= topic
+          topics += topic
+          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
+          // chance that a topic will be recreated after deletion due to the asynchronous update.
+          // Hence, always overwrite to handle this race condition.
+          testUtils.createTopic(topic, partitions = 1, overwrite = true)
+          logInfo(s"Create topic $topic")
+        case 3 =>
+          Thread.sleep(1000)
+        case _ => // Push random messages
+          for (topic <- topics) {
+            val size = Random.nextInt(10)
+            for (_ <- 0 until size) {
+              testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
+            }
+          }
+      }
+      // `failOnDataLoss` is `false`, we should not fail the query
+      if (query.exception.nonEmpty) {
+        throw query.exception.get
+      }
+    }
+
+    query.stop()
+    // `failOnDataLoss` is `false`, we should not fail the query
+    if (query.exception.nonEmpty) {
+      throw query.exception.get
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8bb9414a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index c9c5250..1d14550 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -20,12 +20,11 @@ package org.apache.spark.sql.kafka010
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.{Files, Paths}
-import java.util.{Locale, Properties}
+import java.util.Locale
 import java.util.concurrent.ConcurrentLinkedQueue
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 import scala.io.Source
 import scala.util.Random
 
@@ -36,8 +35,7 @@ import org.json4s.jackson.JsonMethods._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
+import org.apache.spark.sql.{ForeachWriter, SparkSession}
 import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@@ -46,7 +44,7 @@ import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
 import org.apache.spark.sql.streaming.util.StreamManualClock
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.sql.test.SharedSQLContext
 
 abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
 
@@ -1187,134 +1185,3 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
       iterations = 50)
   }
 }
-
-class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
-
-  import testImplicits._
-
-  private var testUtils: KafkaTestUtils = _
-
-  private val topicId = new AtomicInteger(0)
-
-  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
-
-  override def createSparkSession(): TestSparkSession = {
-    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
-    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
-  }
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    testUtils = new KafkaTestUtils {
-      override def brokerConfiguration: Properties = {
-        val props = super.brokerConfiguration
-        // Try to make Kafka clean up messages as fast as possible. However, there is a hard-code
-        // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
-        // least 30 seconds.
-        props.put("log.cleaner.backoff.ms", "100")
-        // The size of RecordBatch V2 increases to support transactional write.
-        props.put("log.segment.bytes", "70")
-        props.put("log.retention.bytes", "40")
-        props.put("log.retention.check.interval.ms", "100")
-        props.put("delete.retention.ms", "10")
-        props.put("log.flush.scheduler.interval.ms", "10")
-        props
-      }
-    }
-    testUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-      super.afterAll()
-    }
-  }
-
-  protected def startStream(ds: Dataset[Int]) = {
-    ds.writeStream.foreach(new ForeachWriter[Int] {
-
-      override def open(partitionId: Long, version: Long): Boolean = {
-        true
-      }
-
-      override def process(value: Int): Unit = {
-        // Slow down the processing speed so that messages may be aged out.
-        Thread.sleep(Random.nextInt(500))
-      }
-
-      override def close(errorOrNull: Throwable): Unit = {
-      }
-    }).start()
-  }
-
-  test("stress test for failOnDataLoss=false") {
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("kafka.default.api.timeout.ms", "3000")
-      .option("subscribePattern", "failOnDataLoss.*")
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-      .option("fetchOffset.retryIntervalMs", "3000")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val query = startStream(kafka.map(kv => kv._2.toInt))
-
-    val testTime = 1.minutes
-    val startTime = System.currentTimeMillis()
-    // Track the current existing topics
-    val topics = mutable.ArrayBuffer[String]()
-    // Track topics that have been deleted
-    val deletedTopics = mutable.Set[String]()
-    while (System.currentTimeMillis() - testTime.toMillis < startTime) {
-      Random.nextInt(10) match {
-        case 0 => // Create a new topic
-          val topic = newTopic()
-          topics += topic
-          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
-          // chance that a topic will be recreated after deletion due to the asynchronous update.
-          // Hence, always overwrite to handle this race condition.
-          testUtils.createTopic(topic, partitions = 1, overwrite = true)
-          logInfo(s"Create topic $topic")
-        case 1 if topics.nonEmpty => // Delete an existing topic
-          val topic = topics.remove(Random.nextInt(topics.size))
-          testUtils.deleteTopic(topic)
-          logInfo(s"Delete topic $topic")
-          deletedTopics += topic
-        case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
-          val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
-          deletedTopics -= topic
-          topics += topic
-          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
-          // chance that a topic will be recreated after deletion due to the asynchronous update.
-          // Hence, always overwrite to handle this race condition.
-          testUtils.createTopic(topic, partitions = 1, overwrite = true)
-          logInfo(s"Create topic $topic")
-        case 3 =>
-          Thread.sleep(1000)
-        case _ => // Push random messages
-          for (topic <- topics) {
-            val size = Random.nextInt(10)
-            for (_ <- 0 until size) {
-              testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
-            }
-          }
-      }
-      // `failOnDataLoss` is `false`, we should not fail the query
-      if (query.exception.nonEmpty) {
-        throw query.exception.get
-      }
-    }
-
-    query.stop()
-    // `failOnDataLoss` is `false`, we should not fail the query
-    if (query.exception.nonEmpty) {
-      throw query.exception.get
-    }
-  }
-}

