This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8c63485189c [SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and ResolveInlineTablesSuite 8c63485189c is described below commit 8c63485189c87fd0a11b57c1a4c8ffa517f5f64e Author: Jiaan Geng <belie...@163.com> AuthorDate: Fri Dec 22 09:49:55 2023 +0800 [SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and ResolveInlineTablesSuite ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/44316 replaced current time/date prior to evaluating inline table expressions. This PR proposes to simplify the code for `ResolveInlineTables` and let `ResolveInlineTablesSuite` apply the rule `ResolveInlineTables`. ### Why are the changes needed? Simplify the code for `ResolveInlineTables` and `ResolveInlineTablesSuite`. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Test cases updated. GA tests. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #44447 from beliefer/SPARK-46380_followup. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/analysis/ResolveInlineTables.scala | 28 +++++++++------------- .../analysis/ResolveInlineTablesSuite.scala | 12 +++++----- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 73600f5c706..811e02b4d97 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -95,30 +95,24 @@ object ResolveInlineTables extends Rule[LogicalPlan] private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable): ResolvedInlineTable = { // For each column, traverse all the values and find a common data type and nullability. - val fields = table.rows.transpose.zip(table.names).map { case (column, name) => + val (fields, columns) = table.rows.transpose.zip(table.names).map { case (column, name) => val inputTypes = column.map(_.dataType) val tpe = TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { table.failAnalysis( errorClass = "INVALID_INLINE_TABLE.INCOMPATIBLE_TYPES_IN_INLINE_TABLE", messageParameters = Map("colName" -> toSQLId(name))) } - StructField(name, tpe, nullable = column.exists(_.nullable)) - } - val attributes = DataTypeUtils.toAttributes(StructType(fields)) - assert(fields.size == table.names.size) - - val castedRows: Seq[Seq[Expression]] = table.rows.map { row => - row.zipWithIndex.map { - case (e, ci) => - val targetType = fields(ci).dataType - val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) { - e - } else { - cast(e, targetType) - } - castedExpr + val newColumn = column.map { + case expr if DataTypeUtils.sameType(expr.dataType, tpe) => + expr + case expr => + 
cast(expr, tpe) } - } + (StructField(name, tpe, nullable = column.exists(_.nullable)), newColumn) + }.unzip + assert(fields.size == table.names.size) + val attributes = DataTypeUtils.toAttributes(StructType(fields)) + val castedRows: Seq[Seq[Expression]] = columns.transpose ResolvedInlineTable(castedRows, attributes) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index 758b6b73e4e..3e014d1c11d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -86,8 +86,9 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { test("cast and execute") { val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) - val resolved = ResolveInlineTables.findCommonTypesAndCast(table) - val converted = ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation] + val resolved = ResolveInlineTables(table) + assert(resolved.isInstanceOf[LocalRelation]) + val converted = resolved.asInstanceOf[LocalRelation] assert(converted.output.map(_.dataType) == Seq(LongType)) assert(converted.data.size == 2) @@ -98,12 +99,11 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter { test("cast and execute CURRENT_LIKE expressions") { val table = UnresolvedInlineTable(Seq("c1"), Seq( Seq(CurrentTimestamp()), Seq(CurrentTimestamp()))) - val casted = ResolveInlineTables.findCommonTypesAndCast(table) - val earlyEval = ResolveInlineTables.earlyEvalIfPossible(casted) + val resolved = ResolveInlineTables(table) // Early eval should keep it in expression form. 
- assert(earlyEval.isInstanceOf[ResolvedInlineTable]) + assert(resolved.isInstanceOf[ResolvedInlineTable]) - EvalInlineTables(ComputeCurrentTime(earlyEval)) match { + EvalInlineTables(ComputeCurrentTime(resolved)) match { case LocalRelation(output, data, _) => assert(output.map(_.dataType) == Seq(TimestampType)) assert(data.size == 2) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org