Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19498#discussion_r146531880

--- Diff: python/pyspark/streaming/util.py ---
@@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
         t = datetime.fromtimestamp(milliseconds / 1000.0)
         r = self.func(t, *rdds)
         if r:
-            return r._jrdd
+            # Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
+            # org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
+            # `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
+            # See SPARK-17756.
+            return r.map(lambda x: x)._jrdd
--- End diff --

Yup, I guess it would, but I could not think of a better and simpler way (and I saw this workaround is already used above in `python/pyspark/streaming/context.py`). If a slightly uglier (possibly flaky, IMHO) way is okay for this concern, I could do this, for example, as below:

```python
if r._jrdd.getClass().getSimpleName() == "JavaRDD":
    return r._jrdd
else:
    return r.map(lambda x: x)._jrdd
```

(Note that `getClass().toString()` would include the `class` prefix and the full package name, so `getSimpleName()` is the safer comparison here.)
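The check-then-wrap pattern above can be sketched without a running Spark cluster. This is an illustration only: `JavaRDD`, `JavaPairRDD`, and `ensure_java_rdd` below are hypothetical stand-ins, not the real py4j proxies — in actual PySpark, `r._jrdd` is a py4j `JavaObject` and the class-name check goes through the JVM, while the conversion is done by `r.map(lambda x: x)._jrdd`.

```python
# Hypothetical stand-ins for the py4j proxies of the underlying Java classes.
class JavaRDD:
    pass


class JavaPairRDD:
    pass


def ensure_java_rdd(jrdd):
    """Return `jrdd` unchanged if it is already a plain JavaRDD; otherwise
    convert it (real code would use `r.map(lambda x: x)._jrdd`, which forces
    a plain JavaRDD regardless of the original RDD type)."""
    if type(jrdd).__name__ == "JavaRDD":
        # Already the type PythonTransformFunction expects: no extra
        # identity `map` round trip needed.
        return jrdd
    # e.g. a JavaPairRDD produced by APIs such as `zip`: wrap it.
    return JavaRDD()


plain = ensure_java_rdd(JavaRDD())      # returned as-is
wrapped = ensure_java_rdd(JavaPairRDD())  # converted to a plain JavaRDD
```

The trade-off the comment describes is visible here: the name check avoids the identity `map` in the common case, at the cost of depending on a class name string, which is why the unconditional `map(lambda x: x)` wrap was preferred as the simpler option.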