Repository: spark
Updated Branches:
  refs/heads/master c7572b79d -> 07ae39d0e


[SPARK-22956][SS] Bug fix for 2 streams union failover scenario

## What changes were proposed in this pull request?

This problem was reported by yanlin-Lynn, ivoson and LiangchangZ. Thanks!

When we union two streams from Kafka or other sources, and one of them has no
continuous data coming in while a task restarts at the same time, an
`IllegalStateException` is thrown. The root cause is the code in
[MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190):
when one stream has no continuous data, its committedOffset equals its
availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` is
not properly handled in KafkaSource. We should probably also consider this
scenario for other Source implementations.
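
For context, a minimal sketch of the failover scenario (assumed setup, not part of this patch; the broker address, topic names, and checkpoint path below are placeholders): two Kafka streams are unioned, one topic stops receiving data, and the query is restarted from its checkpoint.

```scala
// Hypothetical reproduction sketch for SPARK-22956; all names below are placeholders.
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("spark-22956-sketch").getOrCreate()

// Build a streaming DataFrame reading string values from one Kafka topic.
def kafkaStream(topic: String): DataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING)")

// "busy-topic" keeps receiving data; "idle-topic" stops producing at some point.
val unioned = kafkaStream("busy-topic").union(kafkaStream("idle-topic"))

// Stop and restart the query from the same checkpoint while "idle-topic" has no
// new data: on recovery the idle source is asked for a batch whose start equals
// its end, which is the case this fix handles in KafkaSource and MemoryStream.
val query = unioned.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/spark-22956-checkpoint") // placeholder path
  .start()
```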

## How was this patch tested?

Add a UT in KafkaSourceSuite.scala

Author: Yuanjian Li <xyliyuanj...@gmail.com>

Closes #20150 from xuanyuanking/SPARK-22956.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07ae39d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07ae39d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07ae39d0

Branch: refs/heads/master
Commit: 07ae39d0ec1f03b1c73259373a8bb599694c7860
Parents: c7572b7
Author: Yuanjian Li <xyliyuanj...@gmail.com>
Authored: Mon Jan 15 22:01:14 2018 -0800
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Mon Jan 15 22:01:14 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/kafka010/KafkaSource.scala | 13 ++--
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 65 ++++++++++++++++++++
 .../streaming/MicroBatchExecution.scala         |  6 +-
 .../spark/sql/execution/streaming/memory.scala  |  6 ++
 4 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/07ae39d0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index e9cff04..864a92b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -223,6 +223,14 @@ private[kafka010] class KafkaSource(
 
     logInfo(s"GetBatch called with start = $start, end = $end")
     val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
+    // On recovery, getBatch will get called before getOffset
+    if (currentPartitionOffsets.isEmpty) {
+      currentPartitionOffsets = Some(untilPartitionOffsets)
+    }
+    if (start.isDefined && start.get == end) {
+      return sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+    }
     val fromPartitionOffsets = start match {
       case Some(prevBatchEndOffset) =>
         KafkaSourceOffset.getPartitionOffsets(prevBatchEndOffset)
@@ -305,11 +313,6 @@ private[kafka010] class KafkaSource(
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
-    // On recovery, getBatch will get called before getOffset
-    if (currentPartitionOffsets.isEmpty) {
-      currentPartitionOffsets = Some(untilPartitionOffsets)
-    }
-
     sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07ae39d0/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 2034b9b..a0f5695 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -318,6 +318,71 @@ class KafkaSourceSuite extends KafkaSourceTest {
     )
   }
 
+  test("SPARK-22956: currentPartitionOffsets should be set when no new data 
comes in") {
+    def getSpecificDF(range: Range.Inclusive): 
org.apache.spark.sql.Dataset[Int] = {
+      val topic = newTopic()
+      testUtils.createTopic(topic, partitions = 1)
+      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
+
+      val reader = spark
+        .readStream
+        .format("kafka")
+        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+        .option("kafka.metadata.max.age.ms", "1")
+        .option("maxOffsetsPerTrigger", 5)
+        .option("subscribe", topic)
+        .option("startingOffsets", "earliest")
+
+      reader.load()
+        .selectExpr("CAST(value AS STRING)")
+        .as[String]
+        .map(k => k.toInt)
+    }
+
+    val df1 = getSpecificDF(0 to 9)
+    val df2 = getSpecificDF(100 to 199)
+
+    val kafka = df1.union(df2)
+
+    val clock = new StreamManualClock
+
+    val waitUntilBatchProcessed = AssertOnQuery { q =>
+      eventually(Timeout(streamingTimeout)) {
+        if (!q.exception.isDefined) {
+          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+        }
+      }
+      if (q.exception.isDefined) {
+        throw q.exception.get
+      }
+      true
+    }
+
+    testStream(kafka)(
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckLastBatch((0 to 4) ++ (100 to 104): _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // 5 from smaller topic, 5 from bigger one
+      CheckLastBatch((5 to 9) ++ (105 to 109): _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smaller topic empty, 5 from bigger one
+      CheckLastBatch(110 to 114: _*),
+      StopStream,
+      StartStream(ProcessingTime(100), clock),
+      waitUntilBatchProcessed,
+      // smallest now empty, 5 from bigger one
+      CheckLastBatch(115 to 119: _*),
+      AdvanceManualClock(100),
+      waitUntilBatchProcessed,
+      // smallest now empty, 5 from bigger one
+      CheckLastBatch(120 to 124: _*)
+    )
+  }
+
   test("cannot stop Kafka stream") {
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 5)

http://git-wip-us.apache.org/repos/asf/spark/blob/07ae39d0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 42240ee..70407f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -208,10 +208,8 @@ class MicroBatchExecution(
                * batch will be executed before getOffset is called again. */
               availableOffsets.foreach {
                 case (source: Source, end: Offset) =>
-                  if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
-                    val start = committedOffsets.get(source)
-                    source.getBatch(start, end)
-                  }
+                  val start = committedOffsets.get(source)
+                  source.getBatch(start, end)
                 case nonV1Tuple =>
                  // The V2 API does not have the same edge case requiring getBatch to be called
                   // here, so we do nothing here.

http://git-wip-us.apache.org/repos/asf/spark/blob/07ae39d0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 3041d4d..509a69d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -119,9 +119,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     val newBlocks = synchronized {
       val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
       val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd")
       batches.slice(sliceStart, sliceEnd)
     }
 
+    if (newBlocks.isEmpty) {
+      return sqlContext.internalCreateDataFrame(
+        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+    }
+
     logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))
 
     newBlocks

