pan3793 commented on code in PR #55519:
URL: https://github.com/apache/spark/pull/55519#discussion_r3165389572
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2142,15 +2142,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
- val V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS =
-
buildConf("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
- .doc("Whether to allow storage-partition join in the case where join
keys are " +
- "a subset of the partition keys of the source tables. At planning
time, " +
- "Spark will group the partitions by only those keys that are in the
join keys. " +
+ val V2_BUCKETING_ALLOW_KEYS_SUBSET_OF_PARTITION_KEYS =
+
buildConf("spark.sql.sources.v2.bucketing.allowKeysSubsetOfPartitionKeys.enabled")
+
.withAlternative("spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled")
+ .doc("Whether to allow storage-partitioned operations (joins and
aggregates) in the case " +
+ "where the operation's keys are a subset of the partition keys of the
source tables. At " +
+ "planning time, Spark will group the partitions by only those keys
that are in the " +
+ "operation's keys. " +
s"This is currently enabled only if
${REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION.key} " +
"is false."
)
.version("4.0.0")
Review Comment:
The new config name is clearer, since it also applies to non-join operators
(e.g., aggregates).
Since you renamed the config, this version should be `4.2.0`. The rename also
needs to be documented in the migration guide.
##########
sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala:
##########
@@ -3988,4 +3988,161 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase with
}
}
}
+
Review Comment:
Can we have some cases in this suite that partition by transforms other than
`identity`, especially `bucket`?
Side note: I suspect that adoption of, and test coverage for, `bucket` is low. I
found some tricky issues with v2 bucketed tables: `bucket` is a normal transform in
the v2 model, but it is a special case for v1 tables. For example, `SHOW CREATE TABLE`
prints incorrect output for a v2 table `PARTITIONED BY (bucket(4, a), bucket(2, b))`.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala:
##########
@@ -210,6 +210,229 @@ class ProjectedOrderingAndPartitioningSuite
assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
assert(outputOrdering.head.sameOrderExpressions.size == 0)
}
+
+ test("SPARK-46367: KeyedPartitioning expressions are projected through " +
+ "PartitioningPreservingUnaryExecNode") {
+ val a = AttributeReference("a", IntegerType)()
+ val partitionKeys = Seq(InternalRow(1), InternalRow(2), InternalRow(3))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(a),
+ partitioning = KeyedPartitioning(Seq(a), partitionKeys))
+ val b = Alias(a, "b")()
+ val project = ProjectExec(Seq(b), child)
+
+ project.outputPartitioning match {
+ case kp: KeyedPartitioning =>
+ assert(kp.expressions === Seq(b.toAttribute),
+ "expressions must reference the aliased attribute, not the original")
+ assert(kp.partitionKeys ===
+ child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys,
+ "partition keys must be preserved after projection")
+ case other =>
+ fail(s"Expected KeyedPartitioning, got $other")
+ }
+ }
+
+ test("SPARK-46367: narrowing projection on KeyedPartitioning produces
projected partition keys") {
+ // KP([x, y], [(1,1),(1,2),(2,1),(2,2)]) through Project(x) should produce
+ // KP([x], [(1),(1),(2),(2)]) -- granularity narrows from 2 to 1.
+ val x = AttributeReference("x", IntegerType)()
+ val y = AttributeReference("y", IntegerType)()
+ val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1),
InternalRow(2, 2))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(x, y),
+ partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+ val project = ProjectExec(Seq(x), child)
+
+ project.outputPartitioning match {
+ case kp: KeyedPartitioning =>
+ assert(kp.expressions === Seq(x),
+ "narrowed partitioning must keep the projected expression")
+ assert(kp.numPartitions === 4,
+ "partition count must be preserved")
+ case other =>
+ fail(s"Expected KeyedPartitioning, got $other")
+ }
+ }
+
+ test("SPARK-46367: narrowing projection with alias shares partition keys
across alternatives") {
+ // KP([x, y], ...) through Project(x, x as x_alias) should produce
+ // PC(KP([x], keys1d), KP([x_alias], keys1d)) where both KPs reference the
same keys1d object.
+ val x = AttributeReference("x", IntegerType)()
+ val y = AttributeReference("y", IntegerType)()
+ val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1),
InternalRow(2, 2))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(x, y),
+ partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+ val xAlias = Alias(x, "x_alias")()
+ val project = ProjectExec(Seq(x, xAlias), child)
+
+ project.outputPartitioning match {
+ case pc: PartitioningCollection =>
+ val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+ assert(kps.forall(_.expressions.length == 1),
+ "all narrowed KPs must have 1 expression")
+ assert(kps.map(_.expressions.head.asInstanceOf[Attribute].name).toSet
+ === Set("x", "x_alias"),
+ "both the original and aliased attribute must appear")
+ // The invariant: all KPs in the collection must share the same
partitionKeys object.
+ assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+ "all KPs must share the same partitionKeys object")
+ case other =>
+ fail(s"Expected PartitioningCollection, got $other")
+ }
+ }
+
+ test("SPARK-46367: narrowing projection from 3 to 2 expressions with alias")
{
+ // KP([x, y, z], keys3d) through Project(x, x as x_alias, y) -- z is
dropped.
Review Comment:
It seems all the newly added cases use a prefix projection, so the projected KP
is always sorted. Could you please add some cases like:
```
KP([x, y, z], keys3d) through Project(z, z as z_alias, y) -- x is dropped.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]