Repository: spark
Updated Branches:
  refs/heads/branch-1.3 5278ef0f1 -> 387d81891


[SPARK-11812][PYSPARK] invFunc=None works properly with Python's reduceByKeyAndWindow

invFunc is optional and can be None. However, instead of invFunc (the
parameter), invReduceFunc (a local function) was checked for truthiness.
A local function object is never None, so the case of invFunc=None (a
common one, used when no inverse reduction is defined) was handled
incorrectly, resulting in loss of data.
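
The essence of the bug, as a self-contained sketch (illustrative names, not a
verbatim excerpt from the patch):

    def reduce_by_key_and_window(func, invFunc=None):
        # func would drive the forward reduction; it is unused in this sketch.
        def invReduceFunc(t, a, b):    # local wrapper around invFunc, always defined
            return invFunc(a, b)

        # Buggy check: invReduceFunc is a function object and therefore always
        # truthy, so the inverse-reduction path was taken even for invFunc=None.
        uses_inverse_buggy = bool(invReduceFunc)
        # Fixed check: test the parameter the caller actually passed.
        uses_inverse_fixed = invFunc is not None
        return uses_inverse_buggy, uses_inverse_fixed

    print(reduce_by_key_and_window(max, max))   # (True, True)
    print(reduce_by_key_and_window(max))        # (True, False) -- the bug in miniature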

In addition, the docstring used the wrong parameter names; these are also fixed.
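
For reference, a typical call that exercises the fixed path, assuming an existing
StreamingContext and a DStream named `lines` (hypothetical names, mirroring the new
test in the diff below): passing None as invFunc makes Spark re-reduce the whole
window on each slide instead of incrementally subtracting departed RDDs.

    import operator

    counts = lines.map(lambda word: (word, 1)) \
        .reduceByKeyAndWindow(operator.add, None, 30, 10)  # func, invFunc=None, window, slide
    counts.pprint()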

Author: David Tolpin <david.tol...@gmail.com>

Closes #9775 from dtolpin/master.

(cherry picked from commit 599a8c6e2bf7da70b20ef3046f5ce099dfd637f8)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/387d8189
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/387d8189
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/387d8189

Branch: refs/heads/branch-1.3
Commit: 387d818918f1df34cf15e3c217621b1b794107ea
Parents: 5278ef0
Author: David Tolpin <david.tol...@gmail.com>
Authored: Thu Nov 19 13:57:23 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Nov 19 13:59:06 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/dstream.py |  6 +++---
 python/pyspark/streaming/tests.py   | 11 +++++++++++
 2 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/387d8189/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 3fa4244..994f08d 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -520,8 +520,8 @@ class DStream(object):
         `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
         than having `invFunc`.
 
-        @param reduceFunc:     associative reduce function
-        @param invReduceFunc:  inverse function of `reduceFunc`
+        @param func:           associative reduce function
+        @param invFunc:        inverse function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                               batching interval
         @param slideDuration:  sliding interval of the window (i.e., the interval after which
@@ -551,7 +551,7 @@ class DStream(object):
             return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1)
 
         jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
-        if invReduceFunc:
+        if invFunc:
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, 
reduced._jrdd_deserializer)
         else:
             jinvReduceFunc = None

http://git-wip-us.apache.org/repos/asf/spark/blob/387d8189/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 608f8e2..c136bf1 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -415,6 +415,17 @@ class WindowFunctionTests(PySparkStreamingTestCase):
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))
 
+    def test_reduce_by_key_and_window_with_none_invFunc(self):
+        input = [range(1), range(2), range(3), range(4), range(5), range(6)]
+
+        def func(dstream):
+            return dstream.map(lambda x: (x, 1))\
+                .reduceByKeyAndWindow(operator.add, None, 5, 1)\
+                .filter(lambda kv: kv[1] > 0).count()
+
+        expected = [[2], [4], [6], [6], [6], [6]]
+        self._test_func(input, func, expected)
+
 
 class StreamingContextTests(PySparkStreamingTestCase):
 

