Repository: spark
Updated Branches:
  refs/heads/master 9c6556c5f -> a4ead6d38


[SPARK-14382][SQL] QueryProgress should be posted after committedOffsets is 
updated

## What changes were proposed in this pull request?

Make sure QueryProgress is posted after committedOffsets is updated. If 
QueryProgress is posted before committedOffsets is updated, the listener may see 
a wrong sinkStatus (created from committedOffsets).

See 
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/644/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/single_listener/
 for an example of the failure.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12155 from zsxwing/SPARK-14382.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4ead6d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4ead6d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4ead6d3

Branch: refs/heads/master
Commit: a4ead6d3881f071a2ae53ff1c961c6ac388cac1d
Parents: 9c6556c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Apr 6 12:28:04 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Apr 6 12:28:04 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/StreamExecution.scala    | 15 +++++----------
 .../sql/util/ContinuousQueryListenerSuite.scala      |  3 +--
 2 files changed, 6 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a4ead6d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3e4acb7..688e051 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -159,7 +159,7 @@ class StreamExecution(
       triggerExecutor.execute(() => {
         if (isActive) {
           if (dataAvailable) runBatch()
-          commitAndConstructNextBatch()
+          constructNextBatch()
           true
         } else {
           false
@@ -207,7 +207,7 @@ class StreamExecution(
       case None => // We are starting this stream for the first time.
         logInfo(s"Starting new continuous query.")
         currentBatchId = 0
-        commitAndConstructNextBatch()
+        constructNextBatch()
     }
   }
 
@@ -227,15 +227,8 @@ class StreamExecution(
   /**
    * Queries all of the sources to see if any new data is available. When 
there is new data the
    * batchId counter is incremented and a new log entry is written with the 
newest offsets.
-   *
-   * Note that committing the offsets for a new batch implicitly marks the 
previous batch as
-   * finished and thus this method should only be called when all currently 
available data
-   * has been written to the sink.
    */
-  private def commitAndConstructNextBatch(): Boolean = {
-    // Update committed offsets.
-    committedOffsets ++= availableOffsets
-
+  private def constructNextBatch(): Boolean = {
     // There is a potential dead-lock in Hadoop "Shell.runCommand" before 
2.5.0 (HADOOP-10622).
     // If we interrupt some thread running Shell.runCommand, we may hit this 
issue.
     // As "FileStreamSource.getOffset" will create a file using HDFS API and 
call "Shell.runCommand"
@@ -331,6 +324,8 @@ class StreamExecution(
 
     val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
     logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
+    // Update committed offsets.
+    committedOffsets ++= availableOffsets
     postEvent(new QueryProgress(this))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a4ead6d3/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
index d04783e..3498fe8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala
@@ -146,7 +146,6 @@ class ContinuousQueryListenerSuite extends StreamTest with 
SharedSQLContext with
 
 
   private def withListenerAdded(listener: ContinuousQueryListener)(body: => 
Unit): Unit = {
-    @volatile var query: StreamExecution = null
     try {
       failAfter(1 minute) {
         sqlContext.streams.addListener(listener)
@@ -212,7 +211,7 @@ class ContinuousQueryListenerSuite extends StreamTest with 
SharedSQLContext with
 
   case class QueryStatus(
     active: Boolean,
-    expection: Option[Exception],
+    exception: Option[Exception],
     sourceStatuses: Array[SourceStatus],
     sinkStatus: SinkStatus)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to