pan3793 commented on code in PR #55519:
URL: https://github.com/apache/spark/pull/55519#discussion_r3165389572


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2142,15 +2142,18 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
-   val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
-    
buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
-      .doc("Whether to allow storage-partition join in the case where join 
keys are " +
-        "a subset of the partition keys of the source tables. At planning 
time, " +
-        "Spark will group the partitions by only those keys that are in the 
join keys. " +
+   val V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS =
+    
buildConf("spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled")
+      
.withAlternative("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+      .doc("Whether to allow storage-partitioned operations (joins and 
aggregates) in the case " +
+        "where the operation's keys are a subset of the partition keys of the 
source tables. At " +
+        "planning time, Spark will group the partitions by only those keys 
that are in the " +
+        "operation's keys. " +
         s"This is currently enabled only if 
${REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key} " +
         "is false."
       )
       .version("4.0.0")

Review Comment:
   The new config name is clearer, as it also applies to non-join operators, 
e.g., aggregates.
   
   Since you renamed the config, this version should be `4.2.0`. The rename also 
needs to be documented in the migration guide.



##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -3988,4 +3988,161 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase with
       }
     }
   }
+

Review Comment:
   Can we have some cases in this suite that partition by transforms other than 
`identity`, especially `bucket`?
   
   Side note: I suspect the adoption and test coverage for bucket are low. I 
found some tricky issues with v2 bucket tables: bucket is a normal transform in 
the v2 concept, but a special one for v1 tables. For example, `SHOW CREATE TABLE` 
prints wrong output for the v2 table `PARTITIONED BY (bucket(4, a), bucket(2, b))`.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala:
##########
@@ -210,6 +210,229 @@ class ProjectedOrderingAndPartitioningSuite
     assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
     assert(outputOrdering.head.sameOrderExpressions.size == 0)
   }
+
+  test("SPARK-46367: KeyedPartitioning expressions are projected through " +
+      "PartitioningPreservingUnaryExecNode") {
+    val a = AttributeReference("a", IntegerType)()
+    val partitionKeys = Seq(InternalRow(1), InternalRow(2), InternalRow(3))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(a),
+      partitioning = KeyedPartitioning(Seq(a), partitionKeys))
+    val b = Alias(a, "b")()
+    val project = ProjectExec(Seq(b), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions === Seq(b.toAttribute),
+          "expressions must reference the aliased attribute, not the original")
+        assert(kp.partitionKeys ===
+          child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys,
+          "partition keys must be preserved after projection")
+      case other =>
+        fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection on KeyedPartitioning produces 
projected partition keys") {
+    // KP([x, y], [(1,1),(1,2),(2,1),(2,2)]) through Project(x) should produce
+    // KP([x], [(1),(1),(2),(2)]) -- granularity narrows from 2 to 1.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), 
InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+    val project = ProjectExec(Seq(x), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions === Seq(x),
+          "narrowed partitioning must keep the projected expression")
+        assert(kp.numPartitions === 4,
+          "partition count must be preserved")
+      case other =>
+        fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection with alias shares partition keys 
across alternatives") {
+    // KP([x, y], ...) through Project(x, x as x_alias) should produce
+    // PC(KP([x], keys1d), KP([x_alias], keys1d)) where both KPs reference the 
same keys1d object.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), 
InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+    val xAlias = Alias(x, "x_alias")()
+    val project = ProjectExec(Seq(x, xAlias), child)
+
+    project.outputPartitioning match {
+      case pc: PartitioningCollection =>
+        val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+        assert(kps.forall(_.expressions.length == 1),
+          "all narrowed KPs must have 1 expression")
+        assert(kps.map(_.expressions.head.asInstanceOf[Attribute].name).toSet
+          === Set("x", "x_alias"),
+          "both the original and aliased attribute must appear")
+        // The invariant: all KPs in the collection must share the same 
partitionKeys object.
+        assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+          "all KPs must share the same partitionKeys object")
+      case other =>
+        fail(s"Expected PartitioningCollection, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection from 3 to 2 expressions with alias") 
{
+    // KP([x, y, z], keys3d) through Project(x, x as x_alias, y) -- z is 
dropped.

Review Comment:
   It seems all newly added cases use a prefix projection, so the projected KPs 
are always sorted. Could you please add some cases like:
   
   ```
   KP([x, y, z], keys3d) through Project(z, z as z_alias, y) -- x is dropped.
   ```
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to