Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21293#discussion_r187597922
  
    --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
    @@ -88,14 +90,53 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: 
ClassTag](
       private[spark] val combinerClassName: Option[String] =
         Option(reflect.classTag[C]).map(_.runtimeClass.getName)
     
    -  val shuffleId: Int = _rdd.context.newShuffleId()
    +  val shuffleId: Int = if (isContinuous) {
    +    // This will not be reset in continuous processing, set an invalid 
value for now.
    +    Int.MinValue
    +  } else {
    +    _rdd.context.newShuffleId()
    +  }
     
    -  val shuffleHandle: ShuffleHandle = 
_rdd.context.env.shuffleManager.registerShuffle(
    -    shuffleId, _rdd.partitions.length, this)
    +  val shuffleHandle: ShuffleHandle = if (isContinuous) {
    +    null
    +  } else {
    +    _rdd.context.env.shuffleManager.registerShuffle(
    +      shuffleId, _rdd.partitions.length, this)
    +  }
     
    -  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    +  if (!isContinuous) {
    +    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
    +  }
     }
     
    +/**
    + * :: DeveloperApi ::
    + * Represents a dependency on the output of a shuffle stage of continuous 
type.
    + * Unlike ShuffleDependency, a continuous dependency is only created on the Executor side,
    + * so the rdd parameter is deserialized from taskBinary.
    + */
    +@DeveloperApi
    +class ContinuousShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    +    rdd: RDD[_ <: Product2[K, V]],
    +    dep: ShuffleDependency[K, V, C],
    +    continuousEpoch: Int,
    +    totalShuffleNum: Int,
    +    shuffleNumMaps: Int)
    +  extends ShuffleDependency[K, V, C](
    +    rdd,
    +    dep.partitioner,
    +    dep.serializer,
    +    dep.keyOrdering,
    +    dep.aggregator,
    +    dep.mapSideCombine, true) {
    +
    +  val baseShuffleId: Int = dep.shuffleId
    +
    +  override val shuffleId: Int = continuousEpoch * totalShuffleNum + 
baseShuffleId
    --- End diff --
    
    Here we use the generated shuffleId only for shuffle registration and 
identification. Can we assume that the DAG never changes, so that each epoch has 
the same dependency, just with a different shuffleId?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to