Repository: spark
Updated Branches:
  refs/heads/master 372baf047 -> 768b3d623


[SPARK-14579][SQL] Fix a race condition in StreamExecution.processAllAvailable

## What changes were proposed in this pull request?

There is a race condition in `StreamExecution.processAllAvailable`. Here is an 
execution order to reproduce it.

| Time        |Thread 1           | MicroBatchThread  |
|:-------------:|:-------------:|:-----:|
| 1 | |  `dataAvailable in constructNextBatch` returns false  |
| 2 | addData(newData)      |   |
| 3 | `noNewData = false` in  processAllAvailable |  |
| 4 | | noNewData = true |
| 5 | `noNewData` is true so just return | |

The root cause is that `checking dataAvailable and change noNewData to true` is 
not atomic. This PR puts these two actions into `synchronized` to make sure 
they are atomic.

In addition, this PR also has the following changes:

- Make `committedOffsets` and `availableOffsets` volatile to make sure they can 
be seen in other threads.
- Copy the reference of `availableOffsets` to a local variable so that 
`sourceStatuses` can use a snapshot of `availableOffsets`.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12339 from zsxwing/race-condition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/768b3d62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/768b3d62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/768b3d62

Branch: refs/heads/master
Commit: 768b3d623c29eaf960be096845b7c421f8a3ba36
Parents: 372baf0
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Apr 12 17:31:47 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Apr 12 17:31:47 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   | 40 +++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/768b3d62/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 688e051..87dd27a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -59,12 +59,14 @@ class StreamExecution(
    * Tracks how much data we have processed and committed to the sink or state 
store from each
    * input source.
    */
+  @volatile
   private[sql] var committedOffsets = new StreamProgress
 
   /**
    * Tracks the offsets that are available to be processed, but have not yet 
be committed to the
    * sink.
    */
+  @volatile
   private var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
@@ -111,7 +113,8 @@ class StreamExecution(
 
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
-    sources.map(s => new SourceStatus(s.toString, 
availableOffsets.get(s))).toArray
+    val localAvailableOffsets = availableOffsets
+    sources.map(s => new SourceStatus(s.toString, 
localAvailableOffsets.get(s))).toArray
   }
 
   /** Returns current status of the sink. */
@@ -228,7 +231,7 @@ class StreamExecution(
    * Queries all of the sources to see if any new data is available. When 
there is new data the
    * batchId counter is incremented and a new log entry is written with the 
newest offsets.
    */
-  private def constructNextBatch(): Boolean = {
+  private def constructNextBatch(): Unit = {
     // There is a potential dead-lock in Hadoop "Shell.runCommand" before 
2.5.0 (HADOOP-10622).
     // If we interrupt some thread running Shell.runCommand, we may hit this 
issue.
     // As "FileStreamSource.getOffset" will create a file using HDFS API and 
call "Shell.runCommand"
@@ -241,7 +244,15 @@ class StreamExecution(
     }
     availableOffsets ++= newData
 
-    if (dataAvailable) {
+    val hasNewData = awaitBatchLock.synchronized {
+      if (dataAvailable) {
+        true
+      } else {
+        noNewData = true
+        false
+      }
+    }
+    if (hasNewData) {
       // There is a potential dead-lock in Hadoop "Shell.runCommand" before 
2.5.0 (HADOOP-10622).
       // If we interrupt some thread running Shell.runCommand, we may hit this 
issue.
       // As "offsetLog.add" will create a file using HDFS API and call 
"Shell.runCommand" to set
@@ -254,15 +265,11 @@ class StreamExecution(
       }
       currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
-      true
     } else {
-      noNewData = true
       awaitBatchLock.synchronized {
         // Wake up any threads that are waiting for the stream to progress.
         awaitBatchLock.notifyAll()
       }
-
-      false
     }
   }
 
@@ -353,7 +360,10 @@ class StreamExecution(
    * least the given `Offset`. This method is indented for use primarily when 
writing tests.
    */
   def awaitOffset(source: Source, newOffset: Offset): Unit = {
-    def notDone = !committedOffsets.contains(source) || 
committedOffsets(source) < newOffset
+    def notDone = {
+      val localCommittedOffsets = committedOffsets
+      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) 
< newOffset
+    }
 
     while (notDone) {
       logInfo(s"Waiting until $newOffset at $source")
@@ -365,13 +375,17 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data 
available. */
   @volatile private var noNewData = false
 
-  override def processAllAvailable(): Unit = {
+  override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
     noNewData = false
-    while (!noNewData) {
-      awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
-      if (streamDeathCause != null) { throw streamDeathCause }
+    while (true) {
+      awaitBatchLock.wait(10000)
+      if (streamDeathCause != null) {
+        throw streamDeathCause
+      }
+      if (noNewData) {
+        return
+      }
     }
-    if (streamDeathCause != null) { throw streamDeathCause }
   }
 
   override def awaitTermination(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to