spark git commit: [SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 578e40e3e -> 9ed8976c6

[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

## What changes were proposed in this pull request?

Fixed the issue that ForeachSink didn't rethrow the exception.

## How was this patch tested?

The fixed unit test.

Author: Shixiong Zhu

Closes #15674 from zsxwing/foreach-sink-error.

(cherry picked from commit 59cccbda489f25add3e10997e950de7e88704aa7)
Signed-off-by: Shixiong Zhu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ed8976c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ed8976c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ed8976c

Branch: refs/heads/branch-2.0
Commit: 9ed8976c6efc853cbe6dde3c5f44d55d063c7a20
Parents: 578e40e
Author: Shixiong Zhu
Authored: Fri Oct 28 20:14:38 2016 -0700
Committer: Shixiong Zhu
Committed: Fri Oct 28 20:14:53 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala | 7 ++-
 .../execution/streaming/ForeachSinkSuite.scala | 19 ++-
 2 files changed, 16 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/9ed8976c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664a..24f98b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
 }
 datasetWithIncrementalExecution.foreachPartition { iter =>
 if (writer.open(TaskContext.getPartitionId(), batchId)) {
-var isFailed = false
 try {
 while (iter.hasNext) {
 writer.process(iter.next())
 }
 } catch {
 case e: Throwable =>
-isFailed = true
 writer.close(e)
+throw e
 }
-if (!isFailed) {
- writer.close(null)
-}
+writer.close(null)
 } else {
 writer.close(null)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9ed8976c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 7928b8e..9e05921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
 import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
 }
 }
- test("foreach with error") {
+ testQuietly("foreach with error") {
 withTempDir { checkpointDir =>
 val input = MemoryStream[Int]
 val query = input.toDS().repartition(1).writeStream
@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
 }
 }).start()
 input.addData(1, 2, 3, 4)
- query.processAllAvailable()
+
+ // Error in `process` should fail the Spark job
+ val e = intercept[StreamingQueryException] {
+query.processAllAvailable()
+ }
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getCause.getMessage === "error")
+ assert(query.isActive === false)
 val allEvents = ForeachSinkSuite.allEvents()
 assert(allEvents.size === 1)
 assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
- assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+ assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+
+ // `close` should be called with the error
 val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
 assert
spark git commit: [SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception
Repository: spark
Updated Branches:
  refs/heads/master ac26e9cf2 -> 59cccbda4

[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

## What changes were proposed in this pull request?

Fixed the issue that ForeachSink didn't rethrow the exception.

## How was this patch tested?

The fixed unit test.

Author: Shixiong Zhu

Closes #15674 from zsxwing/foreach-sink-error.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59cccbda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59cccbda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59cccbda

Branch: refs/heads/master
Commit: 59cccbda489f25add3e10997e950de7e88704aa7
Parents: ac26e9c
Author: Shixiong Zhu
Authored: Fri Oct 28 20:14:38 2016 -0700
Committer: Shixiong Zhu
Committed: Fri Oct 28 20:14:38 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala | 7 ++-
 .../execution/streaming/ForeachSinkSuite.scala | 19 ++-
 2 files changed, 16 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/59cccbda/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664a..24f98b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
 }
 datasetWithIncrementalExecution.foreachPartition { iter =>
 if (writer.open(TaskContext.getPartitionId(), batchId)) {
-var isFailed = false
 try {
 while (iter.hasNext) {
 writer.process(iter.next())
 }
 } catch {
 case e: Throwable =>
-isFailed = true
 writer.close(e)
+throw e
 }
-if (!isFailed) {
- writer.close(null)
-}
+writer.close(null)
 } else {
 writer.close(null)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/59cccbda/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 7928b8e..9e05921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
 import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
 }
 }
- test("foreach with error") {
+ testQuietly("foreach with error") {
 withTempDir { checkpointDir =>
 val input = MemoryStream[Int]
 val query = input.toDS().repartition(1).writeStream
@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
 }
 }).start()
 input.addData(1, 2, 3, 4)
- query.processAllAvailable()
+
+ // Error in `process` should fail the Spark job
+ val e = intercept[StreamingQueryException] {
+query.processAllAvailable()
+ }
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getCause.getMessage === "error")
+ assert(query.isActive === false)
 val allEvents = ForeachSinkSuite.allEvents()
 assert(allEvents.size === 1)
 assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0))
- assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+ assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+
+ // `close` should be called with the error
 val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
 assert(errorEvent.error.get.isInstanceOf[RuntimeException])
 assert(errorEvent.error.get.getMessage === "er