Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22112#discussion_r212193206
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1865,6 +1876,39 @@ abstract class RDD[T: ClassTag](
       // RDD chain.
       @transient protected lazy val isBarrier_ : Boolean =
         dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, 
_]]).exists(_.rdd.isBarrier())
    +
    +  /**
    +   * Returns the random level of this RDD's computing function. Please 
refer to [[RDD.RandomLevel]]
    +   * for the definition of random level.
    +   *
    +   * By default, an RDD without parents(root RDD) is IDEMPOTENT. For RDDs 
with parents, the random
    +   * level of current RDD is the random level of the parent which is 
random most.
    +   */
    +  // TODO: make it public so users can set random level to their custom 
RDDs.
    +  // TODO: this can be per-partition. e.g. UnionRDD can have different 
random level for different
    +  // partitions.
    +  private[spark] def computingRandomLevel: RDD.RandomLevel.Value = {
    +    val parentRandomLevels = dependencies.map {
    +      case dep: ShuffleDependency[_, _, _] =>
    +        if (dep.rdd.computingRandomLevel == RDD.RandomLevel.INDETERMINATE) 
{
    +          RDD.RandomLevel.INDETERMINATE
    --- End diff --
    
    It does not matter what the parent RDD's order was - after shuffle, 
currently, it is going to be always UNORDERED - unless Aggregator and key 
ordering is specified in dep.
    Given [this 
comment](https://github.com/apache/spark/pull/22112#issuecomment-414034703), 
and adding a few missing cases, this becomes:
    ```
      // If checkpointed already - then always same order
      case dep: Dependency if dep.rdd.isCheckpointed => 
RDD.RandomLevel.IDEMPOTENT
      // if same partitioner, then shuffle not done.
      case dep: ShuffleDependency[_, _, _] if dep.partitioner == partitioner => 
dep.rdd.computingRandomLevel
      // if aggregator specified (and so unique keys) and key ordering 
specified - then consistent ordering.
      case dep: ShuffleDependency[_, _, _] if dep.keyOrdering.isDefined && 
dep.aggregator.isDefined =>
        RDD.RandomLevel.IDEMPOTENT
      // All other shuffle cases, we dont know the output order in spark.
      case dep: ShuffleDependency[_, _, _] => RDD.RandomLevel.INDETERMINATE
    ```



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to