(spark) branch master updated: [SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt

2023-12-21 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 43f79326106 [SPARK-46480][CORE][SQL] Fix NPE when table cache task 
attempt
43f79326106 is described below

commit 43f79326106acb277b9edfb28c34f5dc310b416b
Author: ulysses-you 
AuthorDate: Fri Dec 22 13:26:02 2023 +0800

[SPARK-46480][CORE][SQL] Fix NPE when table cache task attempt

### What changes were proposed in this pull request?

This pr adds a check: we only mark the cached partition is materialized if 
the task is not failed and not interrupted. And adds a new method `isFailed` in 
`TaskContext`.

### Why are the changes needed?

Before this pr, when do cache, task failure can cause NPE in other tasks

```
java.lang.NullPointerException
at java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.accessors1$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
 Source)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
```

### Does this PR introduce _any_ user-facing change?

yes, it's a bug fix

### How was this patch tested?

add test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #5 from ulysses-you/fix-cache.

Authored-by: ulysses-you 
Signed-off-by: Kent Yao 
---
 core/src/main/scala/org/apache/spark/BarrierTaskContext.scala  |  2 ++
 core/src/main/scala/org/apache/spark/TaskContext.scala |  5 +
 core/src/main/scala/org/apache/spark/TaskContextImpl.scala |  2 ++
 .../scala/org/apache/spark/scheduler/TaskContextSuite.scala| 10 ++
 project/MimaExcludes.scala |  4 +++-
 .../apache/spark/sql/execution/columnar/InMemoryRelation.scala |  8 +---
 6 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala 
b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 0f9abaf94ae..50aff8b0fb1 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -194,6 +194,8 @@ class BarrierTaskContext private[spark] (
 
   override def isCompleted(): Boolean = taskContext.isCompleted()
 
+  override def isFailed(): Boolean = taskContext.isFailed()
+
   override def isInterrupted(): Boolean = taskContext.isInterrupted()
 
   override def addTaskCompletionListener(listener: TaskCompletionListener): 
this.type = {
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 0f8a10d734b..15ddd08fb4a 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -94,6 +94,11 @@ abstract class TaskContext extends Serializable {
*/
   def isCompleted(): Boolean
 
+  /**
+   * Returns true if the task has failed.
+   */
+  def isFailed(): Boolean
+
   /**
* Returns true if the task has been killed.
*/
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala 
b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 8d2c2ab9bc4..a3c36de1515 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -275,6 +275,8 @@ private[spark] class TaskContextImpl(
   @GuardedBy("this")
   override def isCompleted(): Boolean = synchronized(completed)
 
+  override def isFailed(): Boolean = synchronized(failureCauseOpt.isDefined)
+
   override def isInterrupted(): Boolean = reasonIfKilled.isDefined
 
   override def getLocalProperty(key: String): String = 
localProperties.getProperty(key)
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index c56fd3fd1f5..9aba41cea21 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ 

(spark) branch branch-3.5 updated: [SPARK-46464][DOC] Fix the scroll issue of tables when overflow

2023-12-21 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 98042e34796 [SPARK-46464][DOC] Fix the scroll issue of tables when 
overflow
98042e34796 is described below

commit 98042e34796ec8d83071256142f8e121f50ad1f4
Author: Kent Yao 
AuthorDate: Fri Dec 22 11:45:10 2023 +0800

[SPARK-46464][DOC] Fix the scroll issue of tables when overflow

### What changes were proposed in this pull request?


https://spark.apache.org/docs/3.4.1/running-on-kubernetes.html#spark-properties

https://spark.apache.org/docs/latest/running-on-kubernetes.html#spark-properties

As listed above, the doc content in 3.5.0 cannot scroll horizontally. Users 
can only see the rest of its content when a table overflows if they zoom out as 
much as possible, resulting in hard-to-read minor characters.

This PR changes the HTML body overflow-x from hidden to auto to enable the 
underlying table to scroll horizontally.

### Why are the changes needed?

Fix documentation

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

 Before

![image](https://github.com/apache/spark/assets/8326978/437bee91-ab0d-4616-aaaf-f99171dcf9f9)

 After

![image](https://github.com/apache/spark/assets/8326978/327ed82b-3e14-4a27-be1a-835a7b21c000)

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44423 from yaooqinn/SPARK-46464.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
(cherry picked from commit fc7d7bce7732a2bccb3a7ccf3ed6bed4ac65f8fc)
Signed-off-by: Kent Yao 
---
 docs/css/custom.css | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/css/custom.css b/docs/css/custom.css
index e7416d9ded6..1239c0ed440 100644
--- a/docs/css/custom.css
+++ b/docs/css/custom.css
@@ -7,7 +7,7 @@ body {
   font-style: normal;
   font-weight: 400;
   overflow-wrap: anywhere;
-  overflow-x: hidden;
+  overflow-x: auto;
   padding-top: 80px;
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated (6b931530d75 -> fc7d7bce773)

2023-12-21 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6b931530d75 [SPARK-46472][PYTHON][DOCS] Refine docstring of 
`array_prepend/array_append/array_insert`
 add fc7d7bce773 [SPARK-46464][DOC] Fix the scroll issue of tables when 
overflow

No new revisions were added by this update.

Summary of changes:
 docs/css/custom.css | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46472][PYTHON][DOCS] Refine docstring of `array_prepend/array_append/array_insert`

2023-12-21 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6b931530d75 [SPARK-46472][PYTHON][DOCS] Refine docstring of 
`array_prepend/array_append/array_insert`
6b931530d75 is described below

commit 6b931530d75cb4f00236f9c6283de8ef450963ad
Author: yangjie01 
AuthorDate: Fri Dec 22 11:01:05 2023 +0800

[SPARK-46472][PYTHON][DOCS] Refine docstring of 
`array_prepend/array_append/array_insert`

### What changes were proposed in this pull request?
This pr refine docstring of `array_prepend/array_append/array_insert` and 
add some new examples.

### Why are the changes needed?
To improve PySpark documentation

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass Github Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44436 from LuciferYang/SPARK-46472.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 python/pyspark/sql/functions/builtin.py | 217 
 1 file changed, 191 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index 54a91792404..571572df30a 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -12875,9 +12875,8 @@ def get(col: "ColumnOrName", index: 
Union["ColumnOrName", int]) -> Column:
 @_try_remote_functions
 def array_prepend(col: "ColumnOrName", value: Any) -> Column:
 """
-Collection function: Returns an array containing element as
-well as all elements from array. The new element is positioned
-at the beginning of the array.
+Array function: Returns an array containing the given element as
+the first element and the rest of the elements from the original array.
 
 .. versionadded:: 3.5.0
 
@@ -12891,13 +12890,72 @@ def array_prepend(col: "ColumnOrName", value: Any) -> 
Column:
 Returns
 ---
 :class:`~pyspark.sql.Column`
-an array excluding given value.
+an array with the given value prepended.
 
 Examples
 
->>> df = spark.createDataFrame([([2, 3, 4],), ([],)], ['data'])
->>> df.select(array_prepend(df.data, 1)).collect()
-[Row(array_prepend(data, 1)=[1, 2, 3, 4]), Row(array_prepend(data, 1)=[1])]
+Example 1: Prepending a column value to an array column
+
+>>> from pyspark.sql import Row, functions as sf
+>>> df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2="c")])
+>>> df.select(sf.array_prepend(df.c1, df.c2)).show()
++-+
+|array_prepend(c1, c2)|
++-+
+| [c, b, a, c]|
++-+
+
+Example 2: Prepending a numeric value to an array column
+
+>>> from pyspark.sql import functions as sf
+>>> df = spark.createDataFrame([([1, 2, 3],)], ['data'])
+>>> df.select(sf.array_prepend(df.data, 4)).show()
++--+
+|array_prepend(data, 4)|
++--+
+|  [4, 1, 2, 3]|
++--+
+
+Example 3: Prepending a null value to an array column
+
+>>> from pyspark.sql import functions as sf
+>>> df = spark.createDataFrame([([1, 2, 3],)], ['data'])
+>>> df.select(sf.array_prepend(df.data, None)).show()
++-+
+|array_prepend(data, NULL)|
++-+
+|  [NULL, 1, 2, 3]|
++-+
+
+Example 4: Prepending a value to a NULL array column
+
+>>> from pyspark.sql import functions as sf
+>>> from pyspark.sql.types import ArrayType, IntegerType, StructType, 
StructField
+>>> schema = StructType([
+...   StructField("data", ArrayType(IntegerType()), True)
+... ])
+>>> df = spark.createDataFrame([(None,)], schema=schema)
+>>> df.select(sf.array_prepend(df.data, 4)).show()
++--+
+|array_prepend(data, 4)|
++--+
+|  NULL|
++--+
+
+Example 5: Prepending a value to an empty array
+
+>>> from pyspark.sql import functions as sf
+>>> from pyspark.sql.types import ArrayType, IntegerType, StructType, 
StructField
+>>> schema = StructType([
+...   StructField("data", ArrayType(IntegerType()), True)
+... ])
+>>> df = spark.createDataFrame([([],)], schema=schema)
+>>> df.select(sf.array_prepend(df.data, 1)).show()
++--+
+|array_prepend(data, 1)|
++--+
+|   [1]|
++--+
 """
 return _invoke_function_over_columns("array_prepend", col, lit(value))
 
@@ -12965,7 +13023,7 @@ def 

(spark) branch branch-3.5 updated: [SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 286c469ad13 [SPARK-46443][SQL] Decimal precision and scale should 
decided by H2 dialect
286c469ad13 is described below

commit 286c469ad1305f91ea796fd453ae896617fb3883
Author: Jiaan Geng 
AuthorDate: Fri Dec 22 09:55:00 2023 +0800

[SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect

### What changes were proposed in this pull request?
This PR fix a but by make JDBC dialect decide the decimal precision and 
scale.

**How to reproduce the bug?**
https://github.com/apache/spark/pull/44397 proposed DS V2 push down 
`PERCENTILE_CONT` and `PERCENTILE_DISC`.
The bug fired when pushdown the below SQL to H2 JDBC.
`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC 
NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

**The root cause**
`getQueryOutputSchema` used to get the output schema of query by call 
`JdbcUtils.getSchema`.
The query for database H2 show below.
`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC 
NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`
We can get the five variables from `ResultSetMetaData`, please refer:
```
columnName = "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SALARY NULLS 
FIRST)"
dataType = 2
typeName = "NUMERIC"
fieldSize = 10
fieldScale = 5
```
Then we get the catalyst schema with `JdbcUtils.getCatalystType`, it calls 
`DecimalType.bounded(precision, scale)` actually.
The `DecimalType.bounded(10, 5)` returns `DecimalType(38, 38)`.
At finally, `makeGetter` throws exception.
```
Caused by: org.apache.spark.SparkArithmeticException: 
[DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 42 exceeds max 
precision 38. SQLSTATE: 22003
at 
org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:48)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:124)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:577)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$4(JdbcUtils.scala:408)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:552)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:408)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:406)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:339)
```

### Why are the changes needed?
This PR fix the bug that `JdbcUtils` can't get the correct decimal type.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug.

### How was this patch tested?
Manual tests in https://github.com/apache/spark/pull/44397

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44398 from beliefer/SPARK-46443.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
(cherry picked from commit a921da8509a19b2d23c30ad657725f760932236c)
Signed-off-by: Wenchen Fan 
---
 .../main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 16 +++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 8471a49153f..3f56eb035f5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, ShortType, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, MetadataBuilder, ShortType, StringType}
 
 private[sql] object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
@@ -57,6 +57,20 @@ private[sql] object H2Dialect extends JdbcDialect {
   override def isSupportedFunction(funcName: String): Boolean =
 supportedFunctions.contains(funcName)
 
+  override def 

(spark) branch master updated: [SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a921da8509a [SPARK-46443][SQL] Decimal precision and scale should 
decided by H2 dialect
a921da8509a is described below

commit a921da8509a19b2d23c30ad657725f760932236c
Author: Jiaan Geng 
AuthorDate: Fri Dec 22 09:55:00 2023 +0800

[SPARK-46443][SQL] Decimal precision and scale should decided by H2 dialect

### What changes were proposed in this pull request?
This PR fix a but by make JDBC dialect decide the decimal precision and 
scale.

**How to reproduce the bug?**
https://github.com/apache/spark/pull/44397 proposed DS V2 push down 
`PERCENTILE_CONT` and `PERCENTILE_DISC`.
The bug fired when pushdown the below SQL to H2 JDBC.
`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC 
NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`

**The root cause**
`getQueryOutputSchema` used to get the output schema of query by call 
`JdbcUtils.getSchema`.
The query for database H2 show below.
`SELECT "DEPT",PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "SALARY" ASC 
NULLS FIRST) FROM "test"."employee" WHERE 1=0 GROUP BY "DEPT"`
We can get the five variables from `ResultSetMetaData`, please refer:
```
columnName = "PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY SALARY NULLS 
FIRST)"
dataType = 2
typeName = "NUMERIC"
fieldSize = 10
fieldScale = 5
```
Then we get the catalyst schema with `JdbcUtils.getCatalystType`, it calls 
`DecimalType.bounded(precision, scale)` actually.
The `DecimalType.bounded(10, 5)` returns `DecimalType(38, 38)`.
At finally, `makeGetter` throws exception.
```
Caused by: org.apache.spark.SparkArithmeticException: 
[DECIMAL_PRECISION_EXCEEDS_MAX_PRECISION] Decimal precision 42 exceeds max 
precision 38. SQLSTATE: 22003
at 
org.apache.spark.sql.errors.DataTypeErrors$.decimalPrecisionExceedsMaxPrecisionError(DataTypeErrors.scala:48)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:124)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:577)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$4(JdbcUtils.scala:408)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.nullSafeConvert(JdbcUtils.scala:552)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3(JdbcUtils.scala:408)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$3$adapted(JdbcUtils.scala:406)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:358)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:339)
```

### Why are the changes needed?
This PR fix the bug that `JdbcUtils` can't get the correct decimal type.

### Does this PR introduce _any_ user-facing change?
'Yes'.
Fix a bug.

### How was this patch tested?
Manual tests in https://github.com/apache/spark/pull/44397

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44398 from beliefer/SPARK-46443.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
---
 .../main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 16 +++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 3c9bc0ed691..c3b4092c8e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -34,7 +34,7 @@ import 
org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, ShortType, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, MetadataBuilder, ShortType, StringType}
 
 private[sql] object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
@@ -57,6 +57,20 @@ private[sql] object H2Dialect extends JdbcDialect {
   override def isSupportedFunction(funcName: String): Boolean =
 supportedFunctions.contains(funcName)
 
+  override def getCatalystType(
+  sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): 
Option[DataType] = {
+

(spark) branch master updated: [SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and ResolveInlineTablesSuite

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8c63485189c [SPARK-46380][SQL][FOLLOWUP] Simplify the code for 
ResolveInlineTables and ResolveInlineTablesSuite
8c63485189c is described below

commit 8c63485189c87fd0a11b57c1a4c8ffa517f5f64e
Author: Jiaan Geng 
AuthorDate: Fri Dec 22 09:49:55 2023 +0800

[SPARK-46380][SQL][FOLLOWUP] Simplify the code for ResolveInlineTables and 
ResolveInlineTablesSuite

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/44316 replace current time/date prior 
to evaluating inline table expressions.
This PR propose to simplify the code for `ResolveInlineTables` and let 
`ResolveInlineTablesSuite` apply the rule `ResolveInlineTables`.

### Why are the changes needed?
Simplify the code for `ResolveInlineTables` and `ResolveInlineTablesSuite`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Test cases updated.
GA tests.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #7 from beliefer/SPARK-46380_followup.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/ResolveInlineTables.scala| 28 +-
 .../analysis/ResolveInlineTablesSuite.scala| 12 +-
 2 files changed, 17 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 73600f5c706..811e02b4d97 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -95,30 +95,24 @@ object ResolveInlineTables extends Rule[LogicalPlan]
   private[analysis] def findCommonTypesAndCast(table: UnresolvedInlineTable):
 ResolvedInlineTable = {
 // For each column, traverse all the values and find a common data type 
and nullability.
-val fields = table.rows.transpose.zip(table.names).map { case (column, 
name) =>
+val (fields, columns) = table.rows.transpose.zip(table.names).map { case 
(column, name) =>
   val inputTypes = column.map(_.dataType)
   val tpe = 
TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse {
 table.failAnalysis(
   errorClass = 
"INVALID_INLINE_TABLE.INCOMPATIBLE_TYPES_IN_INLINE_TABLE",
   messageParameters = Map("colName" -> toSQLId(name)))
   }
-  StructField(name, tpe, nullable = column.exists(_.nullable))
-}
-val attributes = DataTypeUtils.toAttributes(StructType(fields))
-assert(fields.size == table.names.size)
-
-val castedRows: Seq[Seq[Expression]] = table.rows.map { row =>
-  row.zipWithIndex.map {
-case (e, ci) =>
-  val targetType = fields(ci).dataType
-  val castedExpr = if (DataTypeUtils.sameType(e.dataType, targetType)) 
{
-e
-  } else {
-cast(e, targetType)
-  }
-  castedExpr
+  val newColumn = column.map {
+case expr if DataTypeUtils.sameType(expr.dataType, tpe) =>
+  expr
+case expr =>
+  cast(expr, tpe)
   }
-}
+  (StructField(name, tpe, nullable = column.exists(_.nullable)), newColumn)
+}.unzip
+assert(fields.size == table.names.size)
+val attributes = DataTypeUtils.toAttributes(StructType(fields))
+val castedRows: Seq[Seq[Expression]] = columns.transpose
 
 ResolvedInlineTable(castedRows, attributes)
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
index 758b6b73e4e..3e014d1c11d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala
@@ -86,8 +86,9 @@ class ResolveInlineTablesSuite extends AnalysisTest with 
BeforeAndAfter {
 
   test("cast and execute") {
 val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), 
Seq(lit(2L
-val resolved = ResolveInlineTables.findCommonTypesAndCast(table)
-val converted = 
ResolveInlineTables.earlyEvalIfPossible(resolved).asInstanceOf[LocalRelation]
+val resolved = ResolveInlineTables(table)
+assert(resolved.isInstanceOf[LocalRelation])
+val converted = resolved.asInstanceOf[LocalRelation]
 
 assert(converted.output.map(_.dataType) == Seq(LongType))
 

(spark) branch master updated: [SPARK-46468][SQL] Handle COUNT bug for EXISTS subqueries with Aggregate without grouping keys

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3432fd8dba5 [SPARK-46468][SQL] Handle COUNT bug for EXISTS subqueries 
with Aggregate without grouping keys
3432fd8dba5 is described below

commit 3432fd8dba5bec623b14a4ec4306290eced6c93c
Author: Andrey Gubichev 
AuthorDate: Fri Dec 22 09:32:22 2023 +0800

[SPARK-46468][SQL] Handle COUNT bug for EXISTS subqueries with Aggregate 
without grouping keys

### What changes were proposed in this pull request?

As Aggregates with no grouping keys always return 1 row (can be NULL), an 
EXISTs over such subquery should always return true.
This reverts some changes done when we migrated EXISTS/IN to 
DecorrelateInnerQuery framework, in particular the static detection of 
potential count bug aggregates is removed (just having an empty grouping key 
should trigger the count bug treatment now; scalar subqueries still have extra 
checks that are evaluating the aggregate on an empty input). I suspect the same 
correctness problem was present in the legacy framework (added one test in the 
legacy section of exists-count-bug.sql)

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Query tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44451 from agubichev/SPARK-46468_count.

Authored-by: Andrey Gubichev 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/optimizer/DecorrelateInnerQuery.scala | 22 +-
 .../exists-subquery/exists-aggregate.sql.out   | 29 ++
 .../exists-subquery/exists-count-bug.sql.out   | 34 ++
 .../subquery/exists-subquery/exists-aggregate.sql  |  9 ++
 .../subquery/exists-subquery/exists-count-bug.sql  |  5 
 .../sql-tests/results/join-lateral.sql.out |  1 +
 .../exists-subquery/exists-aggregate.sql.out   | 22 ++
 .../exists-subquery/exists-count-bug.sql.out   | 17 +++
 8 files changed, 118 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index feb01d1ce3f..eca392fd84c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE
@@ -462,22 +461,6 @@ object DecorrelateInnerQuery extends PredicateHelper {
   p.mapChildren(rewriteDomainJoins(outerPlan, _, conditions))
   }
 
-  private def isCountBugFree(aggregateExpressions: Seq[NamedExpression]): 
Boolean = {
-// The COUNT bug only appears if an aggregate expression returns a 
non-NULL result on an empty
-// input.
-// Typical example (hence the name) is COUNT(*) that returns 0 from an 
empty result.
-// However, SUM(x) IS NULL is another case that returns 0, and in general 
any IS/NOT IS and CASE
-// expressions are suspect (and the combination of those).
-// For now we conservatively accept only those expressions that are 
guaranteed to be safe.
-aggregateExpressions.forall {
-  case _ : AttributeReference => true
-  case Alias(_: AttributeReference, _) => true
-  case Alias(_: Literal, _) => true
-  case Alias(a: AggregateExpression, _) if 
a.aggregateFunction.defaultResult == None => true
-  case _ => false
-}
-  }
-
   def apply(
   innerPlan: LogicalPlan,
   outerPlan: LogicalPlan,
@@ -727,8 +710,6 @@ object DecorrelateInnerQuery extends PredicateHelper {
   case a @ Aggregate(groupingExpressions, aggregateExpressions, child) 
=>
 val outerReferences = collectOuterReferences(a.expressions)
 val newOuterReferences = parentOuterReferences ++ outerReferences
-val countBugSusceptible = groupingExpressions.isEmpty &&
-  !isCountBugFree(aggregateExpressions)
 val (newChild, joinCond, outerReferenceMap) =
   decorrelate(child, newOuterReferences, aggregated = true, 
underSetOp)
 // Replace all outer references in grouping and aggregate 
expressions, and 

(spark) branch master updated: [SPARK-45525][SQL][PYTHON] Support for Python data source write using DSv2

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4fcd5bfe003 [SPARK-45525][SQL][PYTHON] Support for Python data source 
write using DSv2
4fcd5bfe003 is described below

commit 4fcd5bfe003bb546ca888efaf1d39c15c9685673
Author: allisonwang-db 
AuthorDate: Fri Dec 22 09:28:47 2023 +0800

[SPARK-45525][SQL][PYTHON] Support for Python data source write using DSv2

### What changes were proposed in this pull request?

This PR adds initial support for Python data source write by implementing 
the DSv2 `SupportsWrite` interface for `PythonTableProvider`.

Note this PR only supports the `def write(self, iterator)` API. `commit` 
and `abort` will be supported in 
[SPARK-45914](https://issues.apache.org/jira/browse/SPARK-45914).

### Why are the changes needed?

To support Python data source APIs. For instance:

```python
class SimpleWriter(DataSourceWriter):
def write(self, iterator: Iterator[Row]) -> WriterCommitMessage:
for row in iterator:
print(row)
return WriterCommitMessage()

class SimpleDataSource(DataSource):
def writer(self, schema, overwrite):
return SimpleWriter()

# Regsiter the Python data source
spark.dataSource.register(SimpleDataSource)

df.range(10).write.format("SimpleDataSource").mode("append").save()
```

### Does this PR introduce _any_ user-facing change?

Yes, this PR supports writing data into a Python data source.

### How was this patch tested?

New unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43791 from allisonwang-db/spark-45525-data-source-write.

Authored-by: allisonwang-db 
Signed-off-by: Wenchen Fan 
---
 .../src/main/resources/error/error-classes.json|   6 +
 docs/sql-error-conditions.md   |   6 +
 python/pyspark/errors/error_classes.py |   5 +
 python/pyspark/sql/tests/test_python_datasource.py |  36 ++-
 .../pyspark/sql/worker/write_into_data_source.py   | 233 ++
 .../spark/sql/errors/QueryExecutionErrors.scala|   6 +
 .../python/UserDefinedPythonDataSource.scala   | 269 +
 .../execution/python/PythonDataSourceSuite.scala   |  95 
 8 files changed, 612 insertions(+), 44 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index df223f3298e..8970045d4ab 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2513,6 +2513,12 @@
 ],
 "sqlState" : "42601"
   },
+  "INVALID_WRITER_COMMIT_MESSAGE" : {
+"message" : [
+  "The data source writer has generated an invalid number of commit 
messages. Expected exactly one writer commit message from each task, but 
received ."
+],
+"sqlState" : "42KDE"
+  },
   "INVALID_WRITE_DISTRIBUTION" : {
 "message" : [
   "The requested write distribution is invalid."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index a1af6863913..0722cae5815 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1398,6 +1398,12 @@ Rewrite the query to avoid window functions, aggregate 
functions, and generator
 
 Cannot specify ORDER BY or a window frame for ``.
 
+### INVALID_WRITER_COMMIT_MESSAGE
+
+[SQLSTATE: 
42KDE](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The data source writer has generated an invalid number of commit messages. 
Expected exactly one writer commit message from each task, but received 
``.
+
 ### 
[INVALID_WRITE_DISTRIBUTION](sql-error-conditions-invalid-write-distribution-error-class.html)
 
 [SQLSTATE: 
42000](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index bb278481262..2200b73dffc 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -772,6 +772,11 @@ ERROR_CLASSES_JSON = """
   "Expected , but got ."
 ]
   },
+  "PYTHON_DATA_SOURCE_WRITE_ERROR" : {
+"message" : [
+  "Unable to write to the Python data source: ."
+]
+  },
   "PYTHON_HASH_SEED_NOT_SET" : {
 "message" : [
   "Randomness of hash of string should be disabled via PYTHONHASHSEED."
diff --git a/python/pyspark/sql/tests/test_python_datasource.py 
b/python/pyspark/sql/tests/test_python_datasource.py
index 74ef6a87458..b1bba584d85 100644
--- a/python/pyspark/sql/tests/test_python_datasource.py
+++ 

(spark) branch master updated: [SPARK-46437][DOCS] Remove cruft from the built-in SQL functions documentation

2023-12-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e4b5977c9b7 [SPARK-46437][DOCS] Remove cruft from the built-in SQL 
functions documentation
e4b5977c9b7 is described below

commit e4b5977c9b7b808d32a6370dccc33eaeb235085e
Author: Nicholas Chammas 
AuthorDate: Fri Dec 22 09:54:13 2023 +0900

[SPARK-46437][DOCS] Remove cruft from the built-in SQL functions 
documentation

### What changes were proposed in this pull request?

- Remove a bunch of Liquid directives that are not necessary.
- Add a table of contents to the built-in SQL functions page.
- Move the generated HTML for built-in SQL functions to a subdirectory.

### Why are the changes needed?

To reduce confusion for maintainers.

### Does this PR introduce _any_ user-facing change?

Yes. It adds a table of contents and change the heading style of the 
examples.

Otherwise, the generated docs are identical.

### How was this patch tested?

I built Spark, ran `./sql/create-docs.sh`, and reviewed the generated docs 
in my browser.

The page is too long to screenshot completely, but here are a couple of 
screenshots.

https://github.com/apache/spark/assets/1039369/b285d8a2-6eab-488d-9e28-2fdc9cc833a9;>
https://github.com/apache/spark/assets/1039369/2f9670bc-773a-48a8-a0d0-54206b8a4887;>

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44393 from nchammas/sql-builtin-funcs-cruft.

Authored-by: Nicholas Chammas 
Signed-off-by: Hyukjin Kwon 
---
 docs/.gitignore   |   1 +
 docs/sql-ref-functions-builtin.md | 281 ++
 docs/sql-ref-functions.md |   2 +-
 sql/gen-sql-functions-docs.py |  15 +-
 4 files changed, 113 insertions(+), 186 deletions(-)

diff --git a/docs/.gitignore b/docs/.gitignore
index 9df83f37815..bcfdcbf5dcc 100644
--- a/docs/.gitignore
+++ b/docs/.gitignore
@@ -1 +1,2 @@
 generated-*.html
+_generated_function_html/
diff --git a/docs/sql-ref-functions-builtin.md 
b/docs/sql-ref-functions-builtin.md
index 0ff1432fabf..88ed309a883 100644
--- a/docs/sql-ref-functions-builtin.md
+++ b/docs/sql-ref-functions-builtin.md
@@ -17,202 +17,125 @@ license: |
   limitations under the License.
 ---
 
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-agg-funcs-table.html' %}
+* Table of contents
+{:toc}
+
 ### Aggregate Functions
-{% include_relative generated-agg-funcs-table.html %}
- Examples
-{% include_relative generated-agg-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-window-funcs-table.html' %}
+{% include_relative _generated_function_html/agg-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/agg-funcs-examples.html %}
+
 ### Window Functions
-{% include_relative generated-window-funcs-table.html %}
- Examples
-{% include_relative generated-window-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-array-funcs-table.html' %}
+{% include_relative _generated_function_html/window-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/window-funcs-examples.html %}
+
 ### Array Functions
-{% include_relative generated-array-funcs-table.html %}
- Examples
-{% include_relative generated-array-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-collection-funcs-table.html' %}
+{% include_relative _generated_function_html/array-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/array-funcs-examples.html %}
+
 ### Collection Functions
-{% include_relative generated-collection-funcs-table.html %}
- Examples
-{% include_relative generated-collection-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-struct-funcs-table.html' %}
+{% include_relative _generated_function_html/collection-funcs-table.html %}
+
+**Examples**
+{% include_relative _generated_function_html/collection-funcs-examples.html %}
+
 ### STRUCT Functions
-{% include_relative generated-struct-funcs-table.html %}
- Examples
-{% include_relative generated-struct-funcs-examples.html %}
-{% break %}
-{% endif %}
-{% endfor %}
-
-{% for static_file in site.static_files %}
-{% if static_file.name == 'generated-map-funcs-table.html' %}
+{% include_relative 

(spark) branch master updated: [SPARK-46471][PS][TESTS][FOLLOWUPS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_chain_*`

2023-12-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new db162e5a139 [SPARK-46471][PS][TESTS][FOLLOWUPS] Reorganize 
`OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_chain_*`
db162e5a139 is described below

commit db162e5a139d355264ce5c538687efa66e62c8c4
Author: Ruifeng Zheng 
AuthorDate: Fri Dec 22 08:57:52 2023 +0900

[SPARK-46471][PS][TESTS][FOLLOWUPS] Reorganize 
`OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_chain_*`

### What changes were proposed in this pull request?
Factor out `test_arithmetic_chain_*`

### Why are the changes needed?
for testing parallelism

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #3 from zhengruifeng/ps_test_diff_ops_arithmetic_chain.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py|   6 +
 .../test_parity_arithmetic_chain.py|  41 +
 .../test_parity_arithmetic_chain_ext.py|  41 +
 .../test_parity_arithmetic_chain_ext_float.py  |  43 +
 .../tests/diff_frames_ops/test_arithmetic_chain.py | 189 +
 .../diff_frames_ops/test_arithmetic_chain_ext.py   | 120 +
 .../test_arithmetic_chain_ext_float.py | 122 +
 .../pandas/tests/test_ops_on_diff_frames.py| 117 -
 8 files changed, 562 insertions(+), 117 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 3e0b364ca84..47db204e2fa 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -867,6 +867,9 @@ pyspark_pandas_slow = Module(
 "pyspark.pandas.tests.diff_frames_ops.test_arithmetic",
 "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext",
 "pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext_float",
+"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain",
+"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain_ext",
+"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain_ext_float",
 "pyspark.pandas.tests.diff_frames_ops.test_basic_slow",
 "pyspark.pandas.tests.diff_frames_ops.test_cov",
 "pyspark.pandas.tests.diff_frames_ops.test_corrwith",
@@ -1229,6 +1232,9 @@ pyspark_pandas_connect_part3 = Module(
 "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext_float",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain_ext",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain_ext_float",
 "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_aggregate",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_apply",
diff --git 
a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic_chain.py
 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic_chain.py
new file mode 100644
index 000..d24a4a41d0b
--- /dev/null
+++ 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic_chain.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.diff_frames_ops.test_arithmetic_chain import 
ArithmeticChainMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+
+
+class ArithmeticChainParityTests(
+ArithmeticChainMixin,
+PandasOnSparkTestUtils,
+

(spark) branch master updated: [SPARK-46476][BUILD][CORE][CONNECT] Move `IvyTestUtils` back to `src/test` directory

2023-12-21 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2723bb11e3c7 [SPARK-46476][BUILD][CORE][CONNECT] Move `IvyTestUtils` 
back to `src/test` directory
2723bb11e3c7 is described below

commit 2723bb11e3c7b9e2252e26cca904e2e53aaa68fb
Author: yangjie01 
AuthorDate: Thu Dec 21 20:50:34 2023 +0800

[SPARK-46476][BUILD][CORE][CONNECT] Move `IvyTestUtils` back to `src/test` 
directory

### What changes were proposed in this pull request?
This pr move `IvyTestUtils` back to `src/test` directory because it has 
been in the `src/test` directory before the refactoring work of 
https://github.com/apache/spark/pull/43354.

Meanwhile, in order to make the `core` and `connect-client-jvm` module use 
`IvyTestUtils` in the tests, this pr has added the corresponding Maven 
dependencies in the respective `pom.xml` files.

### Why are the changes needed?
Move `IvyTestUtils` back to `src/test` directory

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #0 from LuciferYang/mv-IvyTestUtils-to-test-dir.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .../{main => test}/scala/org/apache/spark/util/IvyTestUtils.scala  | 0
 connector/connect/client/jvm/pom.xml   | 7 +++
 core/pom.xml   | 7 +++
 3 files changed, 14 insertions(+)

diff --git 
a/common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala 
b/common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
similarity index 100%
rename from common/utils/src/main/scala/org/apache/spark/util/IvyTestUtils.scala
rename to common/utils/src/test/scala/org/apache/spark/util/IvyTestUtils.scala
diff --git a/connector/connect/client/jvm/pom.xml 
b/connector/connect/client/jvm/pom.xml
index eb98e4203a96..8057a33df178 100644
--- a/connector/connect/client/jvm/pom.xml
+++ b/connector/connect/client/jvm/pom.xml
@@ -82,6 +82,13 @@
   scalacheck_${scala.binary.version}
   test
 
+
+  org.apache.spark
+  spark-common-utils_${scala.binary.version}
+  ${project.version}
+  tests
+  test
+
 
 
   com.typesafe
diff --git a/core/pom.xml b/core/pom.xml
index f2d1f4eb144a..c093213bd6b9 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -451,6 +451,13 @@
   tests
   test
 
+
+  org.apache.spark
+  spark-common-utils_${scala.binary.version}
+  ${project.version}
+  tests
+  test
+
 
 

(spark) branch master updated: [SPARK-46470][PS][TESTS] Move `test_series_datetime` to `pyspark.pandas.tests.series.*`

2023-12-21 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 87d22fd346ce [SPARK-46470][PS][TESTS] Move `test_series_datetime` to 
`pyspark.pandas.tests.series.*`
87d22fd346ce is described below

commit 87d22fd346cefcda1c44f8f4c4e60d665143b2bb
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 21 17:53:39 2023 +0800

[SPARK-46470][PS][TESTS] Move `test_series_datetime` to 
`pyspark.pandas.tests.series.*`

### What changes were proposed in this pull request?
Move `test_series_datetime` to `pyspark.pandas.tests.series.*`

### Why are the changes needed?
move the test to the right place

### Does this PR introduce _any_ user-facing change?
no, test only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44434 from zhengruifeng/ps_test_mv_ser_ts.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py   | 4 ++--
 .../test_parity_datetime.py}  | 8 +---
 .../tests/{test_series_datetime.py => series/test_datetime.py}| 8 ++--
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 7a5ac426dc7c..3e0b364ca846 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -770,7 +770,7 @@ pyspark_pandas = Module(
 "pyspark.pandas.tests.window.test_groupby_rolling_adv",
 "pyspark.pandas.tests.window.test_groupby_rolling_count",
 "pyspark.pandas.tests.test_scalars",
-"pyspark.pandas.tests.test_series_datetime",
+"pyspark.pandas.tests.series.test_datetime",
 "pyspark.pandas.tests.series.test_string_ops_adv",
 "pyspark.pandas.tests.series.test_string_ops_basic",
 "pyspark.pandas.tests.test_spark_functions",
@@ -1049,7 +1049,7 @@ pyspark_pandas_connect_part0 = Module(
 "pyspark.pandas.tests.connect.resample.test_parity_on",
 "pyspark.pandas.tests.connect.resample.test_parity_timezone",
 "pyspark.pandas.tests.connect.test_parity_scalars",
-"pyspark.pandas.tests.connect.test_parity_series_datetime",
+"pyspark.pandas.tests.connect.series.test_parity_datetime",
 "pyspark.pandas.tests.connect.series.test_parity_string_ops_adv",
 "pyspark.pandas.tests.connect.series.test_parity_string_ops_basic",
 "pyspark.pandas.tests.connect.test_parity_spark_functions",
diff --git a/python/pyspark/pandas/tests/connect/test_parity_series_datetime.py 
b/python/pyspark/pandas/tests/connect/series/test_parity_datetime.py
similarity index 85%
rename from python/pyspark/pandas/tests/connect/test_parity_series_datetime.py
rename to python/pyspark/pandas/tests/connect/series/test_parity_datetime.py
index 0842558d0e3f..89fa7b8ceef8 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_series_datetime.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_datetime.py
@@ -16,19 +16,21 @@
 #
 import unittest
 
-from pyspark.pandas.tests.test_series_datetime import SeriesDateTimeTestsMixin
+from pyspark.pandas.tests.series.test_datetime import SeriesDateTimeTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
 from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
 class SeriesDateTimeParityTests(
-SeriesDateTimeTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase
+SeriesDateTimeTestsMixin,
+PandasOnSparkTestUtils,
+ReusedConnectTestCase,
 ):
 pass
 
 
 if __name__ == "__main__":
-from pyspark.pandas.tests.connect.test_parity_series_datetime import *  # 
noqa: F401
+from pyspark.pandas.tests.connect.series.test_parity_datetime import *  # 
noqa: F401
 
 try:
 import xmlrunner
diff --git a/python/pyspark/pandas/tests/test_series_datetime.py 
b/python/pyspark/pandas/tests/series/test_datetime.py
similarity index 98%
rename from python/pyspark/pandas/tests/test_series_datetime.py
rename to python/pyspark/pandas/tests/series/test_datetime.py
index 89d4b70e0b51..e203da29f497 100644
--- a/python/pyspark/pandas/tests/test_series_datetime.py
+++ b/python/pyspark/pandas/tests/series/test_datetime.py
@@ -280,12 +280,16 @@ class SeriesDateTimeTestsMixin:
 )
 
 
-class SeriesDateTimeTests(SeriesDateTimeTestsMixin, PandasOnSparkTestCase, 
SQLTestUtils):
+class SeriesDateTimeTests(
+SeriesDateTimeTestsMixin,
+PandasOnSparkTestCase,
+SQLTestUtils,
+):
 pass
 
 
 if __name__ == "__main__":
-from pyspark.pandas.tests.test_series_datetime import *  # noqa: F401
+from pyspark.pandas.tests.series.test_datetime import *  # noqa: F401
 
 try:
 import xmlrunner



(spark) branch master updated: [SPARK-46466][SQL] Vectorized parquet reader should never do rebase for timestamp ntz

2023-12-21 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4d21e5547580 [SPARK-46466][SQL] Vectorized parquet reader should never 
do rebase for timestamp ntz
4d21e5547580 is described below

commit 4d21e55475807a089979cffb54076bcb3ae9c02d
Author: Wenchen Fan 
AuthorDate: Thu Dec 21 12:42:19 2023 +0300

[SPARK-46466][SQL] Vectorized parquet reader should never do rebase for 
timestamp ntz

### What changes were proposed in this pull request?

This fixes a correctness bug. The TIMESTAMP_NTZ is a new data type in Spark 
and has no legacy files that need to do calendar rebase. However, the 
vectorized parquet reader treat it the same as LTZ and may do rebase if the 
parquet file was written with the legacy rebase mode. This PR fixes it to never 
do rebase for NTZ.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

Yes, now we can correctly write and read back NTZ value even if the date is 
before 1582.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44428 from cloud-fan/ntz.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Max Gekk 
---
 .../parquet/ParquetVectorUpdaterFactory.java   | 27 --
 .../datasources/parquet/ParquetQuerySuite.scala| 12 ++
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 918f21716f45..31a1957b4fb9 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,24 +109,32 @@ public class ParquetVectorUpdaterFactory {
   // For unsigned int64, it stores as plain signed int64 in Parquet 
when dictionary
   // fallbacks. We read them as decimal values.
   return new UnsignedLongUpdater();
-} else if (isTimestamp(sparkType) &&
+} else if (sparkType == DataTypes.TimestampType &&
   isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-  validateTimestampType(sparkType);
   if ("CORRECTED".equals(datetimeRebaseMode)) {
 return new LongUpdater();
   } else {
 boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
 return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
   }
-} else if (isTimestamp(sparkType) &&
+} else if (sparkType == DataTypes.TimestampType &&
   isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-  validateTimestampType(sparkType);
   if ("CORRECTED".equals(datetimeRebaseMode)) {
 return new LongAsMicrosUpdater();
   } else {
 final boolean failIfRebase = 
"EXCEPTION".equals(datetimeRebaseMode);
 return new LongAsMicrosRebaseUpdater(failIfRebase, 
datetimeRebaseTz);
   }
+} else if (sparkType == DataTypes.TimestampNTZType &&
+  isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+  validateTimestampNTZType();
+  // TIMESTAMP_NTZ is a new data type and has no legacy files that 
need to do rebase.
+  return new LongUpdater();
+} else if (sparkType == DataTypes.TimestampNTZType &&
+  isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+  validateTimestampNTZType();
+  // TIMESTAMP_NTZ is a new data type and has no legacy files that 
need to do rebase.
+  return new LongAsMicrosUpdater();
 } else if (sparkType instanceof DayTimeIntervalType) {
   return new LongUpdater();
 }
@@ -195,12 +203,11 @@ public class ParquetVectorUpdaterFactory {
   annotation.getUnit() == unit;
   }
 
-  void validateTimestampType(DataType sparkType) {
+  private void validateTimestampNTZType() {
 assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-// Throw an exception if the Parquet type is TimestampLTZ and the Catalyst 
type is TimestampNTZ.
+// Throw an exception if the Parquet type is TimestampLTZ as the Catalyst 
type is TimestampNTZ.
 // This is to avoid mistakes in reading the timestamp values.
-if (((TimestampLogicalTypeAnnotation) 
logicalTypeAnnotation).isAdjustedToUTC() &&
-  sparkType == DataTypes.TimestampNTZType) {
+if (((TimestampLogicalTypeAnnotation) 

(spark) branch master updated: [SPARK-46471][PS][TESTS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_*`

2023-12-21 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c10b2c0fbaeb [SPARK-46471][PS][TESTS] Reorganize 
`OpsOnDiffFramesEnabledTests`: Factor out `test_arithmetic_*`
c10b2c0fbaeb is described below

commit c10b2c0fbaeb5a497599bb77c11577e78266904a
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 21 16:53:58 2023 +0800

[SPARK-46471][PS][TESTS] Reorganize `OpsOnDiffFramesEnabledTests`: Factor 
out `test_arithmetic_*`

### What changes were proposed in this pull request?
Factor out `test_arithmetic_*` from `OpsOnDiffFramesEnabledTests`

### Why are the changes needed?
`OpsOnDiffFramesEnabledTests` and its parity test are slow:
```
Starting test(python3.9): 
pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames (temp output: 
/__w/spark/spark/python/target/6b1d192e-052f-42d4-9023-04df84120fce/python3.9__pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames__gycsek91.log)
Finished test(python3.9): 
pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames (740s)
```

break it into small tests to be more suitable for parallelism

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44435 from zhengruifeng/ps_test_diff_ops_arithmetic.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py|   6 +
 .../diff_frames_ops/test_parity_arithmetic.py  |  41 ++
 .../diff_frames_ops/test_parity_arithmetic_ext.py  |  41 ++
 .../test_parity_arithmetic_ext_float.py|  41 ++
 .../tests/diff_frames_ops/test_arithmetic.py   | 156 +
 .../tests/diff_frames_ops/test_arithmetic_ext.py   |  99 +
 .../diff_frames_ops/test_arithmetic_ext_float.py   |  99 +
 .../pandas/tests/test_ops_on_diff_frames.py|  89 
 8 files changed, 483 insertions(+), 89 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index cbd3b35c0015..7a5ac426dc7c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -864,6 +864,9 @@ pyspark_pandas_slow = Module(
 "pyspark.pandas.tests.test_indexing",
 "pyspark.pandas.tests.test_ops_on_diff_frames",
 "pyspark.pandas.tests.diff_frames_ops.test_align",
+"pyspark.pandas.tests.diff_frames_ops.test_arithmetic",
+"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext",
+"pyspark.pandas.tests.diff_frames_ops.test_arithmetic_ext_float",
 "pyspark.pandas.tests.diff_frames_ops.test_basic_slow",
 "pyspark.pandas.tests.diff_frames_ops.test_cov",
 "pyspark.pandas.tests.diff_frames_ops.test_corrwith",
@@ -1223,6 +1226,9 @@ pyspark_pandas_connect_part3 = Module(
 "pyspark.pandas.tests.connect.indexes.test_parity_datetime_property",
 "pyspark.pandas.tests.connect.indexes.test_parity_datetime_round",
 "pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames",
+"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_ext_float",
 "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_aggregate",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_apply",
diff --git 
a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic.py 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic.py
new file mode 100644
index ..669d6ace2404
--- /dev/null
+++ 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_arithmetic.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

(spark) branch master updated: [SPARK-46378][SQL][FOLLOWUP] Do not rely on TreeNodeTag in Project

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0e94f340a63a [SPARK-46378][SQL][FOLLOWUP] Do not rely on TreeNodeTag 
in Project
0e94f340a63a is described below

commit 0e94f340a63af07f1b105c61e3f884993ee305e6
Author: Wenchen Fan 
AuthorDate: Thu Dec 21 16:23:25 2023 +0800

[SPARK-46378][SQL][FOLLOWUP] Do not rely on TreeNodeTag in Project

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/44310 . It turns 
out that `TreeNodeTag` in `Project` is way too fragile. `Project` is a very 
basic node and very easy to get removed/transformed during plan optimization.

This PR switches to a different approach: since we can't retain the 
information (input data order doesn't matter) from `Aggregate`, let's leverage 
this information immediately. We pull out the expensive part of 
`EliminateSorts` to a new rule, so that we can safely call `EliminateSorts` 
right before we turn `Aggregate` into `Project`.

### Why are the changes needed?

to make the optimizer more robust.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44429 from cloud-fan/sort.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 35 
 .../catalyst/optimizer/RemoveRedundantSorts.scala  | 62 ++
 .../plans/logical/basicLogicalOperators.scala  |  3 --
 .../catalyst/optimizer/EliminateSortsSuite.scala   |  3 +-
 .../datasources/V1WriteCommandSuite.scala  | 54 ---
 5 files changed, 111 insertions(+), 46 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5a19c5e3c241..1a831b958ef2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -211,7 +211,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
 Batch("Join Reorder", FixedPoint(1),
   CostBasedJoinReorder) :+
 Batch("Eliminate Sorts", Once,
-  EliminateSorts) :+
+  EliminateSorts,
+  RemoveRedundantSorts) :+
 Batch("Decimal Optimizations", fixedPoint,
   DecimalAggregates) :+
 // This batch must run after "Decimal Optimizations", as that one may 
change the
@@ -771,11 +772,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
   LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, 
join)))
 // Push down limit 1 through Aggregate and turn Aggregate into Project if 
it is group only.
 case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
-  val project = Project(a.aggregateExpressions, LocalLimit(le, a.child))
-  project.setTagValue(Project.dataOrderIrrelevantTag, ())
-  Limit(le, project)
+  val newAgg = EliminateSorts(a.copy(child = LocalLimit(le, 
a.child))).asInstanceOf[Aggregate]
+  Limit(le, Project(newAgg.aggregateExpressions, newAgg.child))
 case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if 
a.groupOnly =>
-  Limit(le, p.copy(child = Project(a.aggregateExpressions, LocalLimit(le, 
a.child
+  val newAgg = EliminateSorts(a.copy(child = LocalLimit(le, 
a.child))).asInstanceOf[Aggregate]
+  Limit(le, p.copy(child = Project(newAgg.aggregateExpressions, 
newAgg.child)))
 // Merge offset value and limit value into LocalLimit and pushes down 
LocalLimit through Offset.
 case LocalLimit(le, Offset(oe, grandChild)) =>
   Offset(oe, LocalLimit(Add(le, oe), grandChild))
@@ -1557,38 +1558,30 @@ object CombineFilters extends Rule[LogicalPlan] with 
PredicateHelper {
  * Note that changes in the final output ordering may affect the file size 
(SPARK-32318).
  * This rule handles the following cases:
  * 1) if the sort order is empty or the sort order does not have any reference
- * 2) if the Sort operator is a local sort and the child is already sorted
- * 3) if there is another Sort operator separated by 0...n Project, Filter, 
Repartition or
+ * 2) if there is another Sort operator separated by 0...n Project, Filter, 
Repartition or
  *RepartitionByExpression, RebalancePartitions (with deterministic 
expressions) operators
- * 4) if the Sort operator is within Join separated by 0...n Project, Filter, 
Repartition or
+ * 3) if the Sort operator is within Join separated by 0...n Project, Filter, 
Repartition or
  *

(spark) branch branch-3.5 updated: [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new d7534a3ec1ea [SPARK-46380][SQL] Replace current time/date prior to 
evaluating inline table expressions
d7534a3ec1ea is described below

commit d7534a3ec1eab53bbd349f9ae31684337c734958
Author: Aleksandar Tomic 
AuthorDate: Thu Dec 21 15:58:15 2023 +0800

[SPARK-46380][SQL] Replace current time/date prior to evaluating inline 
table expressions

With this PR proposal is to do inline table resolution in two phases:
1) If there are no expressions that depend on current context (e.g. 
expressions that depend on CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME etc.) 
they will be evaluated as part of ResolveInlineTable rule.
2) Expressions that do depend on CURRENT_* evaluation will be kept as 
expressions and they evaluation will be delayed to post analysis phase.

This PR aims to solve two problems with inline tables.

Example1:
```sql
SELECT COUNT(DISTINCT ct) FROM VALUES
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()) as data(ct)
```
Prior to this change this example would return 3 (i.e. all 
CURRENT_TIMESTAMP expressions would return different value since they would be 
evaluated individually as part of inline table evaluation). After this change 
result is 1.

Example 2:
```sql
CREATE VIEW V as (SELECT * FROM VALUES(CURRENT_TIMESTAMP())
```
In this example VIEW would be saved with literal evaluated during VIEW 
creation. After this change CURRENT_TIMESTAMP() will eval during VIEW execution.

See section above.

New test that validates this behaviour is introduced.

No.

Closes #44316 from dbatomic/inline_tables_curr_time_fix.

Lead-authored-by: Aleksandar Tomic 
Co-authored-by: Aleksandar Tomic 
<150942779+dbato...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
(cherry picked from commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc)
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/ResolveInlineTables.scala| 68 --
 .../spark/sql/catalyst/analysis/unresolved.scala   | 15 +
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala| 33 +++
 .../sql/catalyst/rules/RuleIdCollection.scala  |  1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala|  1 +
 .../analysis/ResolveInlineTablesSuite.scala| 31 --
 .../analyzer-results/inline-table.sql.out  | 16 -
 .../postgreSQL/create_view.sql.out |  2 +-
 .../resources/sql-tests/inputs/inline-table.sql|  6 ++
 .../sql-tests/results/inline-table.sql.out | 16 +
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 14 +
 12 files changed, 165 insertions(+), 42 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 760ea466b857..73600f5c7064 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -17,28 +17,29 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, 
Expression}
+import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
- * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[LocalRelation]].
+ * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[ResolvedInlineTable]].
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
-AlwaysProcess.fn, ruleId) {
-case table: UnresolvedInlineTable if table.expressionsResolved =>
-  validateInputDimension(table)
-  validateInputEvaluable(table)
-  convert(table)
+  override def 

(spark) branch master updated: [SPARK-46380][SQL] Replace current time/date prior to evaluating inline table expressions

2023-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5fe963f8560e [SPARK-46380][SQL] Replace current time/date prior to 
evaluating inline table expressions
5fe963f8560e is described below

commit 5fe963f8560ef05925d127e82ab7ef28d6a1d7bc
Author: Aleksandar Tomic 
AuthorDate: Thu Dec 21 15:58:15 2023 +0800

[SPARK-46380][SQL] Replace current time/date prior to evaluating inline 
table expressions

### What changes were proposed in this pull request?

With this PR proposal is to do inline table resolution in two phases:
1) If there are no expressions that depend on current context (e.g. 
expressions that depend on CURRENT_DATABASE, CURRENT_USER, CURRENT_TIME etc.) 
they will be evaluated as part of ResolveInlineTable rule.
2) Expressions that do depend on CURRENT_* evaluation will be kept as 
expressions and they evaluation will be delayed to post analysis phase.

### Why are the changes needed?

This PR aims to solve two problems with inline tables.

Example1:
```sql
SELECT COUNT(DISTINCT ct) FROM VALUES
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()),
(CURRENT_TIMESTAMP()) as data(ct)
```
Prior to this change this example would return 3 (i.e. all 
CURRENT_TIMESTAMP expressions would return different value since they would be 
evaluated individually as part of inline table evaluation). After this change 
result is 1.

Example 2:
```sql
CREATE VIEW V as (SELECT * FROM VALUES(CURRENT_TIMESTAMP())
```
In this example VIEW would be saved with literal evaluated during VIEW 
creation. After this change CURRENT_TIMESTAMP() will eval during VIEW execution.

### Does this PR introduce _any_ user-facing change?

See section above.

### How was this patch tested?

New test that validates this behaviour is introduced.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44316 from dbatomic/inline_tables_curr_time_fix.

Lead-authored-by: Aleksandar Tomic 
Co-authored-by: Aleksandar Tomic 
<150942779+dbato...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../catalyst/analysis/ResolveInlineTables.scala| 68 --
 .../spark/sql/catalyst/analysis/unresolved.scala   | 15 +
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../sql/catalyst/optimizer/finishAnalysis.scala| 34 ++-
 .../sql/catalyst/rules/RuleIdCollection.scala  |  1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala|  1 +
 .../analysis/ResolveInlineTablesSuite.scala| 31 --
 .../analyzer-results/inline-table.sql.out  | 16 -
 .../postgreSQL/create_view.sql.out |  2 +-
 .../resources/sql-tests/inputs/inline-table.sql|  6 ++
 .../sql-tests/results/inline-table.sql.out | 16 +
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 14 +
 12 files changed, 165 insertions(+), 43 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
index 760ea466b857..73600f5c7064 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala
@@ -17,28 +17,29 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.util.control.NonFatal
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper}
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, 
Expression}
+import org.apache.spark.sql.catalyst.optimizer.EvalInlineTables
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
+import org.apache.spark.sql.catalyst.trees.TreePattern.CURRENT_LIKE
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.{toSQLExpr, toSQLId}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
- * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[LocalRelation]].
+ * An analyzer rule that replaces [[UnresolvedInlineTable]] with 
[[ResolvedInlineTable]].
  */
 object ResolveInlineTables extends Rule[LogicalPlan]
   with CastSupport with AliasHelper with EvalHelper {
-  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
-AlwaysProcess.fn, ruleId) {
-case