Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21293#discussion_r187639422 --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala --- @@ -88,14 +90,53 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName) - val shuffleId: Int = _rdd.context.newShuffleId() + val shuffleId: Int = if (isContinuous) { + // This will not be reset in continuous processing, set an invalid value for now. + Int.MinValue + } else { + _rdd.context.newShuffleId() + } - val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( - shuffleId, _rdd.partitions.length, this) + val shuffleHandle: ShuffleHandle = if (isContinuous) { + null + } else { + _rdd.context.env.shuffleManager.registerShuffle( + shuffleId, _rdd.partitions.length, this) + } - _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + if (!isContinuous) { + _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + } } +/** + * :: DeveloperApi :: + * Represents a dependency on the output of a shuffle stage of continuous type. + * Different with ShuffleDependency, the continuous dependency only create on Executor side, + * so the rdd in param is deserialized from taskBinary. + */ +@DeveloperApi +class ContinuousShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( + rdd: RDD[_ <: Product2[K, V]], + dep: ShuffleDependency[K, V, C], + continuousEpoch: Int, + totalShuffleNum: Int, + shuffleNumMaps: Int) + extends ShuffleDependency[K, V, C]( + rdd, + dep.partitioner, + dep.serializer, + dep.keyOrdering, + dep.aggregator, + dep.mapSideCombine, true) { + + val baseShuffleId: Int = dep.shuffleId + + override val shuffleId: Int = continuousEpoch * totalShuffleNum + baseShuffleId --- End diff -- Sorry, I'm not sure I understand. 
The SparkContext is responsible for assigning shuffle IDs, so it's not valid to register a ContinuousShuffleDependency with a shuffle ID that the SparkContext did not assign.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org