spark git commit: [SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

2016-10-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 578e40e3e -> 9ed8976c6


[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

## What changes were proposed in this pull request?

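Fixed the issue where ForeachSink caught an exception thrown by `ForeachWriter.process`, passed it to `close`, but did not rethrow it, so the Spark job did not fail.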

## How was this patch tested?

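Updated the existing "foreach with error" unit test in `ForeachSinkSuite` to assert that the query fails with a `StreamingQueryException` and that `close` is called with the error.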

Author: Shixiong Zhu 

Closes #15674 from zsxwing/foreach-sink-error.

(cherry picked from commit 59cccbda489f25add3e10997e950de7e88704aa7)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ed8976c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ed8976c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ed8976c

Branch: refs/heads/branch-2.0
Commit: 9ed8976c6efc853cbe6dde3c5f44d55d063c7a20
Parents: 578e40e
Author: Shixiong Zhu 
Authored: Fri Oct 28 20:14:38 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 28 20:14:53 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala    |  7 ++-----
 .../execution/streaming/ForeachSinkSuite.scala   | 19 ++++++++++++++-----
 2 files changed, 16 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9ed8976c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664a..24f98b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) 
extends Sink with Seria
   }
 datasetWithIncrementalExecution.foreachPartition { iter =>
   if (writer.open(TaskContext.getPartitionId(), batchId)) {
-var isFailed = false
 try {
   while (iter.hasNext) {
 writer.process(iter.next())
   }
 } catch {
   case e: Throwable =>
-isFailed = true
 writer.close(e)
+throw e
 }
-if (!isFailed) {
-  writer.close(null)
-}
+writer.close(null)
   } else {
 writer.close(null)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9ed8976c/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 7928b8e..9e05921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, 
StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class ForeachSinkSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfter {
@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
 }
   }
 
-  test("foreach with error") {
+  testQuietly("foreach with error") {
 withTempDir { checkpointDir =>
   val input = MemoryStream[Int]
   val query = input.toDS().repartition(1).writeStream
@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
   }
 }).start()
   input.addData(1, 2, 3, 4)
-  query.processAllAvailable()
+
+  // Error in `process` should fail the Spark job
+  val e = intercept[StreamingQueryException] {
+query.processAllAvailable()
+  }
+  assert(e.getCause.isInstanceOf[SparkException])
+  assert(e.getCause.getCause.getMessage === "error")
+  assert(query.isActive === false)
 
   val allEvents = ForeachSinkSuite.allEvents()
   assert(allEvents.size === 1)
   assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version 
= 0))
-  assert(allEvents(0)(1) ===  ForeachSinkSuite.Process(value = 1))
+  assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+
+  // `close` should be called with the error
   val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
   assert

spark git commit: [SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

2016-10-28 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master ac26e9cf2 -> 59cccbda4


[SPARK-18164][SQL] ForeachSink should fail the Spark job if `process` throws exception

## What changes were proposed in this pull request?

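Fixed the issue where ForeachSink caught an exception thrown by `ForeachWriter.process`, passed it to `close`, but did not rethrow it, so the Spark job did not fail.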

## How was this patch tested?

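Updated the existing "foreach with error" unit test in `ForeachSinkSuite` to assert that the query fails with a `StreamingQueryException` and that `close` is called with the error.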

Author: Shixiong Zhu 

Closes #15674 from zsxwing/foreach-sink-error.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59cccbda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59cccbda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59cccbda

Branch: refs/heads/master
Commit: 59cccbda489f25add3e10997e950de7e88704aa7
Parents: ac26e9c
Author: Shixiong Zhu 
Authored: Fri Oct 28 20:14:38 2016 -0700
Committer: Shixiong Zhu 
Committed: Fri Oct 28 20:14:38 2016 -0700

--
 .../sql/execution/streaming/ForeachSink.scala    |  7 ++-----
 .../execution/streaming/ForeachSinkSuite.scala   | 19 ++++++++++++++-----
 2 files changed, 16 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/59cccbda/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index 082664a..24f98b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -68,19 +68,16 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) 
extends Sink with Seria
   }
 datasetWithIncrementalExecution.foreachPartition { iter =>
   if (writer.open(TaskContext.getPartitionId(), batchId)) {
-var isFailed = false
 try {
   while (iter.hasNext) {
 writer.process(iter.next())
   }
 } catch {
   case e: Throwable =>
-isFailed = true
 writer.close(e)
+throw e
 }
-if (!isFailed) {
-  writer.close(null)
-}
+writer.close(null)
   } else {
 writer.close(null)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/59cccbda/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 7928b8e..9e05921 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
-import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, 
StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class ForeachSinkSuite extends StreamTest with SharedSQLContext with 
BeforeAndAfter {
@@ -136,7 +137,7 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
 }
   }
 
-  test("foreach with error") {
+  testQuietly("foreach with error") {
 withTempDir { checkpointDir =>
   val input = MemoryStream[Int]
   val query = input.toDS().repartition(1).writeStream
@@ -148,16 +149,24 @@ class ForeachSinkSuite extends StreamTest with 
SharedSQLContext with BeforeAndAf
   }
 }).start()
   input.addData(1, 2, 3, 4)
-  query.processAllAvailable()
+
+  // Error in `process` should fail the Spark job
+  val e = intercept[StreamingQueryException] {
+query.processAllAvailable()
+  }
+  assert(e.getCause.isInstanceOf[SparkException])
+  assert(e.getCause.getCause.getMessage === "error")
+  assert(query.isActive === false)
 
   val allEvents = ForeachSinkSuite.allEvents()
   assert(allEvents.size === 1)
   assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version 
= 0))
-  assert(allEvents(0)(1) ===  ForeachSinkSuite.Process(value = 1))
+  assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1))
+
+  // `close` should be called with the error
   val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close]
   assert(errorEvent.error.get.isInstanceOf[RuntimeException])
   assert(errorEvent.error.get.getMessage === "er