Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19498#discussion_r146531880
  
    --- Diff: python/pyspark/streaming/util.py ---
    @@ -64,7 +64,11 @@ def call(self, milliseconds, jrdds):
                 t = datetime.fromtimestamp(milliseconds / 1000.0)
                 r = self.func(t, *rdds)
                 if r:
    -                return r._jrdd
    +                # Here, we work around to ensure `_jrdd` is `JavaRDD` by wrapping it by `map`.
    +                # org.apache.spark.streaming.api.python.PythonTransformFunction requires to return
    +                # `JavaRDD`; however, this could be `JavaPairRDD` by some APIs, for example, `zip`.
    +                # See SPARK-17756.
    +                return r.map(lambda x: x)._jrdd
    --- End diff ---
    
    Yup, I guess it would, but I could not think of a better and simpler way (and I saw this workaround is already used above in `python/pyspark/streaming/context.py`). If a slightly uglier (and possibly flaky, imho) way is okay for this concern, I could do it, for example, as below:
    
    ```python
    # Only re-wrap when `_jrdd` is not already a plain JavaRDD.
    # getClass().toString() includes the package prefix, so compare the simple name instead.
    if r._jrdd.getClass().getSimpleName() == "JavaRDD":
        return r._jrdd
    else:
        return r.map(lambda x: x)._jrdd
    ```
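    
    For context, a minimal standalone sketch (assuming a local PySpark setup; the app name and variables below are just illustrative) of the behaviour being worked around: `zip` yields an RDD whose `_jrdd` is a `JavaPairRDD`, while an identity `map` rebuilds it as a plain `JavaRDD`:
    
    ```python
    # Sketch, not part of this PR: shows that `zip` produces an RDD whose
    # `_jrdd` is a JavaPairRDD, while an identity `map` rebuilds it as a
    # plain JavaRDD (the case SPARK-17756 hits).
    from pyspark import SparkContext
    
    sc = SparkContext("local[2]", "spark-17756-sketch")
    
    # Use the same number of slices so `zip` sees matching partitioning.
    a = sc.parallelize([1, 2, 3], 2)
    b = sc.parallelize(["a", "b", "c"], 2)
    
    zipped = a.zip(b)
    print(zipped._jrdd.getClass().getSimpleName())                   # JavaPairRDD
    print(zipped.map(lambda x: x)._jrdd.getClass().getSimpleName())  # JavaRDD
    
    sc.stop()
    ```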

