Repository: spark Updated Branches: refs/heads/master c050c1227 -> 569e50680
[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error ## What changes were proposed in this pull request? We should call `StateStore.abort()` if any error occurs before the store is committed. ## How was this patch tested? Manually. Author: Liwei Lin <lwl...@gmail.com> Closes #16547 from lw-lin/append-filter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/569e5068 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/569e5068 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/569e5068 Branch: refs/heads/master Commit: 569e50680f97b1ed054337a39fe198769ef52d93 Parents: c050c12 Author: Liwei Lin <lwl...@gmail.com> Authored: Wed Jan 18 10:52:47 2017 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jan 18 10:52:47 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/execution/streaming/StatefulAggregate.scala | 8 ++++++++ .../streaming/state/HDFSBackedStateStoreProvider.scala | 2 +- .../spark/sql/execution/streaming/state/StateStore.scala | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 0551e4b..d4ccced 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._ import 
org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.TaskContext /** Used to identify the state store for a given operator. */ @@ -150,6 +151,13 @@ case class StateStoreSaveExec( val numTotalStateRows = longMetric("numTotalStateRows") val numUpdatedStateRows = longMetric("numUpdatedStateRows") + // Abort the state store in case of error + TaskContext.get().addTaskCompletionListener(_ => { + if (!store.hasCommitted) { + store.abort() + } + }) + outputMode match { // Update and output all rows in the StateStore. case Some(Complete) => http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 4f3f818..1279b71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider( /** * Whether all updates have been committed */ - override private[state] def hasCommitted: Boolean = { + override private[streaming] def hasCommitted: Boolean = { state == COMMITTED } http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 9bc6c0e..d59746f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -83,7 +83,7 @@ trait StateStore { /** * Whether all updates have been committed */ - private[state] def hasCommitted: Boolean + private[streaming] def hasCommitted: Boolean } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org