Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/21353#discussion_r188974319 --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala --- @@ -88,14 +96,53 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( private[spark] val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName) - val shuffleId: Int = _rdd.context.newShuffleId() + val shuffleId: Int = if (isContinuous) { + // This will not be reset in continuous processing, set an invalid value for now. + Int.MinValue + } else { + _rdd.context.newShuffleId() + } - val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle( - shuffleId, _rdd.partitions.length, this) + val shuffleHandle: ShuffleHandle = if (isContinuous) { + null + } else { + _rdd.context.env.shuffleManager.registerShuffle( + shuffleId, _rdd.partitions.length, this) + } - _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + if (!isContinuous) { + _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) + } } +/** + * :: DeveloperApi :: + * Represents a dependency on the output of a shuffle stage of continuous type. + * Different with ShuffleDependency, the continuous dependency only create on Executor side, + * so the rdd in param is deserialized from taskBinary. + */ +@DeveloperApi +class ContinuousShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( --- End diff -- The changes to ShuffleDependency are the same as in #21293.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org