This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bd09a3e44f1 [SPARK-42039][SQL] SPJ: Remove Option in KeyGroupedPartitioning#partitionValuesOpt bd09a3e44f1 is described below commit bd09a3e44f11153b7e32a153ce5d0b5d0da4ce0c Author: Chao Sun <sunc...@apple.com> AuthorDate: Tue Jan 17 11:51:47 2023 -0800 [SPARK-42039][SQL] SPJ: Remove Option in KeyGroupedPartitioning#partitionValuesOpt ### What changes were proposed in this pull request? Currently `KeyGroupedPartitioning#partitionValuesOpt` is of type: `Option[Seq[InternalRow]]`. This refactors it into `Seq[InternalRow]`. ### Why are the changes needed? It's unnecessary to use `Option` for the field. Originally I was thinking of using `None` for the case when all the input partitions are implicitly matched, so that we can skip comparing them in `EnsureRequirements`. However, I think it is not really a use case, and we can instead use `Seq.empty` for that if it comes up. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests. Closes #39540 from sunchao/SPARK-42039. 
Authored-by: Chao Sun <sunc...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../spark/sql/catalyst/plans/physical/partitioning.scala | 16 +++++++--------- .../sql/execution/datasources/v2/BatchScanExec.scala | 6 +++--- .../datasources/v2/DataSourceV2ScanExecBase.scala | 2 +- .../sql/execution/exchange/EnsureRequirements.scala | 7 ++----- .../sql/execution/exchange/EnsureRequirementsSuite.scala | 16 ++++++++-------- 5 files changed, 21 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index e6eaeda2d0c..73d39a19243 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -324,13 +324,13 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * * @param expressions partition expressions for the partitioning. * @param numPartitions the number of partitions - * @param partitionValuesOpt if set, the values for the cluster keys of the distribution, must be - * in ascending order. + * @param partitionValues the values for the cluster keys of the distribution, must be + * in ascending order. 
*/ case class KeyGroupedPartitioning( expressions: Seq[Expression], numPartitions: Int, - partitionValuesOpt: Option[Seq[InternalRow]] = None) extends Partitioning { + partitionValues: Seq[InternalRow] = Seq.empty) extends Partitioning { override def satisfies0(required: Distribution): Boolean = { super.satisfies0(required) || { @@ -360,7 +360,7 @@ object KeyGroupedPartitioning { def apply( expressions: Seq[Expression], partitionValues: Seq[InternalRow]): KeyGroupedPartitioning = { - KeyGroupedPartitioning(expressions, partitionValues.size, Some(partitionValues)) + KeyGroupedPartitioning(expressions, partitionValues.size, partitionValues) } def supportsExpressions(expressions: Seq[Expression]): Boolean = { @@ -692,14 +692,12 @@ case class KeyGroupedShuffleSpec( // partition keys must share overlapping positions in their respective clustering keys. // 3.3 each pair of partition expressions at the same index must share compatible // transform functions. - // 4. the partition values, if present on both sides, are following the same order. + // 4. the partition values from both sides are following the same order. 
case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) => distribution.clustering.length == otherDistribution.clustering.length && numPartitions == other.numPartitions && areKeysCompatible(otherSpec) && - partitioning.partitionValuesOpt.zip(otherPartitioning.partitionValuesOpt).forall { - case (left, right) => left.zip(right).forall { case (l, r) => - ordering.compare(l, r) == 0 - } + partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall { + case (left, right) => ordering.compare(left, right) == 0 } case ShuffleSpecCollection(specs) => specs.exists(isCompatibleWith) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala index 025b1a3c38f..d6b76ae1096 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala @@ -84,7 +84,7 @@ case class BatchScanExec( val newRows = new InternalRowSet(p.expressions.map(_.dataType)) newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey()) - val oldRows = p.partitionValuesOpt.get.toSet + val oldRows = p.partitionValues.toSet // We require the new number of partition keys to be equal or less than the old number // of partition keys here. In the case of less than, empty partitions will be added for // those missing keys that are not present in the new input partitions. 
@@ -116,7 +116,7 @@ case class BatchScanExec( super.outputPartitioning match { case k: KeyGroupedPartitioning if commonPartitionValues.isDefined => val values = commonPartitionValues.get - k.copy(numPartitions = values.length, partitionValuesOpt = Some(values)) + k.copy(numPartitions = values.length, partitionValues = values) case p => p } } @@ -134,7 +134,7 @@ case class BatchScanExec( case p: KeyGroupedPartitioning => val partitionMapping = finalPartitions.map(s => s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap - finalPartitions = p.partitionValuesOpt.get.map { partValue => + finalPartitions = p.partitionValues.map { partValue => // Use empty partition for those partition values that are not present partitionMapping.getOrElse(partValue, Seq.empty) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index fa4ae171df5..556ae4afb63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -97,7 +97,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { keyGroupedPartitioning match { case Some(exprs) if KeyGroupedPartitioning.supportsExpressions(exprs) => groupedPartitions.map { partitionValues => - KeyGroupedPartitioning(exprs, partitionValues.size, Some(partitionValues.map(_._1))) + KeyGroupedPartitioning(exprs, partitionValues.size, partitionValues.map(_._1)) }.getOrElse(super.outputPartitioning) case _ => super.outputPartitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 7706b26af70..f88436297e7 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -198,11 +198,8 @@ case class EnsureRequirements( // Check if the two children are partition keys compatible. If so, find the // common set of partition values, and adjust the plan accordingly. if (leftSpec.areKeysCompatible(rightSpec)) { - assert(leftSpec.partitioning.partitionValuesOpt.isDefined) - assert(rightSpec.partitioning.partitionValuesOpt.isDefined) - - val leftPartValues = leftSpec.partitioning.partitionValuesOpt.get - val rightPartValues = rightSpec.partitioning.partitionValuesOpt.get + val leftPartValues = leftSpec.partitioning.partitionValues + val rightPartValues = rightSpec.partitioning.partitionValues val mergedPartValues = Utils.mergeOrdered( Seq(leftPartValues, rightPartValues))(leftSpec.ordering).toSeq.distinct diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala index 7cfa00b4168..bc1fd7a5fa5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala @@ -1024,11 +1024,11 @@ class EnsureRequirementsSuite extends SharedSparkSession { var plan1 = DummySparkPlan( outputPartitioning = KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil, - leftPartValues.length, Some(leftPartValues)) + leftPartValues.length, leftPartValues) ) var plan2 = DummySparkPlan( outputPartitioning = KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil, - rightPartValues.length, Some(rightPartValues)) + rightPartValues.length, rightPartValues) ) // simple case @@ -1047,9 +1047,9 @@ class EnsureRequirementsSuite extends SharedSparkSession { plan1 = 
DummySparkPlan(outputPartitioning = PartitioningCollection( Seq(KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil, - leftPartValues.length, Some(leftPartValues)), + leftPartValues.length, leftPartValues), KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil, - leftPartValues.length, Some(leftPartValues))) + leftPartValues.length, leftPartValues)) ) ) @@ -1074,15 +1074,15 @@ class EnsureRequirementsSuite extends SharedSparkSession { PartitioningCollection( Seq( KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil, - rightPartValues.length, Some(rightPartValues)), + rightPartValues.length, rightPartValues), KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil, - rightPartValues.length, Some(rightPartValues)))), + rightPartValues.length, rightPartValues))), PartitioningCollection( Seq( KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil, - rightPartValues.length, Some(rightPartValues)), + rightPartValues.length, rightPartValues), KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil, - rightPartValues.length, Some(rightPartValues)))) + rightPartValues.length, rightPartValues))) ) ) ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org