[SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is None
When invFunc is None, `reduceByKeyAndWindow(func, None, winsize, slidesize)` is equivalent to `reduceByKey(func).window(winsize, slidesize).reduceByKey(func)`, and no checkpointing is necessary. The corresponding Scala code does exactly that, but the Python code always creates a reduced windowed stream (`PythonReducedWindowedDStream`), which requires checkpointing. The patch fixes this. I do not know how to unit-test this. Author: David Tolpin <david.tol...@gmail.com> Closes #9888 from dtolpin/master. Project: http://git-wip-us.apache.org/repos/asf/bahir/repo Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/c25b799a Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/c25b799a Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/c25b799a Branch: refs/heads/master Commit: c25b799a8df1a79f66feb94e7a60be34d2a2b262 Parents: e30f0c2 Author: David Tolpin <david.tol...@gmail.com> Authored: Wed Dec 16 22:10:24 2015 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Dec 16 22:10:24 2015 -0800 ---------------------------------------------------------------------- streaming-mqtt/python/dstream.py | 45 ++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bahir/blob/c25b799a/streaming-mqtt/python/dstream.py ---------------------------------------------------------------------- diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py index f61137c..b994a53 100644 --- a/streaming-mqtt/python/dstream.py +++ b/streaming-mqtt/python/dstream.py @@ -542,31 +542,32 @@ class DStream(object): reduced = self.reduceByKey(func, numPartitions) - def reduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - r = a.union(b).reduceByKey(func, numPartitions) if a else b - if filterFunc: - r = r.filter(filterFunc) - return r - - def invReduceFunc(t, a, b): - b = b.reduceByKey(func, numPartitions) - joined = a.leftOuterJoin(b, 
numPartitions) - return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) - if kv[1] is not None else kv[0]) - - jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) if invFunc: + def reduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + r = a.union(b).reduceByKey(func, numPartitions) if a else b + if filterFunc: + r = r.filter(filterFunc) + return r + + def invReduceFunc(t, a, b): + b = b.reduceByKey(func, numPartitions) + joined = a.leftOuterJoin(b, numPartitions) + return joined.mapValues(lambda kv: invFunc(kv[0], kv[1]) + if kv[1] is not None else kv[0]) + + jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer) jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer) + if slideDuration is None: + slideDuration = self._slideDuration + dstream = self._sc._jvm.PythonReducedWindowedDStream( + reduced._jdstream.dstream(), + jreduceFunc, jinvReduceFunc, + self._ssc._jduration(windowDuration), + self._ssc._jduration(slideDuration)) + return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) else: - jinvReduceFunc = None - if slideDuration is None: - slideDuration = self._slideDuration - dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(), - jreduceFunc, jinvReduceFunc, - self._ssc._jduration(windowDuration), - self._ssc._jduration(slideDuration)) - return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer) + return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions) def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None): """