Repository: spark
Updated Branches:
  refs/heads/master fabc8e5b1 -> 95f4fbae5


[SPARK-14942][SQL][STREAMING] Reduce delay between batch construction and 
execution

## Problem

Currently in `StreamExecution`, [we first run the batch, then construct the 
next](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L165):
```scala
if (dataAvailable) runBatch()
constructNextBatch()
```

This is good when we run batches ASAP, where data would get processed in the 
**very next batch**:

![1](https://cloud.githubusercontent.com/assets/15843379/14779964/2786e698-0b0d-11e6-9d2c-bb41513488b2.png)

However, when we run batches on a trigger such as `ProcessingTime("1 minute")`, 
data - such as _y_ below - may not get processed in the very next batch, i.e. 
_batch 1_, but only in _batch 2_:

![2](https://cloud.githubusercontent.com/assets/15843379/14779818/6f3bb064-0b0c-11e6-9f16-c1ce4897186b.png)

## What changes were proposed in this pull request?

This patch reverses the order of `constructNextBatch()` and `runBatch()`. After 
this patch, data would get processed in the **very next batch**, i.e. _batch 1_:

![3](https://cloud.githubusercontent.com/assets/15843379/14779816/6f36ee62-0b0c-11e6-9e53-bc8397fade18.png)

In addition, this patch changes when `currentBatchId += 1` happens: it is now 
done once the processing of the current batch's data has completed, so we no 
longer need to pass `currentBatchId + 1` or `currentBatchId - 1` to states or sinks.

## How was this patch tested?

Added a new test case. This change should also be covered by existing test 
suites, e.g. the stress tests and others.

Author: Liwei Lin <lwl...@gmail.com>

Closes #12725 from lw-lin/construct-before-run-3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95f4fbae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95f4fbae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95f4fbae

Branch: refs/heads/master
Commit: 95f4fbae52d26ede94c3ba8248394749f3d95dcc
Parents: fabc8e5
Author: Liwei Lin <lwl...@gmail.com>
Authored: Mon May 16 12:59:55 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Mon May 16 12:59:55 2016 -0700

----------------------------------------------------------------------
 .../streaming/IncrementalExecution.scala        |  6 +-
 .../execution/streaming/StreamExecution.scala   | 24 ++++--
 .../spark/sql/execution/streaming/memory.scala  |  4 +
 .../spark/sql/streaming/StreamSuite.scala       | 84 ++++++++++++++++++--
 4 files changed, 99 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95f4fbae/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index e9052a3..8b96f65 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -27,12 +27,12 @@ import org.apache.spark.sql.execution.{QueryExecution, 
SparkPlan, SparkPlanner,
  * A variant of [[QueryExecution]] that allows the execution of the given 
[[LogicalPlan]]
  * plan incrementally. Possibly preserving state in between each execution.
  */
-class IncrementalExecution(
+class IncrementalExecution private[sql](
     sparkSession: SparkSession,
     logicalPlan: LogicalPlan,
     outputMode: OutputMode,
     checkpointLocation: String,
-    currentBatchId: Long)
+    val currentBatchId: Long)
   extends QueryExecution(sparkSession, logicalPlan) {
 
   // TODO: make this always part of planning.
@@ -57,7 +57,7 @@ class IncrementalExecution(
       case StateStoreSaveExec(keys, None,
              UnaryExecNode(agg,
                StateStoreRestoreExec(keys2, None, child))) =>
-        val stateId = OperatorStateId(checkpointLocation, operatorId, 
currentBatchId - 1)
+        val stateId = OperatorStateId(checkpointLocation, operatorId, 
currentBatchId)
         operatorId += 1
 
         StateStoreSaveExec(

http://git-wip-us.apache.org/repos/asf/spark/blob/95f4fbae/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ea367b6..df6304d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -122,7 +122,7 @@ class StreamExecution(
    * processing is done.  Thus, the Nth record in this log indicated data that 
is currently being
    * processed and the N-1th entry indicates which offsets have been durably 
committed to the sink.
    */
-  private val offsetLog =
+  private[sql] val offsetLog =
     new HDFSMetadataLog[CompositeOffset](sparkSession, 
checkpointFile("offsets"))
 
   /** Whether the query is currently active or not */
@@ -174,12 +174,21 @@ class StreamExecution(
 
       // While active, repeatedly attempt to run batches.
       SQLContext.setActive(sparkSession.wrapped)
-      populateStartOffsets()
-      logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+
       triggerExecutor.execute(() => {
         if (isActive) {
-          if (dataAvailable) runBatch()
-          constructNextBatch()
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets()
+            logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            runBatch()
+            // We'll increase currentBatchId after we complete processing 
current batch's data
+            currentBatchId += 1
+          }
           true
         } else {
           false
@@ -214,7 +223,7 @@ class StreamExecution(
     offsetLog.getLatest() match {
       case Some((batchId, nextOffsets)) =>
         logInfo(s"Resuming continuous query, starting with batch $batchId")
-        currentBatchId = batchId + 1
+        currentBatchId = batchId
         availableOffsets = nextOffsets.toStreamProgress(sources)
         logDebug(s"Found possibly uncommitted offsets $availableOffsets")
 
@@ -285,7 +294,6 @@ class StreamExecution(
           offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log.  Multiple streaming jobs detected 
for $currentBatchId")
       }
-      currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
     } else {
       awaitBatchLock.lock()
@@ -352,7 +360,7 @@ class StreamExecution(
 
     val nextBatch =
       new Dataset(sparkSession, lastExecution, 
RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId - 1, nextBatch)
+    sink.addBatch(currentBatchId, nextBatch)
 
     awaitBatchLock.lock()
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/95f4fbae/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index a34927f..bcc33ae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -124,6 +124,10 @@ class MemorySink(val schema: StructType) extends Sink with 
Logging {
     batches.flatten
   }
 
+  def latestBatchId: Option[Int] = synchronized {
+    if (batches.size == 0) None else Some(batches.size - 1)
+  }
+
   def lastBatch: Seq[Row] = synchronized { batches.last }
 
   def toDebugString: String = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/95f4fbae/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6a8b280..013b731 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -137,20 +137,88 @@ class StreamSuite extends StreamTest with 
SharedSQLContext {
     }
   }
 
-  // This would fail for now -- error is "Timed out waiting for stream"
-  // Root cause is that data generated in batch 0 may not get processed in 
batch 1
-  // Let's enable this after SPARK-14942: Reduce delay between batch 
construction and execution
-  ignore("minimize delay between batch construction and execution") {
+  test("minimize delay between batch construction and execution") {
+
+    // For each batch, we would retrieve new data's offsets and log them 
before we run the execution
+    // This checks whether the key of the offset log is the expected batch id
+    def CheckOffsetLogLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
+        s"offsetLog's latest should be $expectedId")
+
+    // For each batch, we would log the state change during the execution
+    // This checks whether the key of the state change log is the expected 
batch id
+    def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): 
AssertOnQuery =
+      
AssertOnQuery(_.lastExecution.asInstanceOf[IncrementalExecution].currentBatchId 
== expectedId,
+        s"lastExecution's currentBatchId should be $expectedId")
+
+    // For each batch, we would log the sink change after the execution
+    // This checks whether the key of the sink change log is the expected 
batch id
+    def CheckSinkLatestBatchId(expectedId: Int): AssertOnQuery =
+      AssertOnQuery(_.sink.asInstanceOf[MemorySink].latestBatchId.get == 
expectedId,
+        s"sink's lastBatchId should be $expectedId")
+
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
       StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
       /* -- batch 0 ----------------------- */
-      AddData(inputData, 1),
-      AddData(inputData, 2),
-      AddData(inputData, 3),
+      // Add some data in batch 0
+      AddData(inputData, 1, 2, 3),
       AdvanceManualClock(10 * 1000), // 10 seconds
+
       /* -- batch 1 ----------------------- */
-      CheckAnswer(1, 2, 3))
+      // Check the results of batch 0
+      CheckAnswer(1, 2, 3),
+      CheckIncrementalExecutionCurrentBatchId(0),
+      CheckOffsetLogLatestBatchId(0),
+      CheckSinkLatestBatchId(0),
+      // Add some data in batch 1
+      AddData(inputData, 4, 5, 6),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch _ ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch __ ---------------------- */
+      // Check the results of batch 1 again; this is to make sure that, when 
there's no new data,
+      // the currentId does not get logged (e.g. as 2) even if the clock has 
advanced many times
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+
+      /* Stop then restart the Stream  */
+      StopStream,
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+
+      /* -- batch 1 rerun ----------------- */
+      // this batch 1 would re-run because the latest batch id logged in 
offset log is 1
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 2 ----------------------- */
+      // Check the results of batch 1
+      CheckAnswer(1, 2, 3, 4, 5, 6),
+      CheckIncrementalExecutionCurrentBatchId(1),
+      CheckOffsetLogLatestBatchId(1),
+      CheckSinkLatestBatchId(1),
+      // Add some data in batch 2
+      AddData(inputData, 7, 8, 9),
+      AdvanceManualClock(10 * 1000),
+
+      /* -- batch 3 ----------------------- */
+      // Check the results of batch 2
+      CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
+      CheckIncrementalExecutionCurrentBatchId(2),
+      CheckOffsetLogLatestBatchId(2),
+      CheckSinkLatestBatchId(2))
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to