Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22011#discussion_r208068705
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -2429,6 +2441,29 @@ def _wrap_function(sc, func, deserializer, 
serializer, profiler=None):
                                       sc.pythonVer, broadcast_vars, 
sc._javaAccumulator)
     
     
    +class RDDBarrier(object):
    +
    +    """
    +    .. note:: Experimental
    +
    +    An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to 
launch tasks of the stage
    +    contains this RDD together.
    +    """
    +
    +    def __init__(self, rdd):
    +        self.rdd = rdd
    +        self._jrdd = rdd._jrdd
    +
    +    def mapPartitions(self, f, preservesPartitioning=False):
    --- End diff --
    
    If we expose a package private method to get the annotated RDD with 
`isBarrier=True` in `RDDBarrier`, we can implement `mapPartitions` easily here:
    
    ~~~python
    jBarrierRdd = self._jrdd.rdd.barrier().barrierRdd.javaRdd
    pyBarrierRdd = RDD(self._jrdd.rdd.barrier().barrierRdd.javaRdd)
    pyBarrierRdd.mapPartitions(f, preservesPartitioning)
    ~~~


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to