GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/19498

    [SPARK-17756][PYTHON][STREAMING] Workaround to avoid return type mismatch in PythonTransformFunction

    ## What changes were proposed in this pull request?
    
    This PR proposes to wrap the transformed RDD within `TransformFunction`. `PythonTransformFunction` appears to require that `_jrdd` returns a `JavaRDD`:
    
    
https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/python/pyspark/streaming/util.py#L67
    
    
https://github.com/apache/spark/blob/6ee28423ad1b2e6089b82af64a31d77d3552bb38/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala#L43
    
    However, some APIs return a `JavaPairRDD` instead, for example `zip` in PySpark's RDD API. This can be checked via `_jrdd` as below:
    
    ```python
    >>> rdd.zip(rdd)._jrdd.getClass().toString()
    u'class org.apache.spark.api.java.JavaPairRDD'
    ```
    
    So, here, I wrapped the RDD with an identity `map` to ensure a `JavaRDD` is returned:
    
    ```python
    >>> rdd.zip(rdd).map(lambda x: x)._jrdd.getClass().toString()
    u'class org.apache.spark.api.java.JavaRDD'
    ```
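    The effect of this wrapping can be modeled without a running Spark cluster. The sketch below uses hypothetical stand-in classes (`JavaRDD`, `JavaPairRDD`, and `FakeRDD` are minimal mocks, not the real Spark API) to show how an identity `map` normalizes the type held in `_jrdd`:

    ```python
    # Hypothetical stand-ins modeling the workaround; not the real Spark classes.

    class JavaRDD:
        """Stand-in for org.apache.spark.api.java.JavaRDD."""

    class JavaPairRDD:
        """Stand-in for org.apache.spark.api.java.JavaPairRDD."""

    class FakeRDD:
        """Mock of pyspark.RDD: zip() yields a JavaPairRDD-backed RDD,
        while map() always rebuilds a plain JavaRDD-backed one."""
        def __init__(self, jrdd):
            self._jrdd = jrdd

        def zip(self, other):
            # Pair-producing APIs hand back a JavaPairRDD under the hood.
            return FakeRDD(JavaPairRDD())

        def map(self, f):
            # Mapping always reconstructs a plain JavaRDD wrapper.
            return FakeRDD(JavaRDD())

    def ensure_java_rdd(rdd):
        # The workaround: an identity map so _jrdd is guaranteed a JavaRDD.
        return rdd.map(lambda x: x)

    rdd = FakeRDD(JavaRDD())
    zipped = rdd.zip(rdd)
    print(type(zipped._jrdd).__name__)                   # JavaPairRDD
    print(type(ensure_java_rdd(zipped)._jrdd).__name__)  # JavaRDD
    ```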
    
    Some failure cases that this patch addresses are illustrated below:
    
    ```python
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(spark.sparkContext, 10)
    ssc.queueStream([sc.range(10)]) \
        .transform(lambda rdd: rdd.cartesian(rdd)) \
        .pprint()
    ssc.start()
    ```
    
    ```python
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(spark.sparkContext, 10)
    ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.cartesian(rdd))
    ssc.start()
    ```
    
    ```python
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(spark.sparkContext, 10)
    ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd))
    ssc.start()
    ```
    
    ```python
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(spark.sparkContext, 10)
    ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).union(rdd.zip(rdd)))
    ssc.start()
    ```
    
    ```python
    from pyspark.streaming import StreamingContext
    ssc = StreamingContext(spark.sparkContext, 10)
    ssc.queueStream([sc.range(10)]).foreachRDD(lambda rdd: rdd.zip(rdd).coalesce(1))
    ssc.start()
    ```
    
    ## How was this patch tested?
    
    Unit tests were added in `python/pyspark/streaming/tests.py`, and the change was manually tested.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-17756

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19498
    
----
commit f5a2a884d860e9c8b3f98fc4ae5f10eaf3c1a0a4
Author: hyukjinkwon <gurwls...@gmail.com>
Date:   2017-10-14T13:50:49Z

    Workaround to avoid return type mismatch in PythonTransformFunction

----


---
