Repository: spark
Updated Branches:
  refs/heads/master e8c1a0c2f -> 2c2a86b5d


[SPARK-24453][SS] Fix error recovering from the failure in a no-data batch

## What changes were proposed in this pull request?

The error occurs when we are recovering from a failure in a no-data batch (say 
X) that has been planned (i.e. written to the offset log) but not executed (i.e. 
not written to the commit log). Upon recovery, the following sequence of events 
happens.

1. `MicroBatchExecution.populateStartOffsets` sets `currentBatchId` to X. Since 
there was no data in the batch, `availableOffsets` is the same as 
`committedOffsets`, so `isNewDataAvailable` is `false`.
2. When `MicroBatchExecution.constructNextBatch` is called, ideally it should 
immediately return true because the next batch has already been constructed. 
However, the check of whether the batch has been constructed was `if 
(isNewDataAvailable) return true`. Since the planned batch is a no-data batch, 
it escaped this check and proceeded to plan the same batch X *once again*.

The solution is to have an explicit flag that signifies whether a batch has 
already been constructed or not. `populateStartOffsets` is going to set the 
flag appropriately.

## How was this patch tested?

new unit test

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #21491 from tdas/SPARK-24453.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c2a86b5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c2a86b5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c2a86b5

Branch: refs/heads/master
Commit: 2c2a86b5d5be6f77ee72d16f990b39ae59f479b9
Parents: e8c1a0c
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Jun 5 01:08:55 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jun 5 01:08:55 2018 -0700

----------------------------------------------------------------------
 .../streaming/MicroBatchExecution.scala         | 38 +++++++----
 .../streaming/MicroBatchExecutionSuite.scala    | 71 ++++++++++++++++++++
 .../apache/spark/sql/streaming/StreamTest.scala |  2 +-
 3 files changed, 98 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c2a86b5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 7817360..17ffa2a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -127,6 +127,12 @@ class MicroBatchExecution(
   }
 
   /**
+   * Signifies whether current batch (i.e. for the batch `currentBatchId`) has 
been constructed
+   * (i.e. written to the offsetLog) and is ready for execution.
+   */
+  private var isCurrentBatchConstructed = false
+
+  /**
    * Signals to the thread executing micro-batches that it should stop running 
after the next
    * batch. This method blocks until the thread stops running.
    */
@@ -154,7 +160,6 @@ class MicroBatchExecution(
 
     triggerExecutor.execute(() => {
       if (isActive) {
-        var currentBatchIsRunnable = false // Whether the current batch is 
runnable / has been run
         var currentBatchHasNewData = false // Whether the current batch had 
new data
 
         startTrigger()
@@ -175,7 +180,9 @@ class MicroBatchExecution(
           // new data to process as `constructNextBatch` may decide to run a 
batch for
           // state cleanup, etc. `isNewDataAvailable` will be updated to 
reflect whether new data
           // is available or not.
-          currentBatchIsRunnable = constructNextBatch(noDataBatchesEnabled)
+          if (!isCurrentBatchConstructed) {
+            isCurrentBatchConstructed = 
constructNextBatch(noDataBatchesEnabled)
+          }
 
           // Remember whether the current batch has data or not. This will be 
required later
           // for bookkeeping after running the batch, when 
`isNewDataAvailable` will have changed
@@ -183,7 +190,7 @@ class MicroBatchExecution(
           currentBatchHasNewData = isNewDataAvailable
 
           currentStatus = currentStatus.copy(isDataAvailable = 
isNewDataAvailable)
-          if (currentBatchIsRunnable) {
+          if (isCurrentBatchConstructed) {
             if (currentBatchHasNewData) updateStatusMessage("Processing new 
data")
             else updateStatusMessage("No new data but cleaning up state")
             runBatch(sparkSessionForStream)
@@ -194,9 +201,12 @@ class MicroBatchExecution(
 
         finishTrigger(currentBatchHasNewData)  // Must be outside 
reportTimeTaken so it is recorded
 
-        // If the current batch has been executed, then increment the batch 
id, else there was
-        // no data to execute the batch
-        if (currentBatchIsRunnable) currentBatchId += 1 else 
Thread.sleep(pollingDelayMs)
+        // If the current batch has been executed, then increment the batch id 
and reset flag.
+        // Otherwise, there was no data to execute the batch and sleep for 
some time
+        if (isCurrentBatchConstructed) {
+          currentBatchId += 1
+          isCurrentBatchConstructed = false
+        } else Thread.sleep(pollingDelayMs)
       }
       updateStatusMessage("Waiting for next trigger")
       isActive
@@ -231,6 +241,7 @@ class MicroBatchExecution(
         /* First assume that we are re-executing the latest known batch
          * in the offset log */
         currentBatchId = latestBatchId
+        isCurrentBatchConstructed = true
         availableOffsets = nextOffsets.toStreamProgress(sources)
         /* Initialize committed offsets to a committed batch, which at this
          * is the second latest batch id in the offset log. */
@@ -269,6 +280,7 @@ class MicroBatchExecution(
                   // here, so we do nothing here.
               }
               currentBatchId = latestCommittedBatchId + 1
+              isCurrentBatchConstructed = false
               committedOffsets ++= availableOffsets
               // Construct a new batch be recomputing availableOffsets
             } else if (latestCommittedBatchId < latestBatchId - 1) {
@@ -313,11 +325,8 @@ class MicroBatchExecution(
    * - If either of the above is true, then construct the next batch by 
committing to the offset
    *   log that range of offsets that the next batch will process.
    */
-  private def constructNextBatch(noDataBatchesEnables: Boolean): Boolean = 
withProgressLocked {
-    // If new data is already available that means this method has already 
been called before
-    // and it must have already committed the offset range of next batch to 
the offset log.
-    // Hence do nothing, just return true.
-    if (isNewDataAvailable) return true
+  private def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean = 
withProgressLocked {
+    if (isCurrentBatchConstructed) return true
 
     // Generate a map from each unique source to the next available offset.
     val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = 
uniqueSources.map {
@@ -348,9 +357,14 @@ class MicroBatchExecution(
       batchTimestampMs = triggerClock.getTimeMillis())
 
     // Check whether next batch should be constructed
-    val lastExecutionRequiresAnotherBatch = noDataBatchesEnables &&
+    val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
       Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
     val shouldConstructNextBatch = isNewDataAvailable || 
lastExecutionRequiresAnotherBatch
+    logTrace(
+      s"noDataBatchesEnabled = $noDataBatchesEnabled, " +
+      s"lastExecutionRequiresAnotherBatch = 
$lastExecutionRequiresAnotherBatch, " +
+      s"isNewDataAvailable = $isNewDataAvailable, " +
+      s"shouldConstructNextBatch = $shouldConstructNextBatch")
 
     if (shouldConstructNextBatch) {
       // Commit the next batch offset range to the offset log

http://git-wip-us.apache.org/repos/asf/spark/blob/2c2a86b5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
new file mode 100644
index 0000000..c228740
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.functions.{count, window}
+import org.apache.spark.sql.streaming.StreamTest
+
+class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("SPARK-24156: do not plan a no-data batch again after it has already 
been planned") {
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(df)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15), // Set watermark to 5
+      CheckAnswer(),
+      AddData(inputData, 25), // Set watermark to 15 to make 
MicroBatchExecution run no-data batch
+      CheckAnswer((10, 5)),   // Last batch should be a no-data batch
+      StopStream,
+      Execute { q =>
+        // Delete the last committed batch from the commit log to signify that 
the last batch
+        // (a no-data batch) never completed
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+        q.commitLog.purgeAfter(commit - 1)
+      },
+      // Add data before start so that MicroBatchExecution can plan a batch. 
It should not,
+      // it should first re-run the incomplete no-data batch and then run a 
new batch to process
+      // new data.
+      AddData(inputData, 30),
+      StartStream(),
+      CheckNewAnswer((15, 1)),   // This should not throw the error reported 
in SPARK-24156
+      StopStream,
+      Execute { q =>
+        // Delete the entire commit log
+        val commit = q.commitLog.getLatest().map(_._1).getOrElse(-1L)
+        q.commitLog.purge(commit + 1)
+      },
+      AddData(inputData, 50),
+      StartStream(),
+      CheckNewAnswer((25, 1), (30, 1))   // This should not throw the error 
reported in SPARK-24156
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2c2a86b5/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index f348dac..4c3fd58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -292,7 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with TimeLimits with Be
   /** Execute arbitrary code */
   object Execute {
     def apply(func: StreamExecution => Any): AssertOnQuery =
-      AssertOnQuery(query => { func(query); true })
+      AssertOnQuery(query => { func(query); true }, "Execute")
   }
 
   object AwaitEpoch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to