This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0512b3f [SPARK-31134][SQL] optimize skew join after shuffle partitions are coalesced 0512b3f is described below commit 0512b3f427274c8bda249fba02cd16f5694a4ea5 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Mar 17 00:23:16 2020 -0700 [SPARK-31134][SQL] optimize skew join after shuffle partitions are coalesced ### What changes were proposed in this pull request? Run the `OptimizeSkewedJoin` rule after the `CoalesceShufflePartitions` rule. ### Why are the changes needed? Remove duplicated coalescing code in `OptimizeSkewedJoin`. ### Does this PR introduce any user-facing change? No ### How was this patch tested? existing tests Closes #27893 from cloud-fan/aqe. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> (cherry picked from commit 30d95356f1881c32eb39e51525d2bcb331fcf867) Signed-off-by: gatorsmile <gatorsm...@gmail.com> --- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 9 +- .../adaptive/CoalesceShufflePartitions.scala | 2 - .../execution/adaptive/OptimizeSkewedJoin.scala | 272 ++++++++++----------- .../execution/adaptive/ShufflePartitionsUtil.scala | 18 +- .../sql/execution/ShufflePartitionsUtilSuite.scala | 2 - 5 files changed, 146 insertions(+), 157 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 68da06d..b54a32f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -96,13 +96,10 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. 
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, context.subqueryCache), - // Here the 'OptimizeSkewedJoin' rule should be executed - // before 'CoalesceShufflePartitions', as the skewed partition handled - // in 'OptimizeSkewedJoin' rule, should be omitted in 'CoalesceShufflePartitions'. - OptimizeSkewedJoin(conf), CoalesceShufflePartitions(context.session), - // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices' - // in 'CoalesceShufflePartitions' rule. So it must be after 'CoalesceShufflePartitions' rule. + // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs' + // added by `CoalesceShufflePartitions`. So they must be executed after it. + OptimizeSkewedJoin(conf), OptimizeLocalShuffleReader(conf), ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules), CollapseCodegenStages(conf) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index d2a7f6a..226d692 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -74,8 +74,6 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl .getOrElse(session.sparkContext.defaultParallelism) val partitionSpecs = ShufflePartitionsUtil.coalescePartitions( validMetrics.toArray, - firstPartitionIndex = 0, - lastPartitionIndex = distinctNumPreShufflePartitions.head, advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES), minNumPartitions = minPartitionNum) // This transformation adds new nodes, so we must use `transformUp` here. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index db65af6..e02b9af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.commons.io.FileUtils -import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkContext, SparkEnv} +import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ @@ -83,14 +83,14 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { /** * The goal of skew join optimization is to make the data distribution more even. The target size * to split skewed partitions is the average size of non-skewed partition, or the - * target post-shuffle partition size if avg size is smaller than it. + * advisory partition size if avg size is smaller than it. */ - private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = { - val targetPostShuffleSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) - val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize)) + private def targetSize(sizes: Seq[Long], medianSize: Long): Long = { + val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) + val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize)) // It's impossible that all the partitions are skewed, as we use median size to define skew. 
assert(nonSkewSizes.nonEmpty) - math.max(targetPostShuffleSize, nonSkewSizes.sum / nonSkewSizes.length) + math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length) } /** @@ -102,21 +102,29 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { } /** - * Split the skewed partition based on the map size and the max split number. + * Splits the skewed partition based on the map size and the target partition size + * after split, and creates a list of `PartialReducerPartitionSpec`. Returns None if can't split. */ - private def getMapStartIndices( - stage: ShuffleQueryStageExec, - partitionId: Int, - targetSize: Long): Seq[Int] = { - val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId - val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId) - ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize) - } - - private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = { - assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" + - " already be ready when executing OptimizeSkewedPartitions rule") - stage.resultOption.get.asInstanceOf[MapOutputStatistics] + private def createSkewPartitionSpecs( + shuffleId: Int, + reducerId: Int, + targetSize: Long): Option[Seq[PartialReducerPartitionSpec]] = { + val mapPartitionSizes = getMapSizesForReduceId(shuffleId, reducerId) + val mapStartIndices = ShufflePartitionsUtil.splitSizeListByTargetSize( + mapPartitionSizes, targetSize) + if (mapStartIndices.length > 1) { + Some(mapStartIndices.indices.map { i => + val startMapIndex = mapStartIndices(i) + val endMapIndex = if (i == mapStartIndices.length - 1) { + mapPartitionSizes.length + } else { + mapStartIndices(i + 1) + } + PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex) + }) + } else { + None + } } private def canSplitLeftSide(joinType: JoinType) = { @@ -128,12 +136,9 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { joinType == Inner
|| joinType == Cross || joinType == RightOuter } - private def getNumMappers(stage: ShuffleQueryStageExec): Int = { - stage.shuffle.shuffleDependency.rdd.partitions.length - } - - private def getSizeInfo(medianSize: Long, maxSize: Long): String = { - s"median size: $medianSize, max size: ${maxSize}" + private def getSizeInfo(medianSize: Long, sizes: Seq[Long]): String = { + s"median size: $medianSize, max size: ${sizes.max}, min size: ${sizes.min}, avg size: " + + sizes.sum / sizes.length } /* @@ -150,101 +155,90 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { */ def optimizeSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp { case smj @ SortMergeJoinExec(_, _, joinType, _, - s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _), - s2 @ SortExec(_, _, right: ShuffleQueryStageExec, _), _) + s1 @ SortExec(_, _, ShuffleStage(left: ShuffleStageInfo), _), + s2 @ SortExec(_, _, ShuffleStage(right: ShuffleStageInfo), _), _) if supportedJoinTypes.contains(joinType) => - val leftStats = getStatistics(left) - val rightStats = getStatistics(right) - val numPartitions = leftStats.bytesByPartitionId.length - - val leftMedSize = medianSize(leftStats) - val rightMedSize = medianSize(rightStats) + assert(left.partitionsWithSizes.length == right.partitionsWithSizes.length) + val numPartitions = left.partitionsWithSizes.length + // We use the median size of the original shuffle partitions to detect skewed partitions. + val leftMedSize = medianSize(left.mapStats) + val rightMedSize = medianSize(right.mapStats) logDebug( s""" - |Try to optimize skewed join. - |Left side partition size: - |${getSizeInfo(leftMedSize, leftStats.bytesByPartitionId.max)} - |Right side partition size: - |${getSizeInfo(rightMedSize, rightStats.bytesByPartitionId.max)} + |Optimizing skewed join. 
+ |Left side partitions size info: + |${getSizeInfo(leftMedSize, left.mapStats.bytesByPartitionId)} + |Right side partitions size info: + |${getSizeInfo(rightMedSize, right.mapStats.bytesByPartitionId)} """.stripMargin) val canSplitLeft = canSplitLeftSide(joinType) val canSplitRight = canSplitRightSide(joinType) - val leftTargetSize = targetSize(leftStats, leftMedSize) - val rightTargetSize = targetSize(rightStats, rightMedSize) + // We use the actual partition sizes (may be coalesced) to calculate target size, so that + // the final data distribution is even (coalesced partitions + split partitions). + val leftActualSizes = left.partitionsWithSizes.map(_._2) + val rightActualSizes = right.partitionsWithSizes.map(_._2) + val leftTargetSize = targetSize(leftActualSizes, leftMedSize) + val rightTargetSize = targetSize(rightActualSizes, rightMedSize) val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec] - // This is used to delay the creation of non-skew partitions so that we can potentially - // coalesce them like `CoalesceShufflePartitions` does. - val nonSkewPartitionIndices = mutable.ArrayBuffer.empty[Int] val leftSkewDesc = new SkewDesc val rightSkewDesc = new SkewDesc for (partitionIndex <- 0 until numPartitions) { - val leftSize = leftStats.bytesByPartitionId(partitionIndex) - val isLeftSkew = isSkewed(leftSize, leftMedSize) && canSplitLeft - val rightSize = rightStats.bytesByPartitionId(partitionIndex) - val isRightSkew = isSkewed(rightSize, rightMedSize) && canSplitRight - if (isLeftSkew || isRightSkew) { - if (nonSkewPartitionIndices.nonEmpty) { - // As soon as we see a skew, we'll "flush" out unhandled non-skew partitions. 
- createNonSkewPartitions(leftStats, rightStats, nonSkewPartitionIndices).foreach { p => - leftSidePartitions += p - rightSidePartitions += p - } - nonSkewPartitionIndices.clear() - } - - val leftParts = if (isLeftSkew) { - val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize) - if (mapStartIndices.length > 1) { - leftSkewDesc.addPartitionSize(leftSize) - createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(left)) - } else { - Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) - } - } else { - Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) - } - - val rightParts = if (isRightSkew) { - val mapStartIndices = getMapStartIndices(right, partitionIndex, rightTargetSize) - if (mapStartIndices.length > 1) { - rightSkewDesc.addPartitionSize(rightSize) - createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(right)) - } else { - Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) - } - } else { - Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) + val isLeftSkew = isSkewed(leftActualSizes(partitionIndex), leftMedSize) && canSplitLeft + val leftPartSpec = left.partitionsWithSizes(partitionIndex)._1 + val isLeftCoalesced = leftPartSpec.startReducerIndex + 1 < leftPartSpec.endReducerIndex + + val isRightSkew = isSkewed(rightActualSizes(partitionIndex), rightMedSize) && canSplitRight + val rightPartSpec = right.partitionsWithSizes(partitionIndex)._1 + val isRightCoalesced = rightPartSpec.startReducerIndex + 1 < rightPartSpec.endReducerIndex + + // A skewed partition should never be coalesced, but skip it here just to be safe. 
+ val leftParts = if (isLeftSkew && !isLeftCoalesced) { + val reducerId = leftPartSpec.startReducerIndex + val skewSpecs = createSkewPartitionSpecs( + left.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, leftTargetSize) + if (skewSpecs.isDefined) { + logDebug(s"Left side partition $partitionIndex is skewed, split it into " + + s"${skewSpecs.get.length} parts.") + leftSkewDesc.addPartitionSize(leftActualSizes(partitionIndex)) } + skewSpecs.getOrElse(Seq(leftPartSpec)) + } else { + Seq(leftPartSpec) + } - for { - leftSidePartition <- leftParts - rightSidePartition <- rightParts - } { - leftSidePartitions += leftSidePartition - rightSidePartitions += rightSidePartition + // A skewed partition should never be coalesced, but skip it here just to be safe. + val rightParts = if (isRightSkew && !isRightCoalesced) { + val reducerId = rightPartSpec.startReducerIndex + val skewSpecs = createSkewPartitionSpecs( + right.shuffleStage.shuffle.shuffleDependency.shuffleId, reducerId, rightTargetSize) + if (skewSpecs.isDefined) { + logDebug(s"Right side partition $partitionIndex is skewed, split it into " + + s"${skewSpecs.get.length} parts.") + rightSkewDesc.addPartitionSize(rightActualSizes(partitionIndex)) } + skewSpecs.getOrElse(Seq(rightPartSpec)) } else { - // Add to `nonSkewPartitionIndices` first, and add real partitions later, in case we can - // coalesce the non-skew partitions. - nonSkewPartitionIndices += partitionIndex - // If this is the last partition, add real partition immediately. 
- if (partitionIndex == numPartitions - 1) { - createNonSkewPartitions(leftStats, rightStats, nonSkewPartitionIndices).foreach { p => - leftSidePartitions += p - rightSidePartitions += p - } - nonSkewPartitionIndices.clear() - } + Seq(rightPartSpec) + } + + for { + leftSidePartition <- leftParts + rightSidePartition <- rightParts + } { + leftSidePartitions += leftSidePartition + rightSidePartitions += rightSidePartition } } logDebug("number of skewed partitions: " + s"left ${leftSkewDesc.numPartitions}, right ${rightSkewDesc.numPartitions}") if (leftSkewDesc.numPartitions > 0 || rightSkewDesc.numPartitions > 0) { - val newLeft = CustomShuffleReaderExec(left, leftSidePartitions, leftSkewDesc.toString) - val newRight = CustomShuffleReaderExec(right, rightSidePartitions, rightSkewDesc.toString) + val newLeft = CustomShuffleReaderExec( + left.shuffleStage, leftSidePartitions, leftSkewDesc.toString) + val newRight = CustomShuffleReaderExec( + right.shuffleStage, rightSidePartitions, rightSkewDesc.toString) smj.copy( left = s1.copy(child = newLeft), right = s2.copy(child = newRight), isSkewJoin = true) } else { @@ -252,44 +246,6 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { } } - private def createNonSkewPartitions( - leftStats: MapOutputStatistics, - rightStats: MapOutputStatistics, - nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = { - assert(nonSkewPartitionIndices.nonEmpty) - val shouldCoalesce = conf.getConf(SQLConf.COALESCE_PARTITIONS_ENABLED) - if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) { - nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1)) - } else { - // We fall back to Spark default parallelism if the minimum number of coalesced partitions - // is not set, so to avoid perf regressions compared to no coalescing. 
- val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM) - .getOrElse(SparkContext.getActive.get.defaultParallelism) - ShufflePartitionsUtil.coalescePartitions( - Array(leftStats, rightStats), - firstPartitionIndex = nonSkewPartitionIndices.head, - // `lastPartitionIndex` is exclusive. - lastPartitionIndex = nonSkewPartitionIndices.last + 1, - advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES), - minNumPartitions = minPartitionNum) - } - } - - private def createSkewPartitions( - reducerIndex: Int, - mapStartIndices: Seq[Int], - numMappers: Int): Seq[PartialReducerPartitionSpec] = { - mapStartIndices.indices.map { i => - val startMapIndex = mapStartIndices(i) - val endMapIndex = if (i == mapStartIndices.length - 1) { - numMappers - } else { - mapStartIndices(i + 1) - } - PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) - } - } - override def apply(plan: SparkPlan): SparkPlan = { if (!conf.getConf(SQLConf.SKEW_JOIN_ENABLED)) { return plan @@ -328,6 +284,48 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { } } +private object ShuffleStage { + def unapply(plan: SparkPlan): Option[ShuffleStageInfo] = plan match { + case s: ShuffleQueryStageExec => + val mapStats = getMapStats(s) + val sizes = mapStats.bytesByPartitionId + val partitions = sizes.zipWithIndex.map { + case (size, i) => CoalescedPartitionSpec(i, i + 1) -> size + } + Some(ShuffleStageInfo(s, mapStats, partitions)) + + case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _) => + val mapStats = getMapStats(s) + val sizes = mapStats.bytesByPartitionId + val partitions = partitionSpecs.map { + case spec @ CoalescedPartitionSpec(start, end) => + var sum = 0L + var i = start + while (i < end) { + sum += sizes(i) + i += 1 + } + spec -> sum + case other => throw new IllegalArgumentException( + s"Expect CoalescedPartitionSpec but got $other") + } + Some(ShuffleStageInfo(s, mapStats, partitions)) + + case 
_ => None + } + + private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = { + assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" + + " already be ready when executing OptimizeSkewedPartitions rule") + stage.resultOption.get.asInstanceOf[MapOutputStatistics] + } +} + +private case class ShuffleStageInfo( + shuffleStage: ShuffleQueryStageExec, + mapStats: MapOutputStatistics, + partitionsWithSizes: Seq[(CoalescedPartitionSpec, Long)]) + private class SkewDesc { private[this] var numSkewedPartitions: Int = 0 private[this] var totalSize: Long = 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 292df11..208cc05 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -28,10 +28,9 @@ object ShufflePartitionsUtil extends Logging { final val MERGED_PARTITION_FACTOR = 1.2 /** - * Coalesce the same range of partitions (`firstPartitionIndex` to `lastPartitionIndex`, the - * start is inclusive and the end is exclusive) from multiple shuffles. This method assumes that - * all the shuffles have the same number of partitions, and the partitions of same index will be - * read together by one task. + * Coalesce the partitions from multiple shuffles. This method assumes that all the shuffles + * have the same number of partitions, and the partitions of same index will be read together + * by one task. * * The strategy used to determine the number of coalesced partitions is described as follows. 
* To determine the number of coalesced partitions, we have a target size for a coalesced @@ -56,8 +55,6 @@ object ShufflePartitionsUtil extends Logging { */ def coalescePartitions( mapOutputStatistics: Array[MapOutputStatistics], - firstPartitionIndex: Int, - lastPartitionIndex: Int, advisoryTargetSize: Long, minNumPartitions: Int): Seq[ShufflePartitionSpec] = { // If `minNumPartitions` is very large, it is possible that we need to use a value less than @@ -87,11 +84,12 @@ object ShufflePartitionsUtil extends Logging { "There should be only one distinct value of the number of shuffle partitions " + "among registered Exchange operators.") + val numPartitions = distinctNumShufflePartitions.head val partitionSpecs = ArrayBuffer[CoalescedPartitionSpec]() - var latestSplitPoint = firstPartitionIndex + var latestSplitPoint = 0 var coalescedSize = 0L - var i = firstPartitionIndex - while (i < lastPartitionIndex) { + var i = 0 + while (i < numPartitions) { // We calculate the total size of i-th shuffle partitions from all shuffles. 
var totalSizeOfCurrentPartition = 0L var j = 0 @@ -112,7 +110,7 @@ object ShufflePartitionsUtil extends Logging { } i += 1 } - partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, lastPartitionIndex) + partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions) partitionSpecs } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala index 2cd4c98..7acc33c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala @@ -33,8 +33,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite { } val estimatedPartitionStartIndices = ShufflePartitionsUtil.coalescePartitions( mapOutputStatistics, - 0, - bytesByPartitionIdArray.head.length, targetSize, minNumPartitions) assert(estimatedPartitionStartIndices === expectedPartitionStartIndices) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org