Repository: spark
Updated Branches:
  refs/heads/branch-2.0 08ae32e61 -> 1c2082b64


[SPARK-14579][SQL] Fix the race condition in StreamExecution.processAllAvailable again

## What changes were proposed in this pull request?

#12339 didn't fix the race condition. MemorySinkSuite is still flaky: 
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/814/testReport/junit/org.apache.spark.sql.streaming/MemorySinkSuite/registering_as_a_table/

Here is an execution order to reproduce it.

| Time        |Thread 1           | MicroBatchThread  |
|:-------------:|:-------------:|:-----:|
| 1 | |  `MemoryStream.getOffset` |
| 2 | | `availableOffsets ++= newData` (no new offsets yet, so `availableOffsets` is unchanged) |
| 3 | `addData(newData)` | |
| 4 | Set `noNewData` to `false` in `processAllAvailable` | |
| 5 | | `dataAvailable` returns `false` |
| 6 | | `noNewData = true` |
| 7 | `noNewData` is `true`, so just return | |
| 8 | Assert results and fail | |
| 9 | | `dataAvailable` returns `true`, so process the new batch |

This PR expands the scope of `awaitBatchLock.synchronized` to eliminate the 
above race.
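
As a rough illustration of why widening the lock helps, the interleaving in the table can be replayed in a single-threaded toy model. This is only a sketch under stated assumptions: `ToyExecution`, `pollSource`, `checkForData`, `pollAndCheck`, `startWaiting`, and the offset fields are hypothetical stand-ins, not the real `StreamExecution` members, and the "batch" is reduced to copying one offset.

```scala
// Toy, single-threaded replay of the race. All names are hypothetical
// stand-ins, not the real StreamExecution members.
object RaceReplay {
  final class ToyExecution {
    private val lock = new Object
    private var sourceOffset: Option[Int] = None     // latest offset the source holds
    private var availableOffset: Option[Int] = None  // last offset seen by the batch thread
    private var committedOffset: Option[Int] = None  // last offset already processed
    var noNewData: Boolean = false

    // Thread 1, step 3: new data lands in the source.
    def addData(): Unit = {
      sourceOffset = Some(sourceOffset.fold(0)(_ + 1))
    }

    // Thread 1, step 4: the waiter resets the flag before waiting.
    def startWaiting(): Unit = lock.synchronized { noNewData = false }

    // Batch thread, steps 1-2: poll the source (outside the lock before the patch).
    def pollSource(): Unit = sourceOffset.foreach(o => availableOffset = Some(o))

    // Batch thread, steps 5-6: decide whether there is anything to run.
    def checkForData(): Boolean = lock.synchronized {
      val hasData = availableOffset != committedOffset
      if (hasData) {
        committedOffset = availableOffset // pretend the batch ran
      } else {
        noNewData = true
      }
      hasData
    }

    // After the patch: poll and check share one critical section.
    def pollAndCheck(): Boolean = lock.synchronized {
      pollSource()
      checkForData()
    }

    // True while the source holds data that has not been processed.
    def pending: Boolean = sourceOffset != committedOffset
  }

  // Pre-patch interleaving from the table; returns (noNewData, pending).
  def buggyOrder(): (Boolean, Boolean) = {
    val e = new ToyExecution
    e.pollSource()   // steps 1-2: nothing new yet
    e.addData()      // step 3
    e.startWaiting() // step 4
    e.checkForData() // steps 5-6: stale view, so noNewData becomes true
    (e.noNewData, e.pending)
  }

  // Post-patch: step 4 can only land before or after the whole critical section.
  def fixedOrder(): (Boolean, Boolean) = {
    val e = new ToyExecution
    e.pollAndCheck() // finds nothing, sets noNewData = true
    e.addData()
    e.startWaiting() // resets the flag, so the waiter keeps waiting
    e.pollAndCheck() // sees and processes the new offset
    e.pollAndCheck() // nothing left, so noNewData becomes true again
    (e.noNewData, e.pending)
  }

  def main(args: Array[String]): Unit = {
    println(s"buggy: ${buggyOrder()}")
    println(s"fixed: ${fixedOrder()}")
  }
}
```

In this model `buggyOrder()` ends with `noNewData` set while data is still pending, which corresponds to step 7 returning too early. In `fixedOrder()` the flag is only set once nothing is pending, because the waiter's reset cannot slip in between the poll and the check.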

## How was this patch tested?

Added `test("stress test")` to `MemorySinkSuite`. It always failed before this patch and passes with the patch applied. The test is marked `ignore` in this PR because it takes several minutes to finish.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12582 from zsxwing/SPARK-14579-2.

(cherry picked from commit a35a67a83dbb450d26ce0d142ab106e952670842)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c2082b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c2082b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c2082b6

Branch: refs/heads/branch-2.0
Commit: 1c2082b643dc01fdeb2405c97dcf5a9551cc782d
Parents: 08ae32e
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Mon May 2 11:28:21 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon May 2 11:28:32 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/streaming/StreamExecution.scala   | 10 +++++-----
 .../org/apache/spark/sql/streaming/MemorySinkSuite.scala  |  9 +++++++++
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c2082b6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index fc18e5f..ce68c09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -242,12 +242,12 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val newData = microBatchThread.runUninterruptibly {
-      uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-    }
-    availableOffsets ++= newData
-
     val hasNewData = awaitBatchLock.synchronized {
+      val newData = microBatchThread.runUninterruptibly {
+        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+      }
+      availableOffsets ++= newData
+
       if (dataAvailable) {
         true
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/1c2082b6/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 1f28340..74ca397 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -26,6 +26,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
 
   test("registering as a table") {
+    testRegisterAsTable()
+  }
+
+  ignore("stress test") {
+    // Ignore the stress test as it takes several minutes to run
+    (0 until 1000).foreach(_ => testRegisterAsTable())
+  }
+
+  private def testRegisterAsTable(): Unit = {
     val input = MemoryStream[Int]
     val query = input.toDF().write
       .format("memory")


