Repository: spark Updated Branches: refs/heads/master 9c6556c5f -> a4ead6d38
[SPARK-14382][SQL] QueryProgress should be posted after committedOffsets is updated ## What changes were proposed in this pull request? Make sure QueryProgress is posted after committedOffsets is updated. If QueryProgress is posted before committedOffsets is updated, the listener may see a wrong sinkStatus (created from committedOffsets). See https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/644/testReport/junit/org.apache.spark.sql.util/ContinuousQueryListenerSuite/single_listener/ for an example of the failure. ## How was this patch tested? Existing unit tests. Author: Shixiong Zhu <shixi...@databricks.com> Closes #12155 from zsxwing/SPARK-14382. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4ead6d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4ead6d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4ead6d3 Branch: refs/heads/master Commit: a4ead6d3881f071a2ae53ff1c961c6ac388cac1d Parents: 9c6556c Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed Apr 6 12:28:04 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Apr 6 12:28:04 2016 -0700 ---------------------------------------------------------------------- .../sql/execution/streaming/StreamExecution.scala | 15 +++++---------- .../sql/util/ContinuousQueryListenerSuite.scala | 3 +-- 2 files changed, 6 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a4ead6d3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 3e4acb7..688e051 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -159,7 +159,7 @@ class StreamExecution( triggerExecutor.execute(() => { if (isActive) { if (dataAvailable) runBatch() - commitAndConstructNextBatch() + constructNextBatch() true } else { false @@ -207,7 +207,7 @@ class StreamExecution( case None => // We are starting this stream for the first time. logInfo(s"Starting new continuous query.") currentBatchId = 0 - commitAndConstructNextBatch() + constructNextBatch() } } @@ -227,15 +227,8 @@ class StreamExecution( /** * Queries all of the sources to see if any new data is available. When there is new data the * batchId counter is incremented and a new log entry is written with the newest offsets. - * - * Note that committing the offsets for a new batch implicitly marks the previous batch as - * finished and thus this method should only be called when all currently available data - * has been written to the sink. */ - private def commitAndConstructNextBatch(): Boolean = { - // Update committed offsets. - committedOffsets ++= availableOffsets - + private def constructNextBatch(): Boolean = { // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). // If we interrupt some thread running Shell.runCommand, we may hit this issue. // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand" @@ -331,6 +324,8 @@ class StreamExecution( val batchTime = (System.nanoTime() - startTime).toDouble / 1000000 logInfo(s"Completed up to $availableOffsets in ${batchTime}ms") + // Update committed offsets. 
+ committedOffsets ++= availableOffsets postEvent(new QueryProgress(this)) } http://git-wip-us.apache.org/repos/asf/spark/blob/a4ead6d3/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index d04783e..3498fe8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -146,7 +146,6 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = { - @volatile var query: StreamExecution = null try { failAfter(1 minute) { sqlContext.streams.addListener(listener) @@ -212,7 +211,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with case class QueryStatus( active: Boolean, - expection: Option[Exception], + exception: Option[Exception], sourceStatuses: Array[SourceStatus], sinkStatus: SinkStatus) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org