(spark) branch master updated: [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 43f79326106 [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt
43f79326106 is described below

commit 43f79326106acb277b9edfb28c34f5dc310b416b
Author: ulysses-you
AuthorDate: Fri Dec 22 13:26:02 2023 +0800

[SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt

### What changes were proposed in this pull request?

This PR adds a check: we only mark the cached partition as materialized if the task has neither failed nor been interrupted. It also adds a new method `isFailed` to `TaskContext`.

### Why are the changes needed?

Before this PR, a task failure during caching could cause an NPE in other tasks:

```
java.lang.NullPointerException
    at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
```

### Does this PR introduce _any_ user-facing change?

Yes, it is a bug fix.

### How was this patch tested?

Added a test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #5 from ulysses-you/fix-cache.

Authored-by: ulysses-you
Signed-off-by: Kent Yao
---
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala  |  2 ++
 core/src/main/scala/org/apache/spark/TaskContext.scala         |  5 +
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala     |  2 ++
 .../scala/org/apache/spark/scheduler/TaskContextSuite.scala    | 10 ++
 project/MimaExcludes.scala                                     |  4 +++-
 .../apache/spark/sql/execution/columnar/InMemoryRelation.scala |  8 +---
 6 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 0f9abaf94ae..50aff8b0fb1 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -194,6 +194,8 @@ class BarrierTaskContext private[spark] (

   override def isCompleted(): Boolean = taskContext.isCompleted()

+  override def isFailed(): Boolean = taskContext.isFailed()
+
   override def isInterrupted(): Boolean = taskContext.isInterrupted()

   override def addTaskCompletionListener(listener: TaskCompletionListener): this.type = {
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0f8a10d734b..15ddd08fb4a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
    */
   def isCompleted(): Boolean

+  /**
+   * Returns true if the task has failed.
+   */
+  def isFailed(): Boolean
+
   /**
    * Returns true if the task has been killed.
    */
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 8d2c2ab9bc4..a3c36de1515 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
   @GuardedBy("this")
   override def isCompleted(): Boolean = synchronized(completed)

+  override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
+
   override def isInterrupted(): Boolean = reasonIfKilled.isDefined

   override def getLocalProperty(key: String): String = localProperties.getProperty(key)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index c56fd3fd1f5..9aba41cea21 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++
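The core of the fix can be sketched as follows. This is an illustrative outline only, not the committed `InMemoryRelation` change: `TaskContext.get()` and `isInterrupted()` are existing Spark APIs, `isFailed()` is the method added by this commit, and `maybeMarkMaterialized`/`markMaterialized` are hypothetical names standing in for the cache bookkeeping.

```scala
// Hedged sketch: publish a cached partition only when the building task
// finished cleanly. A failed or interrupted attempt can leave partially
// built column buffers behind; marking them materialized would let other
// tasks read them and hit the NullPointerException shown above.
import org.apache.spark.TaskContext

def maybeMarkMaterialized(markMaterialized: () => Unit): Unit = {
  val ctx = TaskContext.get()
  if (ctx != null && !ctx.isFailed() && !ctx.isInterrupted()) {
    markMaterialized()
  }
}
```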
(spark) branch branch-3.5 updated: [SPARK-46464][DOC] Fix the scroll issue of tables when overflow
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 98042e34796 [SPARK-46464][DOC] Fix the scroll issue of tables when overflow
98042e34796 is described below

commit 98042e34796ec8d83071256142f8e121f50ad1f4
Author: Kent Yao
AuthorDate: Fri Dec 22 11:45:10 2023 +0800

[SPARK-46464][DOC] Fix the scroll issue of tables when overflow

### What changes were proposed in this pull request?

https://spark.apache.org/docs/3.4.1/running-on-kubernetes.html#spark-properties
https://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties

As listed above, the doc content in 3.5.0 cannot scroll horizontally. When a table overflows, users can only see the rest of its content by zooming out as far as possible, resulting in tiny, hard-to-read characters. This PR changes the HTML body's overflow-x from hidden to auto to enable the underlying table to scroll horizontally.

### Why are the changes needed?

Fix documentation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Before
![image](https://github.com/apache/spark/assets/8326978/437bee91-ab0d-4616-aaaf-f99171dcf9f9)

After
![image](https://github.com/apache/spark/assets/8326978/327ed82b-3e14-4a27-be1a-835a7b21c000)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44423 from yaooqinn/SPARK-46464.

Authored-by: Kent Yao
Signed-off-by: Kent Yao
(cherry picked from commit fc7d7bce7732a2bccb3a7ccf3ed6bed4ac65f8fc)
Signed-off-by: Kent Yao
---
 docs/css/custom.css | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/css/custom.css b/docs/css/custom.css
index e7416d9ded6..1239c0ed440 100644
--- a/docs/css/custom.css
+++ b/docs/css/custom.css
@@ -7,7 +7,7 @@ body {
   font-style: normal;
   font-weight: 400;
   overflow-wrap: anywhere;
-  overflow-x: hidden;
+  overflow-x: auto;
   padding-top: 80px;
 }
(spark) branch master updated (6b931530d75 -> fc7d7bce773)
This is an automated email from the ASF dual-hosted git repository.

yao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 6b931530d75 [SPARK-46472][PYTHON][DOCS] Refine docstring of `array_prepend/array_append/array_insert`
     add fc7d7bce773 [SPARK-46464][DOC] Fix the scroll issue of tables when overflow

No new revisions were added by this update.

Summary of changes:
 docs/css/custom.css | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(spark) branch master updated: [SPARK-46472][PYTHON][DOCS] Refine docstring of `array_prepend/array_append/array_insert`
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6b931530d75 [SPARK-46472][PYTHON][DOCS] Refine docstring of `array_prepend/array_append/array_insert`
6b931530d75 is described below

commit 6b931530d75cb4f00236f9c6283de8ef450963ad
Author: yangjie01
AuthorDate: Fri Dec 22 11:01:05 2023 +0800

[SPARK-46472][PYTHON][DOCS] Refine docstring of `array_prepend/array_append/array_insert`

### What changes were proposed in this pull request?

This PR refines the docstrings of `array_prepend/array_append/array_insert` and adds some new examples.

### Why are the changes needed?

To improve PySpark documentation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44436 from LuciferYang/SPARK-46472.

Authored-by: yangjie01
Signed-off-by: yangjie01
---
 python/pyspark/sql/functions/builtin.py | 217
 1 file changed, 191 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index 54a91792404..571572df30a 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -12875,9 +12875,8 @@ def get(col: "ColumnOrName", index: Union["ColumnOrName", int]) -> Column:
 @_try_remote_functions
 def array_prepend(col: "ColumnOrName", value: Any) -> Column:
     """
-    Collection function: Returns an array containing element as
-    well as all elements from array. The new element is positioned
-    at the beginning of the array.
+    Array function: Returns an array containing the given element as
+    the first element and the rest of the elements from the original array.

     .. versionadded:: 3.5.0
@@ -12891,13 +12890,72 @@ def array_prepend(col: "ColumnOrName", value: Any) -> Column:
     Returns
     -------
     :class:`~pyspark.sql.Column`
-        an array excluding given value.
+        an array with the given value prepended.

     Examples
     --------
-    >>> df = spark.createDataFrame([([2, 3, 4],), ([],)], ['data'])
-    >>> df.select(array_prepend(df.data, 1)).collect()
-    [Row(array_prepend(data, 1)=[1, 2, 3, 4]), Row(array_prepend(data, 1)=[1])]
+    Example 1: Prepending a column value to an array column
+
+    >>> from pyspark.sql import Row, functions as sf
+    >>> df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2="c")])
+    >>> df.select(sf.array_prepend(df.c1, df.c2)).show()
+    +---------------------+
+    |array_prepend(c1, c2)|
+    +---------------------+
+    |         [c, b, a, c]|
+    +---------------------+
+
+    Example 2: Prepending a numeric value to an array column
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([([1, 2, 3],)], ['data'])
+    >>> df.select(sf.array_prepend(df.data, 4)).show()
+    +----------------------+
+    |array_prepend(data, 4)|
+    +----------------------+
+    |          [4, 1, 2, 3]|
+    +----------------------+
+
+    Example 3: Prepending a null value to an array column
+
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame([([1, 2, 3],)], ['data'])
+    >>> df.select(sf.array_prepend(df.data, None)).show()
+    +-------------------------+
+    |array_prepend(data, NULL)|
+    +-------------------------+
+    |          [NULL, 1, 2, 3]|
+    +-------------------------+
+
+    Example 4: Prepending a value to a NULL array column
+
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
+    >>> schema = StructType([
+    ...     StructField("data", ArrayType(IntegerType()), True)
+    ... ])
+    >>> df = spark.createDataFrame([(None,)], schema=schema)
+    >>> df.select(sf.array_prepend(df.data, 4)).show()
+    +----------------------+
+    |array_prepend(data, 4)|
+    +----------------------+
+    |                  NULL|
+    +----------------------+
+
+    Example 5: Prepending a value to an empty array
+
+    >>> from pyspark.sql import functions as sf
+    >>> from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
+    >>> schema = StructType([
+    ...     StructField("data", ArrayType(IntegerType()), True)
+    ... ])
+    >>> df = spark.createDataFrame([([],)], schema=schema)
+    >>> df.select(sf.array_prepend(df.data, 1)).show()
+    +----------------------+
+    |array_prepend(data, 1)|
+    +----------------------+
+    |                   [1]|
+    +----------------------+
     """
     return _invoke_function_over_columns("array_prepend", col, lit(value))
@@ -12965,7 +13023,7 @@ def
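For readers coming from the JVM side: the same function also exists in the Scala API as of Spark 3.5 and behaves as the refined docstring above describes. A minimal hedged sketch, assuming a running `SparkSession` named `spark` with implicits in scope:

```scala
// Scala counterpart of the array_prepend examples above (Spark 3.5+).
import org.apache.spark.sql.functions.array_prepend
import spark.implicits._

val df = Seq(Seq(2, 3, 4)).toDF("data")
df.select(array_prepend($"data", 1)).show()
// +----------------------+
// |array_prepend(data, 1)|
// +----------------------+
// |          [1, 2, 3, 4]|
// +----------------------+
```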
(spark) branch branch-3.5 updated: [SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 286c469ad13 [SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect
286c469ad13 is described below

commit 286c469ad1305f91ea796fd453ae896617fb3883
Author: Jiaan Geng
AuthorDate: Fri Dec 22 09:55:00 2023 +0800

[SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect

### What changes were proposed in this pull request?

This PR fixes a bug by letting the JDBC dialect decide the decimal precision and scale.

**How to reproduce the bug?**

https://github.com/apache/spark/pull/44397 proposed DS V2 pushdown of `PERCENTILE_CONT` and `PERCENTILE_DISC`. The bug fired when pushing down the SQL below to H2 over JDBC.

`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

**The root cause**

`getQueryOutputSchema` is used to get the output schema of a query by calling `JdbcUtils.getSchema`. The query for the H2 database is shown below.

`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

We can get the five variables from `ResultSetMetaData`, please refer:

```
columnName = "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SALARY NULLS FIRST)"
dataType = 2
typeName = "NUMERIC"
fieldSize = 10
fieldScale = 5
```

Then we get the Catalyst schema with `JdbcUtils.getCatalystType`, which actually calls `DecimalType.bounded(precision, scale)`. The `DecimalType.bounded(10, 5)` returns `DecimalType(38, 38)`. Finally, `makeGetter` throws an exception:

```
Caused by: org.apache.spark.SparkArithmeticException: [DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 42 exceeds max precision 38. SQLSTATE: 22003
    at org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:48)
    at org.apache.spark.sql.types.Decimal.set(Decimal.scala:124)
    at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:577)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$4(JdbcUtils.scala:408)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:552)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:408)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:406)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:339)
```

### Why are the changes needed?

This PR fixes the bug that `JdbcUtils` can't get the correct decimal type.

### Does this PR introduce _any_ user-facing change?

'Yes'. It fixes a bug.

### How was this patch tested?

Manual tests in https://github.com/apache/spark/pull/44397

### Was this patch authored or co-authored using generative AI tooling?

'No'.

Closes #44398 from beliefer/SPARK-46443.

Authored-by: Jiaan Geng
Signed-off-by: Wenchen Fan
(cherry picked from commit a921da8509a19b2d23c30ad657725f760932236c)
Signed-off-by: Wenchen Fan
---
 .../main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 16 +++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 8471a49153f..3f56eb035f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType}

 private[sql] object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
@@ -57,6 +57,20 @@ private[sql] object H2Dialect extends JdbcDialect {
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)

+  override def
(spark) branch master updated: [SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a921da8509a [SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect
a921da8509a is described below

commit a921da8509a19b2d23c30ad657725f760932236c
Author: Jiaan Geng
AuthorDate: Fri Dec 22 09:55:00 2023 +0800

[SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect

### What changes were proposed in this pull request?

This PR fixes a bug by letting the JDBC dialect decide the decimal precision and scale.

**How to reproduce the bug?**

https://github.com/apache/spark/pull/44397 proposed DS V2 pushdown of `PERCENTILE_CONT` and `PERCENTILE_DISC`. The bug fired when pushing down the SQL below to H2 over JDBC.

`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

**The root cause**

`getQueryOutputSchema` is used to get the output schema of a query by calling `JdbcUtils.getSchema`. The query for the H2 database is shown below.

`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

We can get the five variables from `ResultSetMetaData`, please refer:

```
columnName = "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SALARY NULLS FIRST)"
dataType = 2
typeName = "NUMERIC"
fieldSize = 10
fieldScale = 5
```

Then we get the Catalyst schema with `JdbcUtils.getCatalystType`, which actually calls `DecimalType.bounded(precision, scale)`. The `DecimalType.bounded(10, 5)` returns `DecimalType(38, 38)`. Finally, `makeGetter` throws an exception:

```
Caused by: org.apache.spark.SparkArithmeticException: [DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 42 exceeds max precision 38. SQLSTATE: 22003
    at org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:48)
    at org.apache.spark.sql.types.Decimal.set(Decimal.scala:124)
    at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:577)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$4(JdbcUtils.scala:408)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:552)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:408)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:406)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:339)
```

### Why are the changes needed?

This PR fixes the bug that `JdbcUtils` can't get the correct decimal type.

### Does this PR introduce _any_ user-facing change?

'Yes'. It fixes a bug.

### How was this patch tested?

Manual tests in https://github.com/apache/spark/pull/44397

### Was this patch authored or co-authored using generative AI tooling?

'No'.

Closes #44398 from beliefer/SPARK-46443.

Authored-by: Jiaan Geng
Signed-off-by: Wenchen Fan
---
 .../main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 16 +++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 3c9bc0ed691..c3b4092c8e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, MetadataBuilder, ShortType, StringType}

 private[sql] object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
@@ -57,6 +57,20 @@ private[sql] object H2Dialect extends JdbcDialect {
   override def isSupportedFunction(funcName: String): Boolean =
     supportedFunctions.contains(funcName)

+  override def getCatalystType(
+      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
+
(spark) branch master updated: [SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and ResolveInlineTablesSuite
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8c63485189c [SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and ResolveInlineTablesSuite
8c63485189c is described below

commit 8c63485189c87fd0a11b57c1a4c8ffa517f5f64e
Author: Jiaan Geng
AuthorDate: Fri Dec 22 09:49:55 2023 +0800

[SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and ResolveInlineTablesSuite

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/44316 replaced current time/date prior to evaluating inline table expressions. This PR proposes to simplify the code for `ResolveInlineTables` and to let `ResolveInlineTablesSuite` apply the rule `ResolveInlineTables`.

### Why are the changes needed?

Simplify the code for `ResolveInlineTables` and `ResolveInlineTablesSuite`.

### Does this PR introduce _any_ user-facing change?

'No'.

### How was this patch tested?

Test cases updated. GA tests.

### Was this patch authored or co-authored using generative AI tooling?

'No'.

Closes #7 from beliefer/SPARK-46380_followup.

Authored-by: Jiaan Geng
Signed-off-by: Wenchen Fan
---
 .../catalyst/analysis/ResolveInlineTables.scala    | 28 +-
 .../analysis/ResolveInlineTablesSuite.scala        | 12 +-
 2 files changed, 17 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 73600f5c706..811e02b4d97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -95,30 +95,24 @@ object ResolveInlineTables extends Rule[LogicalPlan]
   private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable)
     : ResolvedInlineTable = {
     // For each column, traverse all the values and find a common data type and nullability.
-    val fields = table.rows.transpose.zip(table.names).map { case (column, name) =>
+    val (fields, columns) = table.rows.transpose.zip(table.names).map { case (column, name) =>
       val inputTypes = column.map(_.dataType)
       val tpe = TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse {
         table.failAnalysis(
           errorClass = "INVALID_INLINE_TABLE.INCOMPATIBLE_TYPES_IN_INLINE_TABLE",
           messageParameters = Map("colName" -> toSQLId(name)))
       }
-      StructField(name, tpe, nullable = column.exists(_.nullable))
-    }
-    val attributes = DataTypeUtils.toAttributes(StructType(fields))
-    assert(fields.size == table.names.size)
-
-    val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
-      row.zipWithIndex.map {
-        case (e, ci) =>
-          val targetType = fields(ci).dataType
-          val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) {
-            e
-          } else {
-            cast(e, targetType)
-          }
-          castedExpr
+      val newColumn = column.map {
+        case expr if DataTypeUtils.sameType(expr.dataType, tpe) =>
+          expr
+        case expr =>
+          cast(expr, tpe)
       }
-    }
+      (StructField(name, tpe, nullable = column.exists(_.nullable)), newColumn)
+    }.unzip
+    assert(fields.size == table.names.size)
+    val attributes = DataTypeUtils.toAttributes(StructType(fields))
+    val castedRows: Seq[Seq[Expression]] = columns.transpose

     ResolvedInlineTable(castedRows, attributes)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index 758b6b73e4e..3e014d1c11d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -86,8 +86,9 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {

   test("cast and execute") {
     val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L))))
-    val resolved = ResolveInlineTables.findCommonTypesAndCast(table)
-    val converted = ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation]
+    val resolved = ResolveInlineTables(table)
+    assert(resolved.isInstanceOf[LocalRelation])
+    val converted = resolved.asInstanceOf[LocalRelation]
     assert(converted.output.map(_.dataType) == Seq(LongType))
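For context on what the rule resolves: an inline `VALUES` table whose column mixes types gets a common (wider) type, which is what the updated "cast and execute" test exercises with INT and BIGINT literals. A small illustration, assuming a running `SparkSession` named `spark`:

```scala
// The INT literal 1 and the BIGINT literal coerce to a single BIGINT column;
// findCommonTypesAndCast computes this common type before early evaluation.
spark.sql("SELECT * FROM VALUES (1), (CAST(2 AS BIGINT)) AS t(c1)").printSchema()
// root
//  |-- c1: long (nullable = false)
```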
(spark) branch master updated: [SPARK-46468][SQL] Handle COUNT bug for EXISTS subqueries with Aggregate without grouping keys
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3432fd8dba5 [SPARK-46468][SQL] Handle COUNT bug for EXISTS subqueries with Aggregate without grouping keys
3432fd8dba5 is described below

commit 3432fd8dba5bec623b14a4ec4306290eced6c93c
Author: Andrey Gubichev
AuthorDate: Fri Dec 22 09:32:22 2023 +0800

[SPARK-46468][SQL] Handle COUNT bug for EXISTS subqueries with Aggregate without grouping keys

### What changes were proposed in this pull request?

As Aggregates with no grouping keys always return one row (which can be NULL), an EXISTS over such a subquery should always return true. This reverts some changes made when we migrated EXISTS/IN to the DecorrelateInnerQuery framework; in particular, the static detection of potentially count-bug-prone aggregates is removed (just having an empty grouping key now triggers the count-bug treatment; scalar subqueries still have extra checks that evaluate the aggregate on an empty input).

I suspect the same correctness problem was present in the legacy framework (added one test in the legacy section of exists-count-bug.sql).

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Query tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44451 from agubichev/SPARK-46468_count.

Authored-by: Andrey Gubichev
Signed-off-by: Wenchen Fan
---
 .../catalyst/optimizer/DecorrelateInnerQuery.scala | 22 +-
 .../exists-subquery/exists-aggregate.sql.out       | 29 ++
 .../exists-subquery/exists-count-bug.sql.out       | 34 ++
 .../subquery/exists-subquery/exists-aggregate.sql  |  9 ++
 .../subquery/exists-subquery/exists-count-bug.sql  |  5
 .../sql-tests/results/join-lateral.sql.out         |  1 +
 .../exists-subquery/exists-aggregate.sql.out       | 22 ++
 .../exists-subquery/exists-count-bug.sql.out       | 17 +++
 8 files changed, 118 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index feb01d1ce3f..eca392fd84c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
@@ -462,22 +461,6 @@ object DecorrelateInnerQuery extends PredicateHelper {
     p.mapChildren(rewriteDomainJoins(outerPlan, _, conditions))
   }

-  private def isCountBugFree(aggregateExpressions: Seq[NamedExpression]): Boolean = {
-    // The COUNT bug only appears if an aggregate expression returns a non-NULL result on an empty
-    // input.
-    // Typical example (hence the name) is COUNT(*) that returns 0 from an empty result.
-    // However, SUM(x) IS NULL is another case that returns 0, and in general any IS/NOT IS and CASE
-    // expressions are suspect (and the combination of those).
-    // For now we conservatively accept only those expressions that are guaranteed to be safe.
-    aggregateExpressions.forall {
-      case _ : AttributeReference => true
-      case Alias(_: AttributeReference, _) => true
-      case Alias(_: Literal, _) => true
-      case Alias(a: AggregateExpression, _) if a.aggregateFunction.defaultResult == None => true
-      case _ => false
-    }
-  }
-
   def apply(
       innerPlan: LogicalPlan,
       outerPlan: LogicalPlan,
@@ -727,8 +710,6 @@ object DecorrelateInnerQuery extends PredicateHelper {
       case a @ Aggregate(groupingExpressions, aggregateExpressions, child) =>
         val outerReferences = collectOuterReferences(a.expressions)
         val newOuterReferences = parentOuterReferences ++ outerReferences
-        val countBugSusceptible = groupingExpressions.isEmpty &&
-          !isCountBugFree(aggregateExpressions)
         val (newChild, joinCond, outerReferenceMap) =
           decorrelate(child, newOuterReferences, aggregated = true, underSetOp)
         // Replace all outer references in grouping and aggregate expressions, and
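To make the semantics being fixed concrete, here is a small hedged illustration with hypothetical table names, assuming a running `SparkSession` named `spark`: an ungrouped aggregate yields exactly one row even over empty input, so the EXISTS predicate must hold for every outer row.

```scala
// An aggregate without grouping keys returns one row even when no inner rows
// match (e.g. SUM(v) is NULL on empty input), so EXISTS over it is always true.
spark.sql("""
  SELECT o.k
  FROM t_outer o
  WHERE EXISTS (SELECT SUM(i.v) FROM t_inner i WHERE i.k = o.k)
""").show()
// Every row of t_outer is returned, including keys with no match in t_inner.
```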
(spark) branch master updated: [SPARK-45525][SQL][PYTHON] Support for Python data source write using DSv2
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4fcd5bfe003 [SPARK-45525][SQL][PYTHON] Support for Python data source write using DSv2
4fcd5bfe003 is described below

commit 4fcd5bfe003bb546ca888efaf1d39c15c9685673
Author: allisonwang-db
AuthorDate: Fri Dec 22 09:28:47 2023 +0800

[SPARK-45525][SQL][PYTHON] Support for Python data source write using DSv2

### What changes were proposed in this pull request?

This PR adds initial support for Python data source write by implementing the DSv2 `SupportsWrite` interface for `PythonTableProvider`. Note this PR only supports the `def write(self, iterator)` API. `commit` and `abort` will be supported in [SPARK-45914](https://issues.apache.org/jira/browse/SPARK-45914).

### Why are the changes needed?

To support Python data source APIs. For instance:

```python
class SimpleWriter(DataSourceWriter):
    def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
        for row in iterator:
            print(row)
        return WriterCommitMessage()


class SimpleDataSource(DataSource):
    def writer(self, schema, overwrite):
        return SimpleWriter()


# Register the Python data source
spark.dataSource.register(SimpleDataSource)

spark.range(10).write.format("SimpleDataSource").mode("append").save()
```

### Does this PR introduce _any_ user-facing change?

Yes, this PR supports writing data into a Python data source.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43791 from allisonwang-db/spark-45525-data-source-write.

Authored-by: allisonwang-db
Signed-off-by: Wenchen Fan
---
 .../src/main/resources/error/error-classes.json    |   6 +
 docs/sql-error-conditions.md                       |   6 +
 python/pyspark/errors/error_classes.py             |   5 +
 python/pyspark/sql/tests/test_python_datasource.py |  36 ++-
 .../pyspark/sql/worker/write_into_data_source.py   | 233 ++
 .../spark/sql/errors/QueryExecutionErrors.scala    |   6 +
 .../python/UserDefinedPythonDataSource.scala       | 269 +
 .../execution/python/PythonDataSourceSuite.scala   |  95
 8 files changed, 612 insertions(+), 44 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index df223f3298e..8970045d4ab 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2513,6 +2513,12 @@
     ],
     "sqlState" : "42601"
   },
+  "INVALID_WRITER_COMMIT_MESSAGE" : {
+    "message" : [
+      "The data source writer has generated an invalid number of commit messages. Expected exactly one writer commit message from each task, but received ."
+    ],
+    "sqlState" : "42KDE"
+  },
   "INVALID_WRITE_DISTRIBUTION" : {
     "message" : [
       "The requested write distribution is invalid."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index a1af6863913..0722cae5815 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1398,6 +1398,12 @@ Rewrite the query to avoid window functions, aggregate functions, and generator

 Cannot specify ORDER BY or a window frame for ``.

+### INVALID_WRITER_COMMIT_MESSAGE
+
+[SQLSTATE: 42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The data source writer has generated an invalid number of commit messages. Expected exactly one writer commit message from each task, but received ``.
+
 ### [INVALID_WRITE_DISTRIBUTION](sql-error-conditions-invalid-write-distribution-error-class.html)

 [SQLSTATE: 42000](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/python/pyspark/errors/error_classes.py b/python/pyspark/errors/error_classes.py
index bb278481262..2200b73dffc 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -772,6 +772,11 @@ ERROR_CLASSES_JSON = """
       "Expected , but got ."
     ]
   },
+  "PYTHON_DATA_SOURCE_WRITE_ERROR" : {
+    "message" : [
+      "Unable to write to the Python data source: ."
+    ]
+  },
   "PYTHON_HASH_SEED_NOT_SET" : {
     "message" : [
       "Randomness of hash of string should be disabled via PYTHONHASHSEED."
diff --git a/python/pyspark/sql/tests/test_python_datasource.py b/python/pyspark/sql/tests/test_python_datasource.py
index 74ef6a87458..b1bba584d85 100644
--- a/python/pyspark/sql/tests/test_python_datasource.py
+++
(spark) branch master updated: [SPARK-46437][DOCS] Remove cruft from the built-in SQL functions documentation
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e4b5977c9b7 [SPARK-46437][DOCS] Remove cruft from the built-in SQL functions documentation
e4b5977c9b7 is described below

commit e4b5977c9b7b808d32a6370dccc33eaeb235085e
Author: Nicholas Chammas
AuthorDate: Fri Dec 22 09:54:13 2023 +0900

[SPARK-46437][DOCS] Remove cruft from the built-in SQL functions documentation

### What changes were proposed in this pull request?

- Remove a bunch of Liquid directives that are not necessary.
- Add a table of contents to the built-in SQL functions page.
- Move the generated HTML for built-in SQL functions to a subdirectory.

### Why are the changes needed?

To reduce confusion for maintainers.

### Does this PR introduce _any_ user-facing change?

Yes. It adds a table of contents and changes the heading style of the examples. Otherwise, the generated docs are identical.

### How was this patch tested?

I built Spark, ran `./sql/create-docs.sh`, and reviewed the generated docs in my browser. The page is too long to screenshot completely, but here are a couple of screenshots:

https://github.com/apache/spark/assets/1039369/b285d8a2-6eab-488d-9e28-2fdc9cc833a9
https://github.com/apache/spark/assets/1039369/2f9670bc-773a-48a8-a0d0-54206b8a4887

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44393 from nchammas/sql-builtin-funcs-cruft.

Authored-by: Nicholas Chammas
Signed-off-by: Hyukjin Kwon
---
 docs/.gitignore                   |   1 +
 docs/sql-ref-functions-builtin.md | 281 ++
 docs/sql-ref-functions.md         |   2 +-
 sql/gen-sql-functions-docs.py     |  15 +-
 4 files changed, 113 insertions(+), 186 deletions(-)

diff --git a/docs/.gitignore b/docs/.gitignore
index 9df83f37815..bcfdcbf5dcc 100644
--- a/docs/.gitignore
+++ b/docs/.gitignore
@@ -1 +1,2 @@
 generated-*.html
+_generated_function_html/
diff --git a/docs/sql-ref-functions-builtin.md b/docs/sql-ref-functions-builtin.md
index 0ff1432fabf..88ed309a883 100644
--- a/docs/sql-ref-functions-builtin.md
+++ b/docs/sql-ref-functions-builtin.md
@@ -17,202 +17,125 @@ license: |
   limitations under the License.
 ---

-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-agg-funcs-table.html' %}
+* Table of contents
+{:toc}
+
 ### Aggregate Functions
-{% include_relative generated-agg-funcs-table.html %}
- Examples
-{% include_relative generated-agg-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-window-funcs-table.html' %}
+{% include_relative _generated_function_html/agg-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/agg-funcs-examples.html %}
+
 ### Window Functions
-{% include_relative generated-window-funcs-table.html %}
- Examples
-{% include_relative generated-window-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-array-funcs-table.html' %}
+{% include_relative _generated_function_html/window-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/window-funcs-examples.html %}
+
 ### Array Functions
-{% include_relative generated-array-funcs-table.html %}
- Examples
-{% include_relative generated-array-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-collection-funcs-table.html' %}
+{% include_relative _generated_function_html/array-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/array-funcs-examples.html %}
+
 ### Collection Functions
-{% include_relative generated-collection-funcs-table.html %}
- Examples
-{% include_relative generated-collection-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-struct-funcs-table.html' %}
+{% include_relative _generated_function_html/collection-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/collection-funcs-examples.html %}
+
 ### STRUCT Functions
-{% include_relative generated-struct-funcs-table.html %}
- Examples
-{% include_relative generated-struct-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-map-funcs-table.html' %}
+{% include_relative
(spark) branch master updated: [SPARK-46471][PS][TESTS][FOLLOWUPS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_chain_*`
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new db162e5a139 [SPARK-46471][PS][TESTS][FOLLOWUPS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_chain_*`
db162e5a139 is described below

commit db162e5a139d355264ce5c538687efa66e62c8c4
Author: Ruifeng Zheng
AuthorDate: Fri Dec 22 08:57:52 2023 +0900

[SPARK-46471][PS][TESTS][FOLLOWUPS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_chain_*`

### What changes were proposed in this pull request?

Factor out `test_arithmetic_chain_*`.

### Why are the changes needed?

For testing parallelism.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #3 from zhengruifeng/ps_test_diff_ops_arithmetic_chain.

Authored-by: Ruifeng Zheng
Signed-off-by: Hyukjin Kwon
---
 dev/sparktestsupport/modules.py                    |   6 +
 .../test_parity_arithmetic_chain.py                |  41 +
 .../test_parity_arithmetic_chain_ext.py            |  41 +
 .../test_parity_arithmetic_chain_ext_float.py      |  43 +
 .../tests/diff_frames_ops/test_arithmetic_chain.py | 189 +
 .../diff_frames_ops/test_arithmetic_chain_ext.py   | 120 +
 .../test_arithmetic_chain_ext_float.py             | 122 +
 .../pandas/tests/test_ops_on_diff_frames.py        | 117 -
 8 files changed, 562 insertions(+), 117 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 3e0b364ca84..47db204e2fa 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -867,6 +867,9 @@ pyspark_pandas_slow = Module(
         "pyspark.pandas.tests.diff_frames_ops.test_arithmetic",
         "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext",
         "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext_float",
+        "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain",
+        "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain_ext",
+        "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain_ext_float",
         "pyspark.pandas.tests.diff_frames_ops.test_basic_slow",
         "pyspark.pandas.tests.diff_frames_ops.test_cov",
         "pyspark.pandas.tests.diff_frames_ops.test_corrwith",
@@ -1229,6 +1232,9 @@ pyspark_pandas_connect_part3 = Module(
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext_float",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain_ext",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain_ext_float",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_aggregate",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_apply",
diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic_chain.py b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic_chain.py
new file mode 100644
index 000..d24a4a41d0b
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic_chain.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain import ArithmeticChainMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+
+
+class ArithmeticChainParityTests(
+    ArithmeticChainMixin,
+    PandasOnSparkTestUtils,
+
(spark) branch master updated: [SPARK-46476][BUILD][CORE][CONNECT] Move `IvyTestUtils` back to `src/test` directory
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2723bb11e3c7 [SPARK-46476][BUILD][CORE][CONNECT] Move `IvyTestUtils` back to `src/test` directory
2723bb11e3c7 is described below

commit 2723bb11e3c7b9e2252e26cca904e2e53aaa68fb
Author: yangjie01
AuthorDate: Thu Dec 21 20:50:34 2023 +0800

[SPARK-46476][BUILD][CORE][CONNECT] Move `IvyTestUtils` back to `src/test` directory

### What changes were proposed in this pull request?

This PR moves `IvyTestUtils` back to the `src/test` directory because it lived there before the refactoring work of https://github.com/apache/spark/pull/43354. Meanwhile, in order to let the `core` and `connect-client-jvm` modules use `IvyTestUtils` in their tests, this PR adds the corresponding Maven dependencies to the respective `pom.xml` files.

### Why are the changes needed?

Move `IvyTestUtils` back to the `src/test` directory.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #0 from LuciferYang/mv-IvyTestUtils-to-test-dir.

Authored-by: yangjie01
Signed-off-by: yangjie01
---
 .../{main => test}/scala/org/apache/spark/util/IvyTestUtils.scala | 0
 connector/connect/client/jvm/pom.xml                              | 7 +++
 core/pom.xml                                                      | 7 +++
 3 files changed, 14 insertions(+)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala b/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
similarity index 100%
rename from common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
rename to common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
diff --git a/connector/connect/client/jvm/pom.xml b/connector/connect/client/jvm/pom.xml
index eb98e4203a96..8057a33df178 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -82,6 +82,13 @@
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-common-utils_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.typesafe</groupId>
diff --git a/core/pom.xml b/core/pom.xml
index f2d1f4eb144a..c093213bd6b9 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -451,6 +451,13 @@
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-common-utils_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>

(Note: the mailing-list rendering stripped the XML tags from this diff; the tag names above are reconstructed from the surrounding Maven dependency structure.)
(spark) branch master updated: [SPARK-46470][PS][TESTS] Move `test_series_datetime` to `pyspark.pandas.tests.series.*`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 87d22fd346ce [SPARK-46470][PS][TESTS] Move `test_series_datetime` to `pyspark.pandas.tests.series.*`
87d22fd346ce is described below

commit 87d22fd346cefcda1c44f8f4c4e60d665143b2bb
Author: Ruifeng Zheng
AuthorDate: Thu Dec 21 17:53:39 2023 +0800

[SPARK-46470][PS][TESTS] Move `test_series_datetime` to `pyspark.pandas.tests.series.*`

### What changes were proposed in this pull request?

Move `test_series_datetime` to `pyspark.pandas.tests.series.*`.

### Why are the changes needed?

Move the test to the right place.

### Does this PR introduce _any_ user-facing change?

No, test only.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44434 from zhengruifeng/ps_test_mv_ser_ts.

Authored-by: Ruifeng Zheng
Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py                                 | 4 ++--
 .../test_parity_datetime.py}                                    | 8 +---
 .../tests/{test_series_datetime.py => series/test_datetime.py}  | 8 ++--
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 7a5ac426dc7c..3e0b364ca846 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -770,7 +770,7 @@ pyspark_pandas = Module(
         "pyspark.pandas.tests.window.test_groupby_rolling_adv",
         "pyspark.pandas.tests.window.test_groupby_rolling_count",
         "pyspark.pandas.tests.test_scalars",
-        "pyspark.pandas.tests.test_series_datetime",
+        "pyspark.pandas.tests.series.test_datetime",
         "pyspark.pandas.tests.series.test_string_ops_adv",
         "pyspark.pandas.tests.series.test_string_ops_basic",
         "pyspark.pandas.tests.test_spark_functions",
@@ -1049,7 +1049,7 @@ pyspark_pandas_connect_part0 = Module(
         "pyspark.pandas.tests.connect.resample.test_parity_on",
         "pyspark.pandas.tests.connect.resample.test_parity_timezone",
         "pyspark.pandas.tests.connect.test_parity_scalars",
-        "pyspark.pandas.tests.connect.test_parity_series_datetime",
+        "pyspark.pandas.tests.connect.series.test_parity_datetime",
         "pyspark.pandas.tests.connect.series.test_parity_string_ops_adv",
         "pyspark.pandas.tests.connect.series.test_parity_string_ops_basic",
         "pyspark.pandas.tests.connect.test_parity_spark_functions",
diff --git a/python/pyspark/pandas/tests/connect/test_parity_series_datetime.py b/python/pyspark/pandas/tests/connect/series/test_parity_datetime.py
similarity index 85%
rename from python/pyspark/pandas/tests/connect/test_parity_series_datetime.py
rename to python/pyspark/pandas/tests/connect/series/test_parity_datetime.py
index 0842558d0e3f..89fa7b8ceef8 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_series_datetime.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_datetime.py
@@ -16,19 +16,21 @@
 #
 import unittest

-from pyspark.pandas.tests.test_series_datetime import SeriesDateTimeTestsMixin
+from pyspark.pandas.tests.series.test_datetime import SeriesDateTimeTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
 from pyspark.testing.pandasutils import PandasOnSparkTestUtils


 class SeriesDateTimeParityTests(
-    SeriesDateTimeTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase
+    SeriesDateTimeTestsMixin,
+    PandasOnSparkTestUtils,
+    ReusedConnectTestCase,
 ):
     pass


 if __name__ == "__main__":
-    from pyspark.pandas.tests.connect.test_parity_series_datetime import *  # noqa: F401
+    from pyspark.pandas.tests.connect.series.test_parity_datetime import *  # noqa: F401

     try:
         import xmlrunner
diff --git a/python/pyspark/pandas/tests/test_series_datetime.py b/python/pyspark/pandas/tests/series/test_datetime.py
similarity index 98%
rename from python/pyspark/pandas/tests/test_series_datetime.py
rename to python/pyspark/pandas/tests/series/test_datetime.py
index 89d4b70e0b51..e203da29f497 100644
--- a/python/pyspark/pandas/tests/test_series_datetime.py
+++ b/python/pyspark/pandas/tests/series/test_datetime.py
@@ -280,12 +280,16 @@ class SeriesDateTimeTestsMixin:
         )


-class SeriesDateTimeTests(SeriesDateTimeTestsMixin, PandasOnSparkTestCase, SQLTestUtils):
+class SeriesDateTimeTests(
+    SeriesDateTimeTestsMixin,
+    PandasOnSparkTestCase,
+    SQLTestUtils,
+):
     pass


 if __name__ == "__main__":
-    from pyspark.pandas.tests.test_series_datetime import *  # noqa: F401
+    from pyspark.pandas.tests.series.test_datetime import *  # noqa: F401

     try:
         import xmlrunner
(spark) branch master updated: [SPARK-46466][SQL] Vectorized parquet reader should never do rebase for timestamp ntz
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4d21e5547580 [SPARK-46466][SQL] Vectorized parquet reader should never do rebase for timestamp ntz
4d21e5547580 is described below

commit 4d21e55475807a089979cffb54076bcb3ae9c02d
Author: Wenchen Fan
AuthorDate: Thu Dec 21 12:42:19 2023 +0300

[SPARK-46466][SQL] Vectorized parquet reader should never do rebase for timestamp ntz

### What changes were proposed in this pull request?

This fixes a correctness bug. TIMESTAMP_NTZ is a new data type in Spark, and there are no legacy files that need calendar rebase. However, the vectorized Parquet reader treats it the same as LTZ and may rebase if the Parquet file was written with the legacy rebase mode. This PR fixes it to never rebase for NTZ.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

Yes, now we can correctly write and read back an NTZ value even if the date is before 1582.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44428 from cloud-fan/ntz.

Lead-authored-by: Wenchen Fan
Co-authored-by: Wenchen Fan
Signed-off-by: Max Gekk
---
 .../parquet/ParquetVectorUpdaterFactory.java       | 27 --
 .../datasources/parquet/ParquetQuerySuite.scala    | 12 ++
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 918f21716f45..31a1957b4fb9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,24 +109,32 @@ public class ParquetVectorUpdaterFactory {
       // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
       // fallbacks. We read them as decimal values.
       return new UnsignedLongUpdater();
-    } else if (isTimestamp(sparkType) &&
+    } else if (sparkType == DataTypes.TimestampType &&
       isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-      validateTimestampType(sparkType);
       if ("CORRECTED".equals(datetimeRebaseMode)) {
         return new LongUpdater();
       } else {
         boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
        return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
       }
-    } else if (isTimestamp(sparkType) &&
+    } else if (sparkType == DataTypes.TimestampType &&
      isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-      validateTimestampType(sparkType);
       if ("CORRECTED".equals(datetimeRebaseMode)) {
         return new LongAsMicrosUpdater();
       } else {
         final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
         return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz);
       }
+    } else if (sparkType == DataTypes.TimestampNTZType &&
+      isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+      validateTimestampNTZType();
+      // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+      return new LongUpdater();
+    } else if (sparkType == DataTypes.TimestampNTZType &&
+      isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+      validateTimestampNTZType();
+      // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+      return new LongAsMicrosUpdater();
     } else if (sparkType instanceof DayTimeIntervalType) {
       return new LongUpdater();
     }
@@ -195,12 +203,11 @@ public class ParquetVectorUpdaterFactory {
         annotation.getUnit() == unit;
   }

-  void validateTimestampType(DataType sparkType) {
+  private void validateTimestampNTZType() {
     assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ.
+    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ.
     // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() &&
-      sparkType == DataTypes.TimestampNTZType) {
+    if (((TimestampLogicalTypeAnnotation)
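A quick way to see the behavior this commit fixes is a round trip of a pre-1582 TIMESTAMP_NTZ value through Parquet. A hedged Scala sketch, assuming a running `SparkSession` named `spark` (the output path is illustrative):

```scala
// NTZ values must come back unchanged regardless of any rebase metadata in
// the file; before the fix, the vectorized reader could rebase them like LTZ.
import java.time.LocalDateTime
import spark.implicits._

val df = Seq(LocalDateTime.of(1500, 1, 1, 12, 0)).toDF("ts") // TimestampNTZType
df.write.mode("overwrite").parquet("/tmp/ntz_roundtrip")
spark.read.parquet("/tmp/ntz_roundtrip").show(truncate = false)
// Expected with the fix: 1500-01-01T12:00, exactly as written.
```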
(spark) branch master updated: [SPARK-46471][PS][TESTS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_*`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c10b2c0fbaeb [SPARK-46471][PS][TESTS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_*` c10b2c0fbaeb is described below commit c10b2c0fbaeb5a497599bb77c11577e78266904a Author: Ruifeng Zheng AuthorDate: Thu Dec 21 16:53:58 2023 +0800 [SPARK-46471][PS][TESTS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_*` ### What changes were proposed in this pull request? Factor out `test_arithmetic_*` from `OpsOnDiffFramesEnabledTests` ### Why are the changes needed? `OpsOnDiffFramesEnabledTests` and its parity test are slow: ``` Starting test(python3.9): pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames (temp output: /__w/spark/spark/python/target/6b1d192e-052f-42d4-9023-04df84120fce/python3.9__pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames__gycsek91.log) Finished test(python3.9): pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames (740s) ``` break it into small tests to be more suitable for parallelism ### Does this PR introduce _any_ user-facing change? no, test-only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #44435 from zhengruifeng/ps_test_diff_ops_arithmetic. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- dev/sparktestsupport/modules.py| 6 + .../diff_frames_ops/test_parity_arithmetic.py | 41 ++ .../diff_frames_ops/test_parity_arithmetic_ext.py | 41 ++ .../test_parity_arithmetic_ext_float.py| 41 ++ .../tests/diff_frames_ops/test_arithmetic.py | 156 + .../tests/diff_frames_ops/test_arithmetic_ext.py | 99 + .../diff_frames_ops/test_arithmetic_ext_float.py | 99 + .../pandas/tests/test_ops_on_diff_frames.py| 89 8 files changed, 483 insertions(+), 89 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index cbd3b35c0015..7a5ac426dc7c 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -864,6 +864,9 @@ pyspark_pandas_slow = Module( "pyspark.pandas.tests.test_indexing", "pyspark.pandas.tests.test_ops_on_diff_frames", "pyspark.pandas.tests.diff_frames_ops.test_align", +"pyspark.pandas.tests.diff_frames_ops.test_arithmetic", +"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext", +"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext_float", "pyspark.pandas.tests.diff_frames_ops.test_basic_slow", "pyspark.pandas.tests.diff_frames_ops.test_cov", "pyspark.pandas.tests.diff_frames_ops.test_corrwith", @@ -1223,6 +1226,9 @@ pyspark_pandas_connect_part3 = Module( "pyspark.pandas.tests.connect.indexes.test_parity_datetime_property", "pyspark.pandas.tests.connect.indexes.test_parity_datetime_round", "pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames", +"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic", + "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext", + "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext_float", "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby", "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_aggregate", "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_apply", diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic.py 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic.py
new file mode 100644
index ..669d6ace2404
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
(spark) branch master updated: [SPARK-46378][SQL][FOLLOWUP] Do not rely on TreeNodeTag in Project
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0e94f340a63a [SPARK-46378][SQL][FOLLOWUP] Do not rely on TreeNodeTag in Project

0e94f340a63a is described below

commit 0e94f340a63af07f1b105c61e3f884993ee305e6
Author: Wenchen Fan
AuthorDate: Thu Dec 21 16:23:25 2023 +0800

[SPARK-46378][SQL][FOLLOWUP] Do not rely on TreeNodeTag in Project

### What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/44310. It turns out that using `TreeNodeTag` on `Project` is too fragile: `Project` is a very basic node and is easily removed or transformed during plan optimization. This PR switches to a different approach: since we can't reliably retain the information from `Aggregate` (that the input data order doesn't matter), we leverage that information immediately. The expensive part of `EliminateSorts` is pulled out into a new rule, so that we can safely call `EliminateSorts` right before turning `Aggregate` into `Project`. (A query shape that exercises this rewrite is sketched after the diff below.)

### Why are the changes needed?
To make the optimizer more robust.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44429 from cloud-fan/sort.

Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 35
 .../catalyst/optimizer/RemoveRedundantSorts.scala  | 62 ++
 .../plans/logical/basicLogicalOperators.scala      |  3 --
 .../catalyst/optimizer/EliminateSortsSuite.scala   |  3 +-
 .../datasources/V1WriteCommandSuite.scala          | 54 ---
 5 files changed, 111 insertions(+), 46 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5a19c5e3c241..1a831b958ef2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -211,7 +211,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
     Batch("Join Reorder", FixedPoint(1),
       CostBasedJoinReorder) :+
     Batch("Eliminate Sorts", Once,
-      EliminateSorts) :+
+      EliminateSorts,
+      RemoveRedundantSorts) :+
     Batch("Decimal Optimizations", fixedPoint,
       DecimalAggregates) :+
     // This batch must run after "Decimal Optimizations", as that one may change the
@@ -771,11 +772,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
       LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, join)))
     // Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only.
      case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
-        val project = Project(a.aggregateExpressions, LocalLimit(le, a.child))
-        project.setTagValue(Project.dataOrderIrrelevantTag, ())
-        Limit(le, project)
+        val newAgg = EliminateSorts(a.copy(child = LocalLimit(le, a.child))).asInstanceOf[Aggregate]
+        Limit(le, Project(newAgg.aggregateExpressions, newAgg.child))
       case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly =>
-        Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, a.child))))
+        val newAgg = EliminateSorts(a.copy(child = LocalLimit(le, a.child))).asInstanceOf[Aggregate]
+        Limit(le, p.copy(child = Project(newAgg.aggregateExpressions, newAgg.child)))
     // Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset.
     case LocalLimit(le, Offset(oe, grandChild)) =>
       Offset(oe, LocalLimit(Add(le, oe), grandChild))
@@ -1557,38 +1558,30 @@ object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
  * Note that changes in the final output ordering may affect the file size (SPARK-32318).
  * This rule handles the following cases:
  * 1) if the sort order is empty or the sort order does not have any reference
- * 2) if the Sort operator is a local sort and the child is already sorted
- * 3) if there is another Sort operator separated by 0...n Project, Filter, Repartition or
+ * 2) if there is another Sort operator separated by 0...n Project, Filter, Repartition or
  *    RepartitionByExpression, RebalancePartitions (with deterministic expressions) operators
- * 4) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or
+ * 3) if the Sort operator is within Join separated by 0...n Project, Filter, Repartition or
 *
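For context, here is a minimal sketch of a query shape this rewrite targets, assuming a local SparkSession (the query and names are illustrative, not taken from the patch): a group-only aggregate under `LIMIT 1`, with a sort below it that only the aggregate makes removable.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a build that includes this patch. The optimizer
// sees roughly Limit(1, Aggregate(groupOnly, Sort(...))): EliminateSorts can
// drop the Sort while the Aggregate is still present (grouping ignores input
// order), and only then is the Aggregate rewritten into a Project.
object LimitOneAggregateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.range(100)
      .select(($"id" % 10).as("k"))
      .orderBy($"k")   // removable only because of the group-only aggregate below
      .distinct()      // planned as a group-only Aggregate
      .limit(1)
    df.explain(true)   // with the fix, the optimized plan should contain no Sort
    spark.stop()
  }
}
```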
(spark) branch branch-3.5 updated: [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d7534a3ec1ea [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions

d7534a3ec1ea is described below

commit d7534a3ec1eab53bbd349f9ae31684337c734958
Author: Aleksandar Tomic
AuthorDate: Thu Dec 21 15:58:15 2023 +0800

[SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions

With this PR, inline table resolution is done in two phases:

1) Expressions that do not depend on the current context (e.g. CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME, etc.) are evaluated as part of the ResolveInlineTables rule.
2) Expressions that do depend on CURRENT_* evaluation are kept as expressions, and their evaluation is delayed to the post-analysis phase.

This PR aims to solve two problems with inline tables.

Example 1:
```sql
SELECT COUNT(DISTINCT ct) FROM VALUES
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()) as data(ct)
```
Prior to this change this example would return 3 (i.e. all CURRENT_TIMESTAMP expressions would return different values, since each would be evaluated individually as part of inline table evaluation). After this change the result is 1.

Example 2:
```sql
CREATE VIEW V AS (SELECT * FROM VALUES (CURRENT_TIMESTAMP()))
```
In this example the VIEW would previously be saved with the literal evaluated during VIEW creation. After this change, CURRENT_TIMESTAMP() will be evaluated during VIEW execution.

For user-facing changes, see the section above. A new test that validates this behaviour is introduced. This patch was not authored or co-authored using generative AI tooling.

Closes #44316 from dbatomic/inline_tables_curr_time_fix.
Lead-authored-by: Aleksandar Tomic
Co-authored-by: Aleksandar Tomic <150942779+dbato...@users.noreply.github.com>
Signed-off-by: Wenchen Fan
(cherry picked from commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc)
Signed-off-by: Wenchen Fan
---
 .../catalyst/analysis/ResolveInlineTables.scala    | 68 --
 .../spark/sql/catalyst/analysis/unresolved.scala   | 15 +
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    | 33 +++
 .../sql/catalyst/rules/RuleIdCollection.scala      |  1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |  1 +
 .../analysis/ResolveInlineTablesSuite.scala        | 31 --
 .../analyzer-results/inline-table.sql.out          | 16 -
 .../postgreSQL/create_view.sql.out                 |  2 +-
 .../resources/sql-tests/inputs/inline-table.sql    |  6 ++
 .../sql-tests/results/inline-table.sql.out         | 16 +
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 14 +
 12 files changed, 165 insertions(+), 42 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 760ea466b857..73600f5c7064 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -17,28 +17,29 @@
 package org.apache.spark.sql.catalyst.analysis

-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression}
+import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId}
 import org.apache.spark.sql.types.{StructField, StructType}

 /**
- * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
+ * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[ResolvedInlineTable]].
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case table: UnresolvedInlineTable if table.expressionsResolved =>
-      validateInputDimension(table)
-      validateInputEvaluable(table)
-      convert(table)
+  override def
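A quick way to see the user-visible effect on this branch is the query from Example 1 above; a minimal sketch, assuming a `spark` session built from this branch:

```scala
// All three CURRENT_TIMESTAMP() values now come from the same query-wide
// timestamp, so the distinct count is 1 (it was 3 before this fix).
val distinctCt = spark.sql(
  """SELECT COUNT(DISTINCT ct)
    |FROM VALUES (CURRENT_TIMESTAMP()), (CURRENT_TIMESTAMP()), (CURRENT_TIMESTAMP()) AS data(ct)
    |""".stripMargin)
distinctCt.show()  // expected: 1
```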
(spark) branch master updated: [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5fe963f8560e [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions

5fe963f8560e is described below

commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc
Author: Aleksandar Tomic
AuthorDate: Thu Dec 21 15:58:15 2023 +0800

[SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions

### What changes were proposed in this pull request?
With this PR, inline table resolution is done in two phases:

1) Expressions that do not depend on the current context (e.g. CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME, etc.) are evaluated as part of the ResolveInlineTables rule.
2) Expressions that do depend on CURRENT_* evaluation are kept as expressions, and their evaluation is delayed to the post-analysis phase. (A sketch of the deferral check follows the diff below.)

### Why are the changes needed?
This PR aims to solve two problems with inline tables.

Example 1:
```sql
SELECT COUNT(DISTINCT ct) FROM VALUES
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()) as data(ct)
```
Prior to this change this example would return 3 (i.e. all CURRENT_TIMESTAMP expressions would return different values, since each would be evaluated individually as part of inline table evaluation). After this change the result is 1.

Example 2:
```sql
CREATE VIEW V AS (SELECT * FROM VALUES (CURRENT_TIMESTAMP()))
```
In this example the VIEW would previously be saved with the literal evaluated during VIEW creation. After this change, CURRENT_TIMESTAMP() will be evaluated during VIEW execution.

### Does this PR introduce _any_ user-facing change?
See the section above.

### How was this patch tested?
A new test that validates this behaviour is introduced.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44316 from dbatomic/inline_tables_curr_time_fix.
Lead-authored-by: Aleksandar Tomic
Co-authored-by: Aleksandar Tomic <150942779+dbato...@users.noreply.github.com>
Signed-off-by: Wenchen Fan
---
 .../catalyst/analysis/ResolveInlineTables.scala    | 68 --
 .../spark/sql/catalyst/analysis/unresolved.scala   | 15 +
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala    | 34 ++-
 .../sql/catalyst/rules/RuleIdCollection.scala      |  1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala    |  1 +
 .../analysis/ResolveInlineTablesSuite.scala        | 31 --
 .../analyzer-results/inline-table.sql.out          | 16 -
 .../postgreSQL/create_view.sql.out                 |  2 +-
 .../resources/sql-tests/inputs/inline-table.sql    |  6 ++
 .../sql-tests/results/inline-table.sql.out         | 16 +
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 14 +
 12 files changed, 165 insertions(+), 43 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 760ea466b857..73600f5c7064 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -17,28 +17,29 @@
 package org.apache.spark.sql.catalyst.analysis

-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, Expression}
+import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId}
 import org.apache.spark.sql.types.{StructField, StructType}

 /**
- * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]].
+ * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[ResolvedInlineTable]].
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-    AlwaysProcess.fn, ruleId) {
-    case
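To make the two-phase split concrete, here is a minimal sketch of the deferral test, assuming Spark's internal Catalyst API (`TreeNode.exists`); the helper name is illustrative, and the real rule matches the CURRENT_LIKE tree pattern rather than concrete expression classes:

```scala
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp, Expression}

// Sketch: an inline-table expression must be deferred if any subexpression
// depends on the current date/time context. The actual patch checks the
// CURRENT_LIKE tree pattern; two concrete classes stand in for it here.
def referencesCurrentContext(expr: Expression): Boolean = expr.exists {
  case _: CurrentDate | _: CurrentTimestamp => true
  case _ => false
}

// Rows where this returns false can be folded eagerly by ResolveInlineTables;
// the rest stay unevaluated until after ComputeCurrentTime has pinned a single
// query-wide timestamp, so every CURRENT_TIMESTAMP() sees the same instant.
```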