peter-toth commented on code in PR #55519:
URL: https://github.com/apache/spark/pull/55519#discussion_r3166932495


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala:
##########
@@ -210,6 +210,229 @@ class ProjectedOrderingAndPartitioningSuite
     assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a")
     assert(outputOrdering.head.sameOrderExpressions.size == 0)
   }
+
+  test("SPARK-46367: KeyedPartitioning expressions are projected through " +
+      "PartitioningPreservingUnaryExecNode") {
+    val a = AttributeReference("a", IntegerType)()
+    val partitionKeys = Seq(InternalRow(1), InternalRow(2), InternalRow(3))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(a),
+      partitioning = KeyedPartitioning(Seq(a), partitionKeys))
+    val b = Alias(a, "b")()
+    val project = ProjectExec(Seq(b), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions === Seq(b.toAttribute),
+          "expressions must reference the aliased attribute, not the original")
+        assert(kp.partitionKeys ===
+          child.partitioning.asInstanceOf[KeyedPartitioning].partitionKeys,
+          "partition keys must be preserved after projection")
+      case other =>
+        fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection on KeyedPartitioning produces projected partition keys") {
+    // KP([x, y], [(1,1),(1,2),(2,1),(2,2)]) through Project(x) should produce
+    // KP([x], [(1),(1),(2),(2)]) -- granularity narrows from 2 to 1.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+    val project = ProjectExec(Seq(x), child)
+
+    project.outputPartitioning match {
+      case kp: KeyedPartitioning =>
+        assert(kp.expressions === Seq(x),
+          "narrowed partitioning must keep the projected expression")
+        assert(kp.numPartitions === 4,
+          "partition count must be preserved")
+      case other =>
+        fail(s"Expected KeyedPartitioning, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection with alias shares partition keys across alternatives") {
+    // KP([x, y], ...) through Project(x, x as x_alias) should produce
+    // PC(KP([x], keys1d), KP([x_alias], keys1d)) where both KPs reference the same keys1d object.
+    val x = AttributeReference("x", IntegerType)()
+    val y = AttributeReference("y", IntegerType)()
+    val keys2d = Seq(InternalRow(1, 1), InternalRow(1, 2), InternalRow(2, 1), InternalRow(2, 2))
+    val child = DummyLeafExecWithPartitioning(
+      output = Seq(x, y),
+      partitioning = KeyedPartitioning(Seq(x, y), keys2d))
+    val xAlias = Alias(x, "x_alias")()
+    val project = ProjectExec(Seq(x, xAlias), child)
+
+    project.outputPartitioning match {
+      case pc: PartitioningCollection =>
+        val kps = pc.partitionings.map(_.asInstanceOf[KeyedPartitioning])
+        assert(kps.forall(_.expressions.length == 1),
+          "all narrowed KPs must have 1 expression")
+        assert(kps.map(_.expressions.head.asInstanceOf[Attribute].name).toSet
+          === Set("x", "x_alias"),
+          "both the original and aliased attribute must appear")
+        // The invariant: all KPs in the collection must share the same partitionKeys object.
+        assert(kps.tail.forall(_.partitionKeys eq kps.head.partitionKeys),
+          "all KPs must share the same partitionKeys object")
+      case other =>
+        fail(s"Expected PartitioningCollection, got $other")
+    }
+  }
+
+  test("SPARK-46367: narrowing projection from 3 to 2 expressions with alias") {
+    // KP([x, y, z], keys3d) through Project(x, x as x_alias, y) -- z is dropped.

Review Comment:
   Absolutely, added new tests in 
https://github.com/apache/spark/pull/55519/commits/bcf0be11509865da1ff96ea43bb2b066608a8771.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to