cloud-fan commented on code in PR #55556:
URL: https://github.com/apache/spark/pull/55556#discussion_r3152264827


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -527,10 +527,25 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val isMetadataAccess = u.containsTag(LogicalPlan.IS_METADATA_COL)
-
-    val (resolved, matched) = resolveDataFrameColumnByPlanId(
-      u, planId, isMetadataAccess, q, 0)
+    val (resolved, matched) = if (u.containsTag(LogicalPlan.IS_METADATA_COL)) {
+      // Metadata access (e.g. `df["_metadata"]`): the resolved attribute lives
+      // in `p.metadataOutput`, so filter ancestors by `p.metadataOutput`.

Review Comment:
   The comment claims "the resolved attribute lives in `p.metadataOutput`," but 
`getMetadataAttributeByNameOpt` (LogicalPlan.scala:56) explicitly looks in 
`(metadataOutput ++ output).collectFirst` with the note "An already-referenced 
column might appear in `output` instead of `metadataOutput`." The resolved 
attribute can be in `p.output`, not `p.metadataOutput`.
   
   This matters because the filter is now narrower than pre-SPARK-55070 
(`p.output ++ p.metadataOutput`). At any ancestor that clears `metadataOutput` 
to `Nil` (Aggregate, Limit, Sort, Window, set ops — 
basicLogicalOperators.scala:404, 448, 510, 853, 885, 1228, 1513, 1573, 1684) 
but carries the metadata attribute through `output`, the strict-metadata filter 
would reject a candidate the previous code accepted. Is the narrowing 
intentional? If so, can you spell that out in the comment and add a test for 
the metadata-access path? The PR description doesn't mention this behavior 
change.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala:
##########
@@ -527,10 +527,25 @@ trait ColumnResolutionHelper extends Logging with 
DataTypeErrorsBase {
     val planId = planIdOpt.get
     logDebug(s"Extract plan_id $planId from $u")
 
-    val isMetadataAccess = u.containsTag(LogicalPlan.IS_METADATA_COL)
-
-    val (resolved, matched) = resolveDataFrameColumnByPlanId(
-      u, planId, isMetadataAccess, q, 0)
+    val (resolved, matched) = if (u.containsTag(LogicalPlan.IS_METADATA_COL)) {
+      // Metadata access (e.g. `df["_metadata"]`): the resolved attribute lives
+      // in `p.metadataOutput`, so filter ancestors by `p.metadataOutput`.
+      resolveDataFrameColumnByPlanId(
+        u, planId, true, q, 0, plan => AttributeSet(plan.metadataOutput))
+    } else {
+      // Regular access: try the strict `p.outputSet` filter first.
+      // That drops candidates hidden at an ancestor, e.g. the right side's 
join
+      // key after a natural/USING join. Fall back to `p.output ++ 
p.metadataOutput`
+      // only when strict resolves nothing, handling the SPARK-55070
+      // `rhs["join_key"]` case. Mirrors `outputAttributes.resolve orElse
+      // outputMetadataAttributes.resolve` in `LogicalPlan.resolve`.
+      resolveDataFrameColumnByPlanId(
+          u, planId, false, q, 0, plan => plan.outputSet) match {
+        case (Some(r), m) => (Some(r), m)
+        case _ => resolveDataFrameColumnByPlanId(u, planId, false, q, 0,

Review Comment:
   The fallback re-walks the tree from scratch — re-descending, re-running 
`p.resolve(u.nameParts, conf.resolver)` at every matched node, and re-merging — 
only to swap the filter set. Resolution at matched nodes and the descent are 
identical between the two passes; only `getAllowed(p)` differs.
   
   Consider collapsing to a single walk by exposing the two filter components 
per level (e.g. `(p.outputSet, AttributeSet(p.metadataOutput))`) and tracking 
pass-states on each candidate as it flows up. Concretely: drop `getAllowed`, 
return candidates as `(NamedExpression, depth, passesStrict)`; at every 
ancestor, use `r.references.subsetOf(AttributeSet(p.output ++ 
p.metadataOutput))` as the survival gate (matches today's broad filter) and 
AND-in `r.references.subsetOf(p.outputSet)` to update `passesStrict`. At the 
top of `resolveDataFrameColumn`, prefer the `passesStrict` subset and fall back 
to all survivors. That preserves the foldLeft merge and the strict-then-broad 
precedence, but pays one walk instead of two.
   
   Not a blocker — just feels like the two passes are doing the same descent 
twice when the only difference is the filter.



##########
python/pyspark/sql/tests/test_column.py:
##########
@@ -558,6 +558,26 @@ def test_select_join_keys(self):
             self.assertTrue(df1.join(df2, "id", how).select(df1["id"]).count() 
>= 0, how)
             self.assertTrue(df1.join(df2, "id", how).select(df2["id"]).count() 
>= 0, how)
 
+    def 
test_select_regular_column_with_reused_dataframe_hidden_in_natural_join(self):
+        # A DataFrame appears both as a direct join side and inside a 
natural/USING
+        # join that hides one of its columns into `metadataOutput`. When 
resolving
+        # `dim["dim_id"]`, two candidates match the plan id: one from 
`p.output`
+        # (the direct join side) and one only visible via `p.metadataOutput` 
(the
+        # reused `dim` nested under the USING-join wrapper). We should prefer 
the
+        # regular candidate and not throw AMBIGUOUS_COLUMN_REFERENCE.
+        fact = self.spark.createDataFrame([(1, 10, "T1"), (2, 20, "T2")], 
["id", "fk", "txn_id"])
+        dim = self.spark.createDataFrame([(10, "X"), (20, "Y"), (30, "Z")], 
["dim_id", "dim_name"])
+        events = self.spark.createDataFrame(
+            [(10, "T1", 100), (20, "T2", 200)], ["dim_id", "txn_id", "amount"]
+        )
+        enriched = events.join(dim, "dim_id", "left")
+        result = (
+            fact.join(dim, fact["fk"] == dim["dim_id"], "left")
+            .join(enriched, "txn_id", "full_outer")
+            .select(dim["dim_id"])
+        )
+        self.assertTrue(result.count() >= 0)

Review Comment:
   `assertTrue(result.count() >= 0)` only checks that the query analyzes. The 
whole point of the fix is *which* `dim["dim_id"]` candidate wins (the 
direct-side one, not the hidden nested one). A regression that silently picks 
the hidden candidate would still pass this test. Consider asserting the 
resolved values match the direct-`dim` rows so the test catches both the 
original ambiguous-error regression and a future "wrong winner" regression.
   
   ```suggestion
           self.assertEqual(
               sorted(r[0] for r in result.collect()),
               sorted([10, 20]),
           )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to