This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bd09a3e44f1 [SPARK-42039][SQL] SPJ: Remove Option in KeyGroupedPartitioning#partitionValuesOpt
bd09a3e44f1 is described below

commit bd09a3e44f11153b7e32a153ce5d0b5d0da4ce0c
Author: Chao Sun <sunc...@apple.com>
AuthorDate: Tue Jan 17 11:51:47 2023 -0800

    [SPARK-42039][SQL] SPJ: Remove Option in KeyGroupedPartitioning#partitionValuesOpt
    
    ### What changes were proposed in this pull request?
    
    Currently `KeyGroupedPartitioning#partitionValuesOpt` is of type
    `Option[Seq[InternalRow]]`. This PR refactors it to `Seq[InternalRow]`.
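
    As a minimal illustrative sketch of the shape change (hypothetical
    stand-ins, not the real Spark classes, with `Int` in place of
    `InternalRow`):

    ```scala
    // Hypothetical before/after stand-ins for illustration only.
    case class OldKeyGroupedPartitioning(
        numPartitions: Int,
        partitionValuesOpt: Option[Seq[Int]] = None)

    case class NewKeyGroupedPartitioning(
        numPartitions: Int,
        partitionValues: Seq[Int] = Seq.empty)

    object ShapeChange extends App {
      val before = OldKeyGroupedPartitioning(2, Some(Seq(1, 2)))
      val after = NewKeyGroupedPartitioning(2, Seq(1, 2))
      // Call sites no longer unwrap an Option (no more `.get`).
      assert(before.partitionValuesOpt.get == after.partitionValues)
    }
    ```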
    
    ### Why are the changes needed?
    
    It's unnecessary to use `Option` for this field. Originally I was thinking
    of using `None` for the case when all the input partitions are implicitly
    matched, so that we could skip comparing them in `EnsureRequirements`.
    However, that is not a real use case, and we can instead use `Seq.empty`
    if it ever comes up.
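
    To see why `Seq.empty` preserves the old `None` behavior in checks like
    `KeyGroupedShuffleSpec#isCompatibleWith` (see the diff below), here is a
    sketch under simplifying assumptions: plain `Int`s instead of
    `InternalRow`s, and `==` instead of `ordering.compare`:

    ```scala
    object EmptyVsNone extends App {
      val values = Seq(1, 2, 3)

      // Old shape: None.zip(Some(...)) is empty, so forall is vacuously true.
      val none: Option[Seq[Int]] = None
      assert(none.zip(Option(values)).forall { case (l, r) => l == r })

      // New shape: Seq.empty.zip(...) is also empty, so forall is again true.
      val empty: Seq[Int] = Seq.empty
      assert(empty.zip(values).forall { case (l, r) => l == r })
    }
    ```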
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #39540 from sunchao/SPARK-42039.
    
    Authored-by: Chao Sun <sunc...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../spark/sql/catalyst/plans/physical/partitioning.scala | 16 +++++++---------
 .../sql/execution/datasources/v2/BatchScanExec.scala     |  6 +++---
 .../datasources/v2/DataSourceV2ScanExecBase.scala        |  2 +-
 .../sql/execution/exchange/EnsureRequirements.scala      |  7 ++-----
 .../sql/execution/exchange/EnsureRequirementsSuite.scala | 16 ++++++++--------
 5 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index e6eaeda2d0c..73d39a19243 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -324,13 +324,13 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
  *
  * @param expressions partition expressions for the partitioning.
  * @param numPartitions the number of partitions
- * @param partitionValuesOpt if set, the values for the cluster keys of the distribution, must be
- *                           in ascending order.
+ * @param partitionValues the values for the cluster keys of the distribution, must be
+ *                        in ascending order.
  */
 case class KeyGroupedPartitioning(
     expressions: Seq[Expression],
     numPartitions: Int,
-    partitionValuesOpt: Option[Seq[InternalRow]] = None) extends Partitioning {
+    partitionValues: Seq[InternalRow] = Seq.empty) extends Partitioning {
 
   override def satisfies0(required: Distribution): Boolean = {
     super.satisfies0(required) || {
@@ -360,7 +360,7 @@ object KeyGroupedPartitioning {
   def apply(
       expressions: Seq[Expression],
       partitionValues: Seq[InternalRow]): KeyGroupedPartitioning = {
-    KeyGroupedPartitioning(expressions, partitionValues.size, Some(partitionValues))
+    KeyGroupedPartitioning(expressions, partitionValues.size, partitionValues)
   }
 
   def supportsExpressions(expressions: Seq[Expression]): Boolean = {
@@ -692,14 +692,12 @@ case class KeyGroupedShuffleSpec(
     //        partition keys must share overlapping positions in their respective clustering keys.
    //    3.3 each pair of partition expressions at the same index must share compatible
     //        transform functions.
-    //  4. the partition values, if present on both sides, are following the same order.
+    //  4. the partition values from both sides are following the same order.
    case otherSpec @ KeyGroupedShuffleSpec(otherPartitioning, otherDistribution) =>
       distribution.clustering.length == otherDistribution.clustering.length &&
         numPartitions == other.numPartitions && areKeysCompatible(otherSpec) &&
-          partitioning.partitionValuesOpt.zip(otherPartitioning.partitionValuesOpt).forall {
-            case (left, right) => left.zip(right).forall { case (l, r) =>
-              ordering.compare(l, r) == 0
-            }
+          partitioning.partitionValues.zip(otherPartitioning.partitionValues).forall {
+            case (left, right) => ordering.compare(left, right) == 0
           }
     case ShuffleSpecCollection(specs) =>
       specs.exists(isCompatibleWith)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 025b1a3c38f..d6b76ae1096 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -84,7 +84,7 @@ case class BatchScanExec(
           val newRows = new InternalRowSet(p.expressions.map(_.dataType))
           newRows ++= newPartitions.map(_.asInstanceOf[HasPartitionKey].partitionKey())
 
-          val oldRows = p.partitionValuesOpt.get.toSet
+          val oldRows = p.partitionValues.toSet
           // We require the new number of partition keys to be equal or less than the old number
           // of partition keys here. In the case of less than, empty partitions will be added for
           // those missing keys that are not present in the new input partitions.
@@ -116,7 +116,7 @@ case class BatchScanExec(
     super.outputPartitioning match {
       case k: KeyGroupedPartitioning if commonPartitionValues.isDefined =>
         val values = commonPartitionValues.get
-        k.copy(numPartitions = values.length, partitionValuesOpt = Some(values))
+        k.copy(numPartitions = values.length, partitionValues = values)
       case p => p
     }
   }
@@ -134,7 +134,7 @@ case class BatchScanExec(
         case p: KeyGroupedPartitioning =>
           val partitionMapping = finalPartitions.map(s =>
             s.head.asInstanceOf[HasPartitionKey].partitionKey() -> s).toMap
-          finalPartitions = p.partitionValuesOpt.get.map { partValue =>
+          finalPartitions = p.partitionValues.map { partValue =>
            // Use empty partition for those partition values that are not present
             partitionMapping.getOrElse(partValue, Seq.empty)
           }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index fa4ae171df5..556ae4afb63 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -97,7 +97,7 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
       keyGroupedPartitioning match {
        case Some(exprs) if KeyGroupedPartitioning.supportsExpressions(exprs) =>
           groupedPartitions.map { partitionValues =>
-            KeyGroupedPartitioning(exprs, partitionValues.size, Some(partitionValues.map(_._1)))
+            KeyGroupedPartitioning(exprs, partitionValues.size, partitionValues.map(_._1))
           }.getOrElse(super.outputPartitioning)
         case _ =>
           super.outputPartitioning
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 7706b26af70..f88436297e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -198,11 +198,8 @@ case class EnsureRequirements(
                // Check if the two children are partition keys compatible. If so, find the
                // common set of partition values, and adjust the plan accordingly.
                 if (leftSpec.areKeysCompatible(rightSpec)) {
-                  assert(leftSpec.partitioning.partitionValuesOpt.isDefined)
-                  assert(rightSpec.partitioning.partitionValuesOpt.isDefined)
-
-                  val leftPartValues = leftSpec.partitioning.partitionValuesOpt.get
-                  val rightPartValues = rightSpec.partitioning.partitionValuesOpt.get
+                  val leftPartValues = leftSpec.partitioning.partitionValues
+                  val rightPartValues = rightSpec.partitioning.partitionValues
 
                   val mergedPartValues = Utils.mergeOrdered(
                    Seq(leftPartValues, rightPartValues))(leftSpec.ordering).toSeq.distinct
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
index 7cfa00b4168..bc1fd7a5fa5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/exchange/EnsureRequirementsSuite.scala
@@ -1024,11 +1024,11 @@ class EnsureRequirementsSuite extends SharedSparkSession {
 
       var plan1 = DummySparkPlan(
        outputPartitioning = KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil,
-          leftPartValues.length, Some(leftPartValues))
+          leftPartValues.length, leftPartValues)
       )
       var plan2 = DummySparkPlan(
        outputPartitioning = KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil,
-          rightPartValues.length, Some(rightPartValues))
+          rightPartValues.length, rightPartValues)
       )
 
       // simple case
@@ -1047,9 +1047,9 @@ class EnsureRequirementsSuite extends SharedSparkSession {
       plan1 = DummySparkPlan(outputPartitioning =
         PartitioningCollection(
          Seq(KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil,
-            leftPartValues.length, Some(leftPartValues)),
+            leftPartValues.length, leftPartValues),
             KeyGroupedPartitioning(bucket(4, exprB) :: bucket(8, exprC) :: Nil,
-              leftPartValues.length, Some(leftPartValues)))
+              leftPartValues.length, leftPartValues))
         )
       )
 
@@ -1074,15 +1074,15 @@ class EnsureRequirementsSuite extends SharedSparkSession {
             PartitioningCollection(
               Seq(
                KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil,
-                  rightPartValues.length, Some(rightPartValues)),
+                  rightPartValues.length, rightPartValues),
                KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil,
-                  rightPartValues.length, Some(rightPartValues)))),
+                  rightPartValues.length, rightPartValues))),
               PartitioningCollection(
                 Seq(
                  KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil,
-                    rightPartValues.length, Some(rightPartValues)),
+                    rightPartValues.length, rightPartValues),
                  KeyGroupedPartitioning(bucket(4, exprC) :: bucket(8, exprB) :: Nil,
-                    rightPartValues.length, Some(rightPartValues))))
+                    rightPartValues.length, rightPartValues)))
           )
         )
       )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
