peter-toth commented on code in PR #55519:
URL: https://github.com/apache/spark/pull/55519#discussion_r3166932495
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala:
##########
@@ -210,6 +210,229 @@ class ProjectedOrderingAndPartitioningSuite
assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
assert(outputOrdering.head.sameOrderExpressions.size == 0)
}
+
+ test("SPARK-46367: KeyedPartitioning expressions are projected through " +
+ "PartitioningPreservingUnaryExecNode") {
+ val a = AttributeReference("a", IntegerType)()
+ val partitionKeys = Seq(InternalRow(1), InternalRow(2), InternalRow(3))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(a),
+ partitioning = KeyedPartitioning(Seq(a), partitionKeys))
+ val b = Alias(a, "b")()
+ val project = ProjectExec(Seq(b), child)
+
+ project.outputPartitioning match {
+ case kp: KeyedPartitioning =>
+ assert(kp.expressions === Seq(b.toAttribute),
+ "expressions must reference the aliased attribute, not the original")
+ assert(kp.partitionKeys ===
+ child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys,
+ "partition keys must be preserved after projection")
+ case other =>
+ fail(s"Expected KeyedPartitioning, got $other")
+ }
+ }
+
+ test("SPARK-46367: narrowing projection on KeyedPartitioning produces projected partition keys") {
+ // KP([x, y], [(1,1),(1,2),(2,1),(2,2)]) through Project(x) should produce
+ // KP([x], [(1),(1),(2),(2)]) -- granularity narrows from 2 to 1.
+ val x = AttributeReference("x", IntegerType)()
+ val y = AttributeReference("y", IntegerType)()
+ val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(x, y),
+ partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+ val project = ProjectExec(Seq(x), child)
+
+ project.outputPartitioning match {
+ case kp: KeyedPartitioning =>
+ assert(kp.expressions === Seq(x),
+ "narrowed partitioning must keep the projected expression")
+ assert(kp.numPartitions === 4,
+ "partition count must be preserved")
+ case other =>
+ fail(s"Expected KeyedPartitioning, got $other")
+ }
+ }
+
+ test("SPARK-46367: narrowing projection with alias shares partition keys across alternatives") {
+ // KP([x, y], ...) through Project(x, x as x_alias) should produce
+ // PC(KP([x], keys1d), KP([x_alias], keys1d)) where both KPs reference the same keys1d object.
+ val x = AttributeReference("x", IntegerType)()
+ val y = AttributeReference("y", IntegerType)()
+ val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+ val child = DummyLeafExecWithPartitioning(
+ output = Seq(x, y),
+ partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+ val xAlias = Alias(x, "x_alias")()
+ val project = ProjectExec(Seq(x, xAlias), child)
+
+ project.outputPartitioning match {
+ case pc: PartitioningCollection =>
+ val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+ assert(kps.forall(_.expressions.length == 1),
+ "all narrowed KPs must have 1 expression")
+ assert(kps.map(_.expressions.head.asInstanceOf[Attribute].name).toSet
+ === Set("x", "x_alias"),
+ "both the original and aliased attribute must appear")
+ // The invariant: all KPs in the collection must share the same partitionKeys object.
+ assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+ "all KPs must share the same partitionKeys object")
+ case other =>
+ fail(s"Expected PartitioningCollection, got $other")
+ }
+ }
+
+ test("SPARK-46367: narrowing projection from 3 to 2 expressions with alias") {
+ // KP([x, y, z], keys3d) through Project(x, x as x_alias, y) -- z is dropped.
Review Comment:
Absolutely, added new tests in
https://github.com/apache/spark/pull/55519/commits/bcf0be11509865da1ff96ea43bb2b066608a8771.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]