Repository: spark
Updated Branches:
  refs/heads/branch-1.6 9c8e17984 -> 0c23dd52d


[SPARK-11870][STREAMING][PYSPARK] Rethrow the exceptions in TransformFunction 
and TransformFunctionSerializer

TransformFunction and TransformFunctionSerializer don't rethrow exceptions, 
so when any exception happens, they just return None. This causes some weird 
NullPointerExceptions (NPEs) and confuses people.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9847 from zsxwing/pyspark-streaming-exception.

(cherry picked from commit be7a2cfd978143f6f265eca63e9e24f755bc9f22)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c23dd52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c23dd52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c23dd52

Branch: refs/heads/branch-1.6
Commit: 0c23dd52d64d4a3448fb7d21b0e40d13f885bcfa
Parents: 9c8e179
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Fri Nov 20 14:23:01 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Nov 20 14:23:18 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/tests.py | 16 ++++++++++++++++
 python/pyspark/streaming/util.py  |  3 +++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c23dd52/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 3403f6d..a0e0267 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -403,6 +403,22 @@ class BasicOperationTests(PySparkStreamingTestCase):
         expected = [[('k', v)] for v in expected]
         self._test_func(input, func, expected)
 
+    def test_failed_func(self):
+        input = [self.sc.parallelize([d], 1) for d in range(4)]
+        input_stream = self.ssc.queueStream(input)
+
+        def failed_func(i):
+            raise ValueError("failed")
+
+        input_stream.map(failed_func).pprint()
+        self.ssc.start()
+        try:
+            self.ssc.awaitTerminationOrTimeout(10)
+        except:
+            return
+
+        self.fail("a failed func should throw an error")
+
 
 class StreamingListenerTests(PySparkStreamingTestCase):
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c23dd52/python/pyspark/streaming/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py
index b20613b..767c732 100644
--- a/python/pyspark/streaming/util.py
+++ b/python/pyspark/streaming/util.py
@@ -64,6 +64,7 @@ class TransformFunction(object):
                 return r._jrdd
         except Exception:
             traceback.print_exc()
+            raise
 
     def __repr__(self):
         return "TransformFunction(%s)" % self.func
@@ -95,6 +96,7 @@ class TransformFunctionSerializer(object):
             return bytearray(self.serializer.dumps((func.func, func.deserializers)))
         except Exception:
             traceback.print_exc()
+            raise
 
     def loads(self, data):
         try:
@@ -102,6 +104,7 @@ class TransformFunctionSerializer(object):
             return TransformFunction(self.ctx, f, *deserializers)
         except Exception:
             traceback.print_exc()
+            raise
 
     def __repr__(self):
         return "TransformFunctionSerializer(%s)" % self.serializer


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to