Repository: spark Updated Branches: refs/heads/branch-1.5 817c38a0a -> 4e72839b7
[SPARK-10122] [PYSPARK] [STREAMING] Fix getOffsetRanges bug in PySpark-Streaming transform function Details of the bug and an explanation can be found in [SPARK-10122](https://issues.apache.org/jira/browse/SPARK-10122). tdas, please help review. Author: jerryshao <ss...@hortonworks.com> Closes #8347 from jerryshao/SPARK-10122 and squashes the following commits: 4039b16 [jerryshao] Fix getOffsetRanges in transform() bug Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e72839b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e72839b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e72839b Branch: refs/heads/branch-1.5 Commit: 4e72839b7b1e0b925837b49534a07188a603d838 Parents: 817c38a Author: jerryshao <ss...@hortonworks.com> Authored: Fri Aug 21 13:10:11 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Fri Aug 21 13:17:48 2015 -0700 ---------------------------------------------------------------------- python/pyspark/streaming/dstream.py | 5 ++++- python/pyspark/streaming/tests.py | 4 +++- 2 files changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4e72839b/python/pyspark/streaming/dstream.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 8dcb964..698336c 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -610,7 +610,10 @@ class TransformedDStream(DStream): self.is_checkpointed = False self._jdstream_val = None - if (isinstance(prev, TransformedDStream) and + # Using type() to avoid folding the functions and compacting the DStreams which is not + # not strictly a object of TransformedDStream. + # Changed here is to avoid bug in KafkaTransformedDStream when calling offsetRanges(). 
+ if (type(prev) is TransformedDStream and not prev.is_cached and not prev.is_checkpointed): prev_func = prev.func self.func = lambda t, rdd: func(t, prev_func(t, rdd)) http://git-wip-us.apache.org/repos/asf/spark/blob/4e72839b/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 6108c84..214d5be 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -850,7 +850,9 @@ class KafkaStreamTests(PySparkStreamingTestCase): offsetRanges.append(o) return rdd - stream.transform(transformWithOffsetRanges).foreachRDD(lambda rdd: rdd.count()) + # Test whether it is ok mixing KafkaTransformedDStream and TransformedDStream together, + # only the TransformedDstreams can be folded together. + stream.transform(transformWithOffsetRanges).map(lambda kv: kv[1]).count().pprint() self.ssc.start() self.wait_for(offsetRanges, 1) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org