[GitHub] spark pull request #19498: [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19498

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r151855253

Diff: python/pyspark/streaming/util.py

```diff
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
         t = datetime.fromtimestamp(milliseconds / 1000.0)
         r = self.func(t, *rdds)
         if r:
-            return r._jrdd
+            # Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it with `map`.
+            # org.apache.spark.streaming.api.python.PythonTransformFunction requires returning
+            # `JavaRDD`; however, some APIs, for example `zip`, can produce a `JavaPairRDD`.
+            # See SPARK-17756.
+            return r.map(lambda x: x)._jrdd
```

Thanks for the review, @holdenk. Let me push the change.
Github user holdenk commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r151839513

Diff: python/pyspark/streaming/util.py (same hunk as quoted above)

Personally, I think only applying the `map` when the result is not already a `JavaRDD` is a good incremental improvement (since otherwise the code path fails, right?).
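The "wrap only when needed" idea holdenk suggests can be sketched without a live Spark session. The classes below are hypothetical stand-ins for the Py4J handles (`FakeRDD`, `FakeJavaRDD`, and `to_java_rdd` are illustrative names, not PySpark API); only the branching logic mirrors the proposal.

```python
# Sketch of conditionally applying the identity map, using stand-in classes
# instead of a SparkContext. Assumption: only the branching logic matters here.

class FakeJavaClass:
    def __init__(self, name):
        self._name = name

    def toString(self):
        # Py4J's Class.toString() returns e.g. "class org.apache.spark.api.java.JavaRDD"
        return "class " + self._name


class FakeJavaRDD:
    def __init__(self, class_name):
        self._class_name = class_name

    def getClass(self):
        return FakeJavaClass(self._class_name)


class FakeRDD:
    def __init__(self, jrdd):
        self._jrdd = jrdd

    def map(self, f):
        # In PySpark, an identity map always yields a plain JavaRDD handle.
        return FakeRDD(FakeJavaRDD("org.apache.spark.api.java.JavaRDD"))


def to_java_rdd(r):
    """Return r._jrdd, wrapping with an identity map only when it is not a JavaRDD."""
    if r._jrdd.getClass().toString() == "class org.apache.spark.api.java.JavaRDD":
        return r._jrdd  # already the right type; skip the extra map
    return r.map(lambda x: x)._jrdd


pair = FakeRDD(FakeJavaRDD("org.apache.spark.api.java.JavaPairRDD"))
plain = FakeRDD(FakeJavaRDD("org.apache.spark.api.java.JavaRDD"))

assert to_java_rdd(pair).getClass().toString().endswith("JavaRDD")
assert to_java_rdd(plain) is plain._jrdd  # no extra map when already a JavaRDD
```

The design point is that the extra Python-side `map` stage is paid only on the (rare) code paths such as `zip` that surface a `JavaPairRDD`.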
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r149649246

Diff: python/pyspark/streaming/util.py (same hunk as quoted above)

@tdas, it's fine if you feel we should find another way or make a small benchmark; I can try. I just wonder which approach you'd prefer before I go too far in a direction you wouldn't want.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r147551966

Diff: python/pyspark/streaming/util.py (same hunk as quoted above)

@tdas, I believe this is something we should fix. Would you like me to keep this approach, try the way suggested above, or find another way to avoid the extra map even if it is more complex? Otherwise, I can run a benchmark on an RDD with the extra map to see the slight perf difference (although I am not sure yet whether I can show a meaningful diff).
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r146531880

Diff: python/pyspark/streaming/util.py (same hunk as quoted above)

Yup, I guess it would, but I could not think of a better and simpler way (and this workaround is already used above in `python/pyspark/streaming/context.py`). If a slightly uglier (and possibly flaky, imho) way is okay for this concern, I could do it, for example, as below:

```python
# getClass().toString() returns e.g. u'class org.apache.spark.api.java.JavaRDD'
if r._jrdd.getClass().toString() == "class org.apache.spark.api.java.JavaRDD":
    return r._jrdd
else:
    return r.map(lambda x: x)._jrdd
```
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r146514175

Diff: python/pyspark/streaming/util.py (same hunk as quoted above)

Wouldn't introducing this additional map cause a slight perf regression?
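The perf concern raised here is about one extra Python function call per record, since PySpark pipelines consecutive maps into one stage rather than adding a shuffle. A rough local analogy (not a Spark benchmark — no serialization, no JVM round trips involved) is timing a plain iterator with and without an identity `map`:

```python
# Rough local analogy for the cost of an extra identity map stage.
# Assumption: this only illustrates per-call Python overhead; real Spark
# numbers would need a proper benchmark on an actual RDD.
import timeit

n = 100_000
baseline = timeit.timeit(lambda: sum(range(n)), number=10)
with_map = timeit.timeit(lambda: sum(map(lambda x: x, range(n))), number=10)

print(f"baseline: {baseline:.4f}s, with identity map: {with_map:.4f}s")
```

The identity map typically adds measurable but small overhead per element, which is why the discussion below converges on applying it only when the wrapped type is not already a `JavaRDD`.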
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/19498

[SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction

## What changes were proposed in this pull request?

This PR proposes to wrap the transformed RDD within `TransformFunction`. `PythonTransformFunction` appears to require `_jrdd` to be a `JavaRDD`:

https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/streaming/util.py#L67
https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala#L43

However, some APIs, for example `zip` in PySpark's RDD API, can produce a `JavaPairRDD`. `_jrdd` can be checked as below:

```python
>>> rdd.zip(rdd)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaPairRDD'
```

So, here, I wrapped it with `map` to ensure a `JavaRDD` is returned:

```python
>>> rdd.zip(rdd).map(lambda x: x)._jrdd.getClass().toString()
u'class org.apache.spark.api.java.JavaRDD'
```

I elaborated some failure cases as below:

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]) \
    .transform(lambda rdd: rdd.cartesian(rdd)) \
    .pprint()
ssc.start()
```

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.cartesian(rdd))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).union(rdd.zip(rdd)))
ssc.start()
```

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(spark.sparkContext, 10)
ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).coalesce(1))
ssc.start()
```

## How was this patch tested?

Unit tests were added in `python/pyspark/streaming/tests.py` and manually tested.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-17756

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19498.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19498

commit f5a2a884d860e9c8b3f98fc4ae5f10eaf3c1a0a4
Author: hyukjinkwon
Date: 2017-10-14T13:50:49Z

    Workaround to avoid return type mispatch in PythonTransformFunction