This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0de6f2995c78 [SPARK-51451][SQL] Fix ExtractGenerator to wait for UnresolvedStarWithColumns to be resolved
0de6f2995c78 is described below

commit 0de6f2995c786bb0be0020fc76d22e79e4f9631f
Author: Takuya Ueshin <ues...@databricks.com>
AuthorDate: Tue Mar 18 14:21:41 2025 +0800

    [SPARK-51451][SQL] Fix ExtractGenerator to wait for UnresolvedStarWithColumns to be resolved
    
    ### What changes were proposed in this pull request?
    
    Fixes `ExtractGenerator` to wait for `UnresolvedStarWithColumns` to be resolved.
    
    ### Why are the changes needed?
    
    `df.withColumn` is now analyzed in the analyzer, which causes the `ExtractGenerator` rule to mistakenly treat the generator as nested in another expression.
    
    This happens more often with Spark Connect because Spark Classic can usually resolve `UnresolvedStarWithColumns` before the `ExtractGenerator` rule runs, whereas Spark Connect sometimes needs several iterations of the resolution rules.
    
    ```py
    from pyspark.sql.functions import *
    
    df = spark.createDataFrame([("082017",)], ['dt'])
    df_dt = df.select(date_format(to_date(col("dt"), "MMyyyy"), "MM/dd/yyyy").alias("dt"))
    
    monthArray = [lit(x) for x in range(0, 12)]
    df_month_y = df_dt.withColumn("month_y", explode(array(monthArray)))
    
    df_month_y.show()
    ```
    
    ```
    pyspark.errors.exceptions.connect.AnalysisException: [UNSUPPORTED_GENERATOR.NESTED_IN_EXPRESSIONS] The generator is not supported: nested in expressions "unresolvedstarwithcolumns(explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)))". SQLSTATE: 42K0E
    ```
    
    Its parsed plan is:
    
    ```
    == Parsed Logical Plan ==
    'Project [unresolvedstarwithcolumns(month_y, 'explode('array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)), Some(List({})))]
    +- 'Project ['date_format('to_date('dt, MMyyyy), MM/dd/yyyy) AS dt#5]
       +- 'UnresolvedSubqueryColumnAliases [dt]
          +- LocalRelation [dt#4]
    ```
    
    Here `explode` is nested in `unresolvedstarwithcolumns`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, `df.withColumn` with generators works again.
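    
    With this fix, the reproduction above runs successfully. As a rough sketch of the restored behavior (mirroring the new test added in this patch, and assuming the same `spark` session as in the snippet above):
    
    ```py
    from pyspark.sql import Row
    from pyspark.sql.functions import *
    from pyspark.testing import assertDataFrameEqual
    
    df = spark.createDataFrame([("082017",)], ["dt"]).select(
        to_date(col("dt"), "MMyyyy").alias("dt")
    )
    df_dt = df.withColumn("dt", date_format(col("dt"), "MM/dd/yyyy"))
    # The generator is no longer rejected as "nested in expressions" when added via withColumn.
    df_month_y = df_dt.withColumn("month_y", explode(array([lit(x) for x in range(12)])))
    
    # The single input row expands into 12 rows, one per generated month value.
    assertDataFrameEqual(df_month_y, [Row(dt="08/01/2017", month_y=i) for i in range(12)])
    ```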
    
    ### How was this patch tested?
    
    Added the related tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #50286 from ueshin/issues/SPARK-51451/with_column_generator.
    
    Authored-by: Takuya Ueshin <ues...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 python/pyspark/sql/tests/test_dataframe.py         | 28 +++++++++++++++++++++-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 ++++
 .../spark/sql/catalyst/analysis/unresolved.scala   |  4 ++++
 3 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py
index 706b8c0a8be8..e20ab09d8d1e 100644
--- a/python/pyspark/sql/tests/test_dataframe.py
+++ b/python/pyspark/sql/tests/test_dataframe.py
@@ -26,7 +26,7 @@ import io
 from contextlib import redirect_stdout
 
 from pyspark.sql import Row, functions, DataFrame
-from pyspark.sql.functions import col, lit, count, struct
+from pyspark.sql.functions import col, lit, count, struct, date_format, to_date, array, explode
 from pyspark.sql.types import (
     StringType,
     IntegerType,
@@ -1076,6 +1076,32 @@ class DataFrameTestsMixin:
                     [Row(0), Row(0), Row(0)],
                 )
 
+    def test_with_column_and_generator(self):
+        # SPARK-51451: Generators should be available with withColumn
+        df = self.spark.createDataFrame([("082017",)], ["dt"]).select(
+            to_date(col("dt"), "MMyyyy").alias("dt")
+        )
+        df_dt = df.withColumn("dt", date_format(col("dt"), "MM/dd/yyyy"))
+        monthArray = [lit(x) for x in range(0, 12)]
+        df_month_y = df_dt.withColumn("month_y", explode(array(monthArray)))
+
+        assertDataFrameEqual(
+            df_month_y,
+            [Row(dt="08/01/2017", month_y=i) for i in range(12)],
+        )
+
+        df_dt_month_y = df.withColumns(
+            {
+                "dt": date_format(col("dt"), "MM/dd/yyyy"),
+                "month_y": explode(array(monthArray)),
+            }
+        )
+
+        assertDataFrameEqual(
+            df_dt_month_y,
+            [Row(dt="08/01/2017", month_y=i) for i in range(12)],
+        )
+
 
 class DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase):
     def test_query_execution_unsupported_in_classic(self):
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aac5bc08fe04..93e6fb6746a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2999,6 +2999,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
       _.containsPattern(GENERATOR), ruleId) {
+      case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) =>
+        // UnresolvedStarWithColumns should be resolved before extracting.
+        p
+
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
        throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index fabe551d054c..93462e577391 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -776,6 +776,8 @@ case class UnresolvedStarWithColumns(
 
     replacedAndExistingColumns ++ newColumns
   }
+
+  override def toString: String = super[Expression].toString
 }
 
 /**
@@ -812,6 +814,8 @@ case class UnresolvedStarWithColumnsRenames(
         )
     }
   }
+
+  override def toString: String = super[LeafExpression].toString
 }
 
 /**

