This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0de6f2995c78 [SPARK-51451][SQL] Fix ExtractGenerator to wait for UnresolvedStarWithColumns to be resolved 0de6f2995c78 is described below commit 0de6f2995c786bb0be0020fc76d22e79e4f9631f Author: Takuya Ueshin <ues...@databricks.com> AuthorDate: Tue Mar 18 14:21:41 2025 +0800 [SPARK-51451][SQL] Fix ExtractGenerator to wait for UnresolvedStarWithColumns to be resolved ### What changes were proposed in this pull request? Fixes `ExtractGenerator` to wait for `UnresolvedStarWithColumns` to be resolved. ### Why are the changes needed? `df.withColumn` is now analyzed in the analyzer, which causes the `ExtractGenerator` rule to misunderstand that the generator is nested. This happens with Spark Connect more often because Spark Classic usually can resolve `UnresolvedStarWithColumns` before the `ExtractGenerator` rule, whereas Spark Connect sometimes needs several iterations of resolving rules. ```py from pyspark.sql.functions import * df = spark.createDataFrame([("082017",)], ['dt']) df_dt = df.select(date_format(to_date(col("dt"), "MMyyyy"), "MM/dd/yyyy").alias("dt")) monthArray = [lit(x) for x in range(0, 12)] df_month_y = df_dt.withColumn("month_y", explode(array(monthArray))) df_month_y.show() ``` ``` pyspark.errors.exceptions.connect.AnalysisException: [UNSUPPORTED_GENERATOR.NESTED_IN_EXPRESSIONS] The generator is not supported: nested in expressions "unresolvedstarwithcolumns(explode(array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)))". SQLSTATE: 42K0E ``` Its parsed plan is: ``` == Parsed Logical Plan == 'Project [unresolvedstarwithcolumns(month_y, 'explode('array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)), Some(List({})))] +- 'Project ['date_format('to_date('dt, MMyyyy), MM/dd/yyyy) AS dt#5] +- 'UnresolvedSubqueryColumnAliases [dt] +- LocalRelation [dt#4] ``` Here `explode` is nested in `unresolvedstarwithcolumns`. ### Does this PR introduce _any_ user-facing change? 
Yes, `df.withColumn` with generators will be available again. ### How was this patch tested? Added the related tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50286 from ueshin/issues/SPARK-51451/with_column_generator. Authored-by: Takuya Ueshin <ues...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- python/pyspark/sql/tests/test_dataframe.py | 28 +++++++++++++++++++++- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++ .../spark/sql/catalyst/analysis/unresolved.scala | 4 ++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 706b8c0a8be8..e20ab09d8d1e 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -26,7 +26,7 @@ import io from contextlib import redirect_stdout from pyspark.sql import Row, functions, DataFrame -from pyspark.sql.functions import col, lit, count, struct +from pyspark.sql.functions import col, lit, count, struct, date_format, to_date, array, explode from pyspark.sql.types import ( StringType, IntegerType, @@ -1076,6 +1076,32 @@ class DataFrameTestsMixin: [Row(0), Row(0), Row(0)], ) + def test_with_column_and_generator(self): + # SPARK-51451: Generators should be available with withColumn + df = self.spark.createDataFrame([("082017",)], ["dt"]).select( + to_date(col("dt"), "MMyyyy").alias("dt") + ) + df_dt = df.withColumn("dt", date_format(col("dt"), "MM/dd/yyyy")) + monthArray = [lit(x) for x in range(0, 12)] + df_month_y = df_dt.withColumn("month_y", explode(array(monthArray))) + + assertDataFrameEqual( + df_month_y, + [Row(dt="08/01/2017", month_y=i) for i in range(12)], + ) + + df_dt_month_y = df.withColumns( + { + "dt": date_format(col("dt"), "MM/dd/yyyy"), + "month_y": explode(array(monthArray)), + } + ) + + assertDataFrameEqual( + df_dt_month_y, + [Row(dt="08/01/2017", month_y=i) for i in range(12)], + ) + class 
DataFrameTests(DataFrameTestsMixin, ReusedSQLTestCase): def test_query_execution_unsupported_in_classic(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index aac5bc08fe04..93e6fb6746a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2999,6 +2999,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsPattern(GENERATOR), ruleId) { + case p @ Project(Seq(UnresolvedStarWithColumns(_, _, _)), _) => + // UnresolvedStarWithColumns should be resolved before extracting. + p + case Project(projectList, _) if projectList.exists(hasNestedGenerator) => val nestedGenerator = projectList.find(hasNestedGenerator).get throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index fabe551d054c..93462e577391 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -776,6 +776,8 @@ case class UnresolvedStarWithColumns( replacedAndExistingColumns ++ newColumns } + + override def toString: String = super[Expression].toString } /** @@ -812,6 +814,8 @@ case class UnresolvedStarWithColumnsRenames( ) } } + + override def toString: String = super[LeafExpression].toString } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org