(spark) branch master updated: [SPARK-46465][PYTHON][DOCS][FOLLOWUPS] Add `Column.isNaN` to API references

2023-12-20 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 558e7a047bfd [SPARK-46465][PYTHON][DOCS][FOLLOWUPS] Add `Column.isNaN` 
to API references
558e7a047bfd is described below

commit 558e7a047bfdd7f9c52545a2f8ab02c54a7ca2d5
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 21 12:15:41 2023 +0800

[SPARK-46465][PYTHON][DOCS][FOLLOWUPS] Add `Column.isNaN` to API references

### What changes were proposed in this pull request?
Add `Column.isNaN` to API references

### Why are the changes needed?
It is missing from https://github.com/apache/spark/pull/44422

### Does this PR introduce _any_ user-facing change?
Yes, documentation only.

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44432 from zhengruifeng/py_doc_isnan.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/docs/source/reference/pyspark.sql/column.rst | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/docs/source/reference/pyspark.sql/column.rst 
b/python/docs/source/reference/pyspark.sql/column.rst
index b897b5c00c42..08052bcc4683 100644
--- a/python/docs/source/reference/pyspark.sql/column.rst
+++ b/python/docs/source/reference/pyspark.sql/column.rst
@@ -46,6 +46,7 @@ Column
 Column.getField
 Column.getItem
 Column.ilike
+Column.isNaN
 Column.isNotNull
 Column.isNull
 Column.isin





(spark) branch master updated: [SPARK-46469][SQL] Clean up useless local variables in `InsertIntoHiveTable`

2023-12-20 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fd8d17a136eb [SPARK-46469][SQL] Clean up useless local variables in 
`InsertIntoHiveTable`
fd8d17a136eb is described below

commit fd8d17a136eb785d642faba3cc8c198a727c0563
Author: yangjie01 
AuthorDate: Thu Dec 21 12:06:10 2023 +0800

[SPARK-46469][SQL] Clean up useless local variables in `InsertIntoHiveTable`

### What changes were proposed in this pull request?
This PR aims to clean up the unused local variables `partitionPath`, `hiveVersion`, and `doHiveOverwrite` in `InsertIntoHiveTable`. The code that used these variables was already removed in SPARK-45309 (https://github.com/apache/spark/pull/43098).

### Why are the changes needed?
Code cleanup.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44433 from LuciferYang/insert-into-hive-table.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Signed-off-by: Kent Yao 
---
 .../sql/hive/execution/InsertIntoHiveTable.scala   | 30 +-
 1 file changed, 1 insertion(+), 29 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index ee4a6a3e1eb9..74d131d6664f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, 
V1WritesUtils}
-import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl
 
 
@@ -207,34 +206,7 @@ case class InsertIntoHiveTable(
 table.database,
 table.identifier.table,
 partitionSpec)
-
-var doHiveOverwrite = overwrite
-
 if (oldPart.isEmpty || !ifPartitionNotExists) {
-  // SPARK-29295: When insert overwrite to a Hive external table 
partition, if the
-  // partition does not exist, Hive will not check if the external 
partition directory
-  // exists or not before copying files. So if users drop the 
partition, and then do
-  // insert overwrite to the same partition, the partition will have 
both old and new
-  // data. We construct partition path. If the path exists, we delete 
it manually.
-  val partitionPath = if (oldPart.isEmpty && overwrite
-  && table.tableType == CatalogTableType.EXTERNAL) {
-val partitionColumnNames = table.partitionColumnNames
-val tablePath = new Path(table.location)
-Some(ExternalCatalogUtils.generatePartitionPath(partitionSpec,
-  partitionColumnNames, tablePath))
-  } else {
-oldPart.flatMap(_.storage.locationUri.map(uri => new Path(uri)))
-  }
-
-  // SPARK-18107: Insert overwrite runs much slower than hive-client.
-  // Newer Hive largely improves insert overwrite performance. As 
Spark uses older Hive
-  // version and we may not want to catch up new Hive version every 
time. We delete the
-  // Hive partition first and then load data file into the Hive 
partition.
-  val hiveVersion = 
externalCatalog.asInstanceOf[ExternalCatalogWithListener]
-.unwrapped.asInstanceOf[HiveExternalCatalog]
-.client
-.version
-
   // inheritTableSpecs is set to true. It should be set to false for 
an IMPORT query
   // which is currently considered as a Hive native command.
   val inheritTableSpecs = true
@@ -243,7 +215,7 @@ case class InsertIntoHiveTable(
 table.identifier.table,
 tmpLocation.toString,
 partitionSpec,
-isOverwrite = doHiveOverwrite,
+isOverwrite = overwrite,
 inheritTableSpecs = inheritTableSpecs,
 isSrcLocal = false)
 }





(spark) branch master updated: [SPARK-46179][SQL] Pull out code into reusable functions in SQLQueryTestSuite

2023-12-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e804f5360144 [SPARK-46179][SQL] Pull out code into reusable functions 
in SQLQueryTestSuite
e804f5360144 is described below

commit e804f53601444df71c6df8bd6237cc350bfec076
Author: Andy Lam 
AuthorDate: Thu Dec 21 11:00:29 2023 +0800

[SPARK-46179][SQL] Pull out code into reusable functions in 
SQLQueryTestSuite

### What changes were proposed in this pull request?

### Why are the changes needed?

As a prelude to https://github.com/apache/spark/pull/44084, in this PR, I 
refactored SQLQueryTestSuite by pulling out code into functions for reuse in 
subclasses.
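
A minimal sketch of the intended reuse (the subclass below is hypothetical; only `isSemanticallySorted` and `getNormalizedQueryExecutionResult` appear in the diff further down):

```scala
// Hypothetical suite reusing the helper pulled up into SQLQueryTestHelper,
// instead of re-implementing the local isSorted function that used to live
// inside getNormalizedQueryExecutionResult.
class CrossDbmsQueryTestSuite extends SQLQueryTestSuite {
  private def sortOutput(df: org.apache.spark.sql.DataFrame, answer: Seq[String]): Seq[String] = {
    if (isSemanticallySorted(df.queryExecution.analyzed)) answer else answer.sorted
  }
}
```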

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Simple refactor, no testing.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44405 from andylam-db/crossdbms-pre.

Authored-by: Andy Lam 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/SQLQueryTestHelper.scala  |  30 ++--
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   | 190 -
 .../thriftserver/ThriftServerQueryTestSuite.scala  |   2 +-
 3 files changed, 163 insertions(+), 59 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
index d8956961440d..c08569150e2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestHelper.scala
@@ -98,20 +98,24 @@ trait SQLQueryTestHelper extends Logging {
 }
   }
 
+  /**
+   * Uses the Spark logical plan to determine whether the plan is semantically 
sorted. This is
+   * important to make non-sorted queries test cases more deterministic.
+   */
+  protected def isSemanticallySorted(plan: LogicalPlan): Boolean = plan match {
+case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => 
false
+case _: DescribeCommandBase
+ | _: DescribeColumnCommand
+ | _: DescribeRelation
+ | _: DescribeColumn => true
+case PhysicalOperation(_, _, Sort(_, true, _)) => true
+case _ => plan.children.iterator.exists(isSemanticallySorted)
+  }
+
   /** Executes a query and returns the result as (schema of the output, 
normalized output). */
   protected def getNormalizedQueryExecutionResult(
   session: SparkSession, sql: String): (String, Seq[String]) = {
 // Returns true if the plan is supposed to be sorted.
-def isSorted(plan: LogicalPlan): Boolean = plan match {
-  case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => 
false
-  case _: DescribeCommandBase
-  | _: DescribeColumnCommand
-  | _: DescribeRelation
-  | _: DescribeColumn => true
-  case PhysicalOperation(_, _, Sort(_, true, _)) => true
-  case _ => plan.children.iterator.exists(isSorted)
-}
-
 val df = session.sql(sql)
 val schema = df.schema.catalogString
 // Get answer, but also get rid of the #1234 expression ids that show up 
in explain plans
@@ -120,7 +124,11 @@ trait SQLQueryTestHelper extends Logging {
 }
 
 // If the output is not pre-sorted, sort it.
-if (isSorted(df.queryExecution.analyzed)) (schema, answer) else (schema, 
answer.sorted)
+if (isSemanticallySorted(df.queryExecution.analyzed)) {
+  (schema, answer)
+} else {
+  (schema, answer.sorted)
+}
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 032964766792..9a78b7f52b74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -349,20 +349,16 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSparkSession with SQLHelper
 }
   }
 
-  /** Run a test case. */
-  protected def runSqlTestCase(testCase: TestCase, listTestCases: 
Seq[TestCase]): Unit = {
-def splitWithSemicolon(seq: Seq[String]) = {
-  seq.mkString("\n").split("(?<=[^\\\\]);")
-}
-
-def splitCommentsAndCodes(input: String) = input.split("\n").partition { 
line =>
+  protected def splitCommentsAndCodes(input: String) =
+input.split("\n").partition { line =>
   val newLine = line.trim
   newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER")
 }
 
-val input = fileToString(new File(testCase.inputFile))
-
-val (comments, code) = splitCommentsAndCodes(input)
+  protected def getQueries(code: Array[String], comments: Array[String]) = {
+def splitWithSemicolon(seq: Seq[String]) = {
+  

(spark) branch master updated: [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2

2023-12-20 Thread beliefer
This is an automated email from the ASF dual-hosted git repository.

beliefer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 56dc7f8c2199 [SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2
56dc7f8c2199 is described below

commit 56dc7f8c2199bd3f2822004a1d9853188a9db465
Author: Huaxin Gao 
AuthorDate: Thu Dec 21 10:13:55 2023 +0800

[SPARK-46207][SQL] Support MergeInto in DataFrameWriterV2

### What changes were proposed in this pull request?
Add `MergeInto` support in `DataFrameWriterV2`

### Why are the changes needed?
Spark currently supports the MERGE INTO SQL statement. We want the DataFrame API to have the same support.

### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a new API like the following:

```
  spark.table("source")
.mergeInto("target", $"source.id" === $"target.id")
.whenNotMatched()
.insertAll()
.merge()
```
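
A slightly fuller sketch of the same builder. The `whenMatched`/`whenNotMatched`/`whenNotMatchedBySource` clause names come from the new `NO_MERGE_ACTION_SPECIFIED` error below; the `updateAll`/`delete` actions and the column names are assumptions, not shown in this excerpt:

```scala
import org.apache.spark.sql.functions.col

// Hypothetical source/target tables sharing an id column.
spark.table("source")
  .mergeInto("target", col("source.id") === col("target.id"))
  .whenMatched()            // rows present in both tables
  .updateAll()
  .whenNotMatched()         // rows only in the source
  .insertAll()
  .whenNotMatchedBySource() // rows only in the target
  .delete()
  .merge()                  // at least one when* clause is required,
                            // otherwise NO_MERGE_ACTION_SPECIFIED is raised
```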

### How was this patch tested?
new tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44119 from huaxingao/mergeinto.

Authored-by: Huaxin Gao 
Signed-off-by: Jiaan Geng 
---
 .../src/main/resources/error/error-classes.json|   6 +
 .../CheckConnectJvmClientCompatibility.scala   |  11 +-
 docs/sql-error-conditions.md   |   6 +
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  32 +
 .../org/apache/spark/sql/MergeIntoWriter.scala | 329 +++
 .../sql/connector/MergeIntoDataFrameSuite.scala| 946 +
 6 files changed, 1329 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 930042505379..df223f3298ef 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2846,6 +2846,12 @@
 ],
 "sqlState" : "42000"
   },
+  "NO_MERGE_ACTION_SPECIFIED" : {
+"message" : [
+  "df.mergeInto needs to be followed by at least one of 
whenMatched/whenNotMatched/whenNotMatchedBySource."
+],
+"sqlState" : "42K0E"
+  },
   "NO_SQL_TYPE_IN_PROTOBUF_SCHEMA" : {
 "message" : [
   "Cannot find  in Protobuf schema."
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a9b6f102a512..bd5ff6af7464 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -298,7 +298,16 @@ object CheckConnectJvmClientCompatibility {
   ProblemFilters.exclude[MissingClassProblem](
 "org.apache.spark.sql.artifact.util.ArtifactUtils"),
   ProblemFilters.exclude[MissingClassProblem](
-"org.apache.spark.sql.artifact.util.ArtifactUtils$"))
+"org.apache.spark.sql.artifact.util.ArtifactUtils$"),
+
+  // MergeIntoWriter
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.MergeIntoWriter"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenMatched$"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatched$"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource"),
+  
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.WhenNotMatchedBySource$"))
 checkMiMaCompatibility(clientJar, sqlJar, includedRules, excludeRules)
   }
 
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 94c7c167e392..a1af6863913e 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1640,6 +1640,12 @@ Can't determine the default value for `` since 
it is not nullable and i
 
 No handler for UDAF '``'. Use sparkSession.udf.register(...) 
instead.
 
+### NO_MERGE_ACTION_SPECIFIED
+
+[SQLSTATE: 
42K0E](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+df.mergeInto needs to be followed by at least one of 
whenMatched/whenNotMatched/whenNotMatchedBySource.
+
 ### NO_SQL_TYPE_IN_PROTOBUF_SCHEMA
 
 [SQLSTATE: 
42S22](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 

(spark) branch master updated: [MINOR][PYTHON][DOCS] Fix typo in pyspark.sql.functions.greatest docstring

2023-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 57c01df22fd0 [MINOR][PYTHON][DOCS] Fix typo in 
pyspark.sql.functions.greatest docstring
57c01df22fd0 is described below

commit 57c01df22fd05d929cbbcd9213913e141f997b35
Author: Kaz 
AuthorDate: Thu Dec 21 09:52:32 2023 +0900

[MINOR][PYTHON][DOCS] Fix typo in pyspark.sql.functions.greatest docstring

### What changes were proposed in this pull request?
The description of `f.greatest` had two small typos. This PR fixes them.

### Why are the changes needed?
Proper English should be used in documentation.

### Does this PR introduce _any_ user-facing change?
Yes, the correct English word is now used in the PySpark docs.

### How was this patch tested?
Not tested

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44427 from KazMiddelhoek/master.

Authored-by: Kaz 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/functions/builtin.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index 17a64b1a4f1d..54a91792404d 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -5907,12 +5907,12 @@ def greatest(*cols: "ColumnOrName") -> Column:
 Parameters
 ----------
 col : :class:`~pyspark.sql.Column` or str
-columns to check for gratest value.
+columns to check for greatest value.
 
 Returns
 -------
 :class:`~pyspark.sql.Column`
-gratest value.
+greatest value.
 
 Examples
 





(spark) branch master updated: [SPARK-46465][PYTHON][CONNECT] Add `Column.isNaN` in PySpark

2023-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d56fe6c7b202 [SPARK-46465][PYTHON][CONNECT] Add `Column.isNaN` in 
PySpark
d56fe6c7b202 is described below

commit d56fe6c7b20244238a2d807a08bb646d8edcaf32
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 21 08:59:45 2023 +0900

[SPARK-46465][PYTHON][CONNECT] Add `Column.isNaN` in PySpark

### What changes were proposed in this pull request?
Add `Column.isNaN` in PySpark

### Why are the changes needed?
`Column.isNaN` has been available in Scala since 1.5.0, but it is still missing in Python; this PR adds it for parity.

### Does this PR introduce _any_ user-facing change?
yes

```
In [1]: from pyspark.sql import Row

In [2]: df = spark.createDataFrame([Row(name='Tom', height=80.0), 
Row(name='Alice', height=float('nan'))])

In [3]: df.show()
+-----+------+
| name|height|
+-----+------+
|  Tom|  80.0|
|Alice|   NaN|
+-----+------+

In [4]: df.filter(df.height.isNaN()).show()
+-----+------+
| name|height|
+-----+------+
|Alice|   NaN|
+-----+------+
```

### How was this patch tested?
added doctest

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44422 from zhengruifeng/py_isNaN.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/column.py | 14 ++
 python/pyspark/sql/connect/column.py |  1 +
 2 files changed, 15 insertions(+)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 198dd9ff3e40..5fa7fb3d42b0 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -1153,9 +1153,23 @@ class Column:
 >>> df.filter(df.height.isNotNull()).collect()
 [Row(name='Tom', height=80)]
 """
+_isNaN_doc = """
+True if the current expression is NaN.
+
+.. versionadded:: 4.0.0
+
+Examples
+--------
+>>> from pyspark.sql import Row
+>>> df = spark.createDataFrame(
+... [Row(name='Tom', height=80.0), Row(name='Alice', 
height=float('nan'))])
+>>> df.filter(df.height.isNaN()).collect()
+[Row(name='Alice', height=nan)]
+"""
 
 isNull = _unary_op("isNull", _isNull_doc)
 isNotNull = _unary_op("isNotNull", _isNotNull_doc)
+isNaN = _unary_op("isNaN", _isNaN_doc)
 
 def alias(self, *alias: str, **kwargs: Any) -> "Column":
 """
diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index 13b00fd83d8b..052151d5417e 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -160,6 +160,7 @@ class Column:
 
 isNull = _unary_op("isnull", PySparkColumn.isNull.__doc__)
 isNotNull = _unary_op("isnotnull", PySparkColumn.isNotNull.__doc__)
+isNaN = _unary_op("isNaN", PySparkColumn.isNaN.__doc__)
 
 def __ne__(  # type: ignore[override]
 self,





(spark) branch master updated: [SPARK-46462][PS][TESTS] Reorganize `OpsOnDiffFramesGroupByRollingTests`

2023-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9b583809dd94 [SPARK-46462][PS][TESTS] Reorganize 
`OpsOnDiffFramesGroupByRollingTests`
9b583809dd94 is described below

commit 9b583809dd9494ee8ed3c2e50356230e1ffae218
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 21 08:58:25 2023 +0900

[SPARK-46462][PS][TESTS] Reorganize `OpsOnDiffFramesGroupByRollingTests`

### What changes were proposed in this pull request?
Reorganize `OpsOnDiffFramesGroupByRollingTests`

### Why are the changes needed?
for parallelism

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44420 from zhengruifeng/ps_test_diff_group_roll.

Lead-authored-by: Ruifeng Zheng 
Co-authored-by: Hyukjin Kwon 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py|  8 ++-
 .../test_parity_groupby_rolling.py}| 13 ++---
 .../test_parity_groupby_rolling_adv.py}| 13 ++---
 .../test_parity_groupby_rolling_count.py}  | 13 ++---
 .../test_groupby_rolling.py}   | 66 ++
 .../diff_frames_ops/test_groupby_rolling_adv.py| 61 
 .../test_groupby_rolling_count.py} | 62 +++-
 7 files changed, 107 insertions(+), 129 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 0388f1812b0d..cbd3b35c0015 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -745,7 +745,9 @@ pyspark_pandas = Module(
 "pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding",
 "pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding_adv",
 "pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding_count",
-"pyspark.pandas.tests.test_ops_on_diff_frames_groupby_rolling",
+"pyspark.pandas.tests.diff_frames_ops.test_groupby_rolling",
+"pyspark.pandas.tests.diff_frames_ops.test_groupby_rolling_adv",
+"pyspark.pandas.tests.diff_frames_ops.test_groupby_rolling_count",
 "pyspark.pandas.tests.test_repr",
 "pyspark.pandas.tests.resample.test_on",
 "pyspark.pandas.tests.resample.test_error",
@@ -1170,7 +1172,9 @@ pyspark_pandas_connect_part2 = Module(
 "pyspark.pandas.tests.connect.window.test_parity_expanding_error",
 "pyspark.pandas.tests.connect.window.test_parity_groupby_expanding",
 
"pyspark.pandas.tests.connect.window.test_parity_groupby_expanding_adv",
-
"pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames_groupby_rolling",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_rolling",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_rolling_adv",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_rolling_count",
 "pyspark.pandas.tests.connect.computation.test_parity_missing_data",
 "pyspark.pandas.tests.connect.groupby.test_parity_index",
 "pyspark.pandas.tests.connect.groupby.test_parity_describe",
diff --git 
a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py
 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_rolling.py
similarity index 75%
copy from 
python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py
copy to 
python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_rolling.py
index 4a52bb0748f5..c8255d6f9c42 100644
--- 
a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_rolling.py
+++ 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_rolling.py
@@ -16,24 +16,21 @@
 #
 import unittest
 
-from pyspark.pandas.tests.test_ops_on_diff_frames_groupby_rolling import (
-OpsOnDiffFramesGroupByRollingTestsMixin,
-)
+from pyspark.pandas.tests.diff_frames_ops.test_groupby_rolling import 
GroupByRollingMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
-from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
-class OpsOnDiffFramesGroupByRollingParityTests(
-OpsOnDiffFramesGroupByRollingTestsMixin,
+class GroupByRollingParityTests(
+GroupByRollingMixin,
 PandasOnSparkTestUtils,
-TestUtils,
 ReusedConnectTestCase,
 ):
 pass
 
 
 if __name__ == "__main__":
-from 
pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames_groupby_rolling 
import *
+from 

(spark) branch master updated: [SPARK-46463][PS][TESTS] Reorganize `OpsOnDiffFramesGroupByExpandingTests`

2023-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 091d8da54930 [SPARK-46463][PS][TESTS] Reorganize 
`OpsOnDiffFramesGroupByExpandingTests`
091d8da54930 is described below

commit 091d8da549306e0474e413e2984a744058be707a
Author: Ruifeng Zheng 
AuthorDate: Thu Dec 21 08:44:41 2023 +0900

[SPARK-46463][PS][TESTS] Reorganize `OpsOnDiffFramesGroupByExpandingTests`

### What changes were proposed in this pull request?
break `OpsOnDiffFramesGroupByExpandingTests` into small tests

### Why are the changes needed?
for parallelism

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44421 from zhengruifeng/ps_test_diff_group_exp.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py|  8 +++--
 .../test_parity_groupby_expanding.py}  | 13 +++
 .../test_parity_groupby_expanding_adv.py}  | 13 +++
 .../test_parity_groupby_expanding_count.py}| 13 ---
 .../test_groupby_expanding.py} | 42 ++
 .../test_groupby_expanding_adv.py} | 42 +++---
 .../test_groupby_expanding_count.py}   | 39 +---
 7 files changed, 98 insertions(+), 72 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 80930c22af8b..0388f1812b0d 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -742,7 +742,9 @@ pyspark_pandas = Module(
 "pyspark.pandas.tests.test_internal",
 "pyspark.pandas.tests.test_namespace",
 "pyspark.pandas.tests.test_numpy_compat",
-"pyspark.pandas.tests.test_ops_on_diff_frames_groupby_expanding",
+"pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding",
+"pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding_adv",
+"pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding_count",
 "pyspark.pandas.tests.test_ops_on_diff_frames_groupby_rolling",
 "pyspark.pandas.tests.test_repr",
 "pyspark.pandas.tests.resample.test_on",
@@ -1128,7 +1130,6 @@ pyspark_pandas_connect_part1 = Module(
 "pyspark.pandas.tests.connect.reshape.test_parity_get_dummies_object",
 "pyspark.pandas.tests.connect.reshape.test_parity_get_dummies_prefix",
 "pyspark.pandas.tests.connect.reshape.test_parity_merge_asof",
-
"pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames_groupby_expanding",
 ],
 excluded_python_implementations=[
 "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
@@ -1229,6 +1230,9 @@ pyspark_pandas_connect_part3 = Module(
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_shift",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_split_apply_combine",
 
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_transform",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_expanding",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_expanding_adv",
+
"pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_expanding_count",
 ],
 excluded_python_implementations=[
 "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
diff --git 
a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py
 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_expanding.py
similarity index 74%
copy from 
python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py
copy to 
python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_expanding.py
index c373268cdb23..7cd3d17de440 100644
--- 
a/python/pyspark/pandas/tests/connect/test_parity_ops_on_diff_frames_groupby_expanding.py
+++ 
b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_expanding.py
@@ -16,24 +16,21 @@
 #
 import unittest
 
-from pyspark.pandas.tests.test_ops_on_diff_frames_groupby_expanding import (
-OpsOnDiffFramesGroupByExpandingTestsMixin,
-)
+from pyspark.pandas.tests.diff_frames_ops.test_groupby_expanding import 
GroupByExpandingMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
-from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
-class OpsOnDiffFramesGroupByExpandingParityTests(
-OpsOnDiffFramesGroupByExpandingTestsMixin,

(spark) branch master updated: [SPARK-46413][PYTHON] Validate returnType of Arrow Python UDF

2023-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4faeb803670d [SPARK-46413][PYTHON] Validate returnType of Arrow Python 
UDF
4faeb803670d is described below

commit 4faeb803670d77de8a47fe1683a6680c3ee3f454
Author: Xinrong Meng 
AuthorDate: Thu Dec 21 08:18:13 2023 +0900

[SPARK-46413][PYTHON] Validate returnType of Arrow Python UDF

### What changes were proposed in this pull request?
Validate returnType of Arrow Python UDF

### Why are the changes needed?
Better error handling and consistency with other types of UDFs.

### Does this PR introduce _any_ user-facing change?
Yes, now we raise an error when the given returnType is not supported.

```py
>>> udf(lambda x: x, returnType=VarcharType(10), useArrow=True)
Traceback (most recent call last):
...
pyspark.errors.exceptions.base.PySparkTypeError: 
[UNSUPPORTED_DATA_TYPE_FOR_ARROW_CONVERSION] VarcharType(10) is not supported 
in conversion to Arrow.
```

### How was this patch tested?
Unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44362 from xinrong-meng/test_more_udf.

Authored-by: Xinrong Meng 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/tests/connect/test_parity_arrow_python_udf.py |  4 
 python/pyspark/sql/tests/test_arrow_python_udf.py | 15 ++-
 python/pyspark/sql/udf.py | 14 --
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py 
b/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
index fa329b598d98..f5bd99fa22cf 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py
@@ -58,6 +58,10 @@ class ArrowPythonUDFParityTests(UDFParityTests, 
PythonUDFArrowTestsMixin):
 with self.assertRaises(PythonException):
 self.spark.sql("SELECT test_udf(id, a => id * 10) FROM 
range(2)").show()
 
+@unittest.skip("Spark Connect does not validate return type in client.")
+def test_err_return_type(self):
+super.test_err_return_type()
+
 
 if __name__ == "__main__":
 import unittest
diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py 
b/python/pyspark/sql/tests/test_arrow_python_udf.py
index f853b15ce6f8..c59326edc31a 100644
--- a/python/pyspark/sql/tests/test_arrow_python_udf.py
+++ b/python/pyspark/sql/tests/test_arrow_python_udf.py
@@ -17,10 +17,11 @@
 
 import unittest
 
-from pyspark.errors import PythonException
+from pyspark.errors import PythonException, PySparkNotImplementedError
 from pyspark.sql import Row
 from pyspark.sql.functions import udf
 from pyspark.sql.tests.test_udf import BaseUDFTestsMixin
+from pyspark.sql.types import VarcharType
 from pyspark.testing.sqlutils import (
 have_pandas,
 have_pyarrow,
@@ -175,6 +176,18 @@ class PythonUDFArrowTestsMixin(BaseUDFTestsMixin):
 with self.assertRaises(PythonException):
 df_floating_value.select(udf(lambda x: x, 
"decimal")("value").alias("res")).collect()
 
+def test_err_return_type(self):
+with self.assertRaises(PySparkNotImplementedError) as pe:
+udf(lambda x: x, VarcharType(10), useArrow=True)
+
+self.check_error(
+exception=pe.exception,
+error_class="NOT_IMPLEMENTED",
+message_parameters={
+"feature": "Invalid return type with Arrow-optimized Python 
UDF: VarcharType(10)"
+},
+)
+
 
 class PythonUDFArrowTests(PythonUDFArrowTestsMixin, ReusedSQLTestCase):
 @classmethod
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index b9a00d432671..16605bc12acc 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -213,8 +213,18 @@ class UserDefinedFunction:
 self._returnType_placeholder = self._returnType
 else:
 self._returnType_placeholder = 
_parse_datatype_string(self._returnType)
-
-if (
+if self.evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF:
+try:
+to_arrow_type(self._returnType_placeholder)
+except TypeError:
+raise PySparkNotImplementedError(
+error_class="NOT_IMPLEMENTED",
+message_parameters={
+"feature": f"Invalid return type with Arrow-optimized 
Python UDF: "
+f"{self._returnType_placeholder}"
+},
+)
+elif (
 self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
 or self.evalType == 

(spark) branch master updated: [SPARK-46398][PYTHON][TESTS] Test rangeBetween window function (pyspark.sql.window)

2023-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 28b0c2c11d68 [SPARK-46398][PYTHON][TESTS] Test rangeBetween window 
function (pyspark.sql.window)
28b0c2c11d68 is described below

commit 28b0c2c11d681d9051a7b82b63e929bfecef
Author: Xinrong Meng 
AuthorDate: Thu Dec 21 08:16:49 2023 +0900

[SPARK-46398][PYTHON][TESTS] Test rangeBetween window function 
(pyspark.sql.window)

### What changes were proposed in this pull request?
Test rangeBetween window function (pyspark.sql.window).

### Why are the changes needed?
A subtask of [SPARK-46041](https://issues.apache.org/jira/browse/SPARK-46041) to improve test coverage.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Test change only.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44339 from xinrong-meng/test_window_func.

Authored-by: Xinrong Meng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_functions.py | 28 
 1 file changed, 28 insertions(+)

diff --git a/python/pyspark/sql/tests/test_functions.py 
b/python/pyspark/sql/tests/test_functions.py
index 5352ee04d7fe..df1ddd0301ad 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -997,6 +997,34 @@ class FunctionsTestsMixin:
 for r, ex in zip(rs, expected):
 self.assertEqual(tuple(r), ex[: len(r)])
 
+def test_window_functions_moving_average(self):
+data = [
+(datetime.datetime(2023, 1, 1), 20),
+(datetime.datetime(2023, 1, 2), 22),
+(datetime.datetime(2023, 1, 3), 21),
+(datetime.datetime(2023, 1, 4), 23),
+(datetime.datetime(2023, 1, 5), 24),
+(datetime.datetime(2023, 1, 6), 26),
+]
+df = self.spark.createDataFrame(data, ["date", "temperature"])
+
+def to_sec(i):
+return i * 86400
+
+w = 
Window.orderBy(F.col("date").cast("timestamp").cast("long")).rangeBetween(-to_sec(3),
 0)
+res = df.withColumn("3_day_avg_temp", F.avg("temperature").over(w))
+rs = sorted(res.collect())
+expected = [
+(datetime.datetime(2023, 1, 1, 0, 0), 20, 20.0),
+(datetime.datetime(2023, 1, 2, 0, 0), 22, 21.0),
+(datetime.datetime(2023, 1, 3, 0, 0), 21, 21.0),
+(datetime.datetime(2023, 1, 4, 0, 0), 23, 21.5),
+(datetime.datetime(2023, 1, 5, 0, 0), 24, 22.5),
+(datetime.datetime(2023, 1, 6, 0, 0), 26, 23.5),
+]
+for r, ex in zip(rs, expected):
+self.assertEqual(tuple(r), ex[: len(r)])
+
 def test_window_time(self):
 df = self.spark.createDataFrame(
 [(datetime.datetime(2016, 3, 11, 9, 0, 7), 1)], ["date", "val"]





(spark) branch master updated: [SPARK-46456][CORE] Add `spark.ui.jettyStopTimeout` to set Jetty server stop timeout to unblock SparkContext shutdown

2023-12-20 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 847b65eac370 [SPARK-46456][CORE] Add `spark.ui.jettyStopTimeout` to 
set Jetty server stop timeout  to unblock SparkContext shutdown
847b65eac370 is described below

commit 847b65eac370fc8ef98c617a2934b2fa0fcee250
Author: Kent Yao 
AuthorDate: Wed Dec 20 15:04:45 2023 -0800

[SPARK-46456][CORE] Add `spark.ui.jettyStopTimeout` to set Jetty server 
stop timeout  to unblock SparkContext shutdown

### What changes were proposed in this pull request?

The `_stopTimeout` sets a graceful stop time for each ContainerLifeCycle.

This pull request addresses the issue of shutdown hooks being interrupted during the shutdown process. By setting `_stopTimeout` to 5 seconds, we reduce the risk that modules such as MapOutputTracker and BlockManager in the SparkContext are not stopped properly, leaving resources uncleaned.

- 
https://github.com/jetty/jetty.project/blob/1f34ece62b918a006231258474f5fa370c49df29/jetty-util/src/main/java/org/eclipse/jetty/util/component/AbstractLifeCycle.java#L53
```
private long _stopTimeout = 30000;
```

This pull request reduces the value to 5 seconds, taking into account the 
value from the 
[QueuedThreadPool](https://git.eclipse.org/c/jetty/org.eclipse.jetty.project.git/tree/jetty-util/src/main/java/org/eclipse/jetty/util/thread/QueuedThreadPool.java#n96)

### Why are the changes needed?

In Jetty, the ContainerLifeCycle implementation manages a collection of contained beans. For managed beans, it stops them one by one and waits up to a specified time (30s) for each to stop. A single slow bean can therefore cause the shutdown hook to time out.
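
For operators, the new key can be set like any other Spark configuration. A minimal sketch (only the key name comes from this change; the "5s" value mirrors the discussion above and is not a documented default):

```scala
import org.apache.spark.SparkConf

// Cap Jetty's graceful stop wait so a slow bean cannot block the shutdown hooks.
val conf = new SparkConf()
  .setAppName("ui-shutdown-demo")          // hypothetical application name
  .set("spark.ui.jettyStopTimeout", "5s")  // value format assumed to accept time strings
```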

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

This can be reproduced easily by local-cluster with proxied SparkUI.

 Before

```
23/12/19 17:07:40 DEBUG QueuedThreadPool: Waiting for 
Thread[MasterUI-81,5,main] for 14999
23/12/19 17:07:55 DEBUG QueuedThreadPool: Waiting for 
Thread[MasterUI-81,5,main] for 14999
```

```
23/12/19 17:08:09 WARN ShutdownHookManager: ShutdownHook '' timeout, 
java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
at 
java.base/java.util.concurrent.FutureTask.get(FutureTask.java:204)
at 
org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
at 
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95)
23/12/19 17:08:09 ERROR Utils: Uncaught exception in thread shutdown-hook-0
java.lang.InterruptedException
at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1679)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1464)
at org.apache.spark.rpc.netty.MessageLoop.stop(MessageLoop.scala:60)
at org.apache.spark.rpc.netty.Dispatcher.stop(Dispatcher.scala:205)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.cleanup(NettyRpcEnv.scala:333)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.shutdown(NettyRpcEnv.scala:311)
at 
org.apache.spark.deploy.LocalSparkCluster.$anonfun$stop$4(LocalSparkCluster.scala:97)
at 
org.apache.spark.deploy.LocalSparkCluster.$anonfun$stop$4$adapted(LocalSparkCluster.scala:97)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
at scala.collection.AbstractIterable.foreach(Iterable.scala:933)
at 
org.apache.spark.deploy.LocalSparkCluster.stop(LocalSparkCluster.scala:97)
at 
org.apache.spark.SparkContext$.$anonfun$createTaskScheduler$2(SparkContext.scala:3233)
at 
org.apache.spark.SparkContext$.$anonfun$createTaskScheduler$2$adapted(SparkContext.scala:3232)
at 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:280)
at 
org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:143)
at 
org.apache.spark.scheduler.SchedulerBackend.stop(SchedulerBackend.scala:34)
at 
org.apache.spark.scheduler.SchedulerBackend.stop$(SchedulerBackend.scala:34)
at 
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:55)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.$anonfun$stop$2(TaskSchedulerImpl.scala:992)
at 

(spark) branch master updated: [SPARK-46447][SQL] Remove the legacy datetime rebasing SQL configs

2023-12-20 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1005cd5576ef [SPARK-46447][SQL] Remove the legacy datetime rebasing 
SQL configs
1005cd5576ef is described below

commit 1005cd5576ef073afee243848bcad5e5f4a9d309
Author: Max Gekk 
AuthorDate: Wed Dec 20 20:22:09 2023 +0300

[SPARK-46447][SQL] Remove the legacy datetime rebasing SQL configs

### What changes were proposed in this pull request?
In this PR, I propose to remove the following already-deprecated SQL configs (alternatives to other configs); a short migration sketch follows the list:
- spark.sql.legacy.parquet.int96RebaseModeInWrite
- spark.sql.legacy.parquet.datetimeRebaseModeInWrite
- spark.sql.legacy.parquet.int96RebaseModeInRead
- spark.sql.legacy.avro.datetimeRebaseModeInWrite
- spark.sql.legacy.avro.datetimeRebaseModeInRead
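
A minimal migration sketch (the canonical keys come from the migration-guide diff below; the `LEGACY`/`CORRECTED` values are assumed to be the usual `LegacyBehaviorPolicy` options referenced in `SQLConf`):

```scala
// Replace the removed spark.sql.legacy.* aliases with their canonical keys.
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")
spark.conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
```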

### Why are the changes needed?
To improve code maintenance.

### Does this PR introduce _any_ user-facing change?
Should not.

### How was this patch tested?
By existing test suites.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44402 from MaxGekk/remove-legacy-rebase-confs-2.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 docs/sql-migration-guide.md|  6 
 .../org/apache/spark/sql/internal/SQLConf.scala| 36 +++---
 2 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 4e8e2422d7e0..30a37d97042a 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,6 +30,12 @@ license: |
 - Since Spark 4.0, `spark.sql.parquet.compression.codec` drops the support of 
codec name `lz4raw`, please use `lz4_raw` instead.
 - Since Spark 4.0, when overflowing during casting timestamp to byte/short/int 
under non-ansi mode, Spark will return null instead a wrapping value.
 - Since Spark 4.0, the `encode()` and `decode()` functions support only the 
following charsets 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 
'UTF-16'. To restore the previous behavior when the function accepts charsets 
of the current JDK used by Spark, set `spark.sql.legacy.javaCharsets` to `true`.
+- Since Spark 4.0, the legacy datetime rebasing SQL configs with the prefix 
`spark.sql.legacy` are removed. To restore the previous behavior, use the 
following configs:
+  - `spark.sql.parquet.int96RebaseModeInWrite` instead of 
`spark.sql.legacy.parquet.int96RebaseModeInWrite`
+  - `spark.sql.parquet.datetimeRebaseModeInWrite` instead of 
`spark.sql.legacy.parquet.datetimeRebaseModeInWrite`
+  - `spark.sql.parquet.int96RebaseModeInRead` instead of 
`spark.sql.legacy.parquet.int96RebaseModeInRead`
+  - `spark.sql.avro.datetimeRebaseModeInWrite` instead of 
`spark.sql.legacy.avro.datetimeRebaseModeInWrite`
+  - `spark.sql.avro.datetimeRebaseModeInRead` instead of 
`spark.sql.legacy.avro.datetimeRebaseModeInRead`
 
 ## Upgrading from Spark SQL 3.4 to 3.5
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6404779f30ac..d54cb3756638 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4081,7 +4081,6 @@ object SQLConf {
 "When EXCEPTION, which is the default, Spark will fail the writing if 
it sees ancient " +
 "timestamps that are ambiguous between the two calendars.")
   .version("3.1.0")
-  .withAlternative("spark.sql.legacy.parquet.int96RebaseModeInWrite")
   .stringConf
   .transform(_.toUpperCase(Locale.ROOT))
   .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
@@ -4099,7 +4098,6 @@ object SQLConf {
 "TIMESTAMP_MILLIS, TIMESTAMP_MICROS. The INT96 type has the separate 
config: " +
 s"${PARQUET_INT96_REBASE_MODE_IN_WRITE.key}.")
   .version("3.0.0")
-  .withAlternative("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
   .stringConf
   .transform(_.toUpperCase(Locale.ROOT))
   .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
@@ -4115,7 +4113,6 @@ object SQLConf {
 "timestamps that are ambiguous between the two calendars. This config 
is only effective " +
 "if the writer info (like Spark, Hive) of the Parquet files is 
unknown.")
   .version("3.1.0")
-  .withAlternative("spark.sql.legacy.parquet.int96RebaseModeInRead")
   .stringConf
   .transform(_.toUpperCase(Locale.ROOT))
   .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
@@ -4149,7 +4146,6 @@ object SQLConf {
 "When EXCEPTION, which is the default, Spark will fail the writing if 
it sees " +
 

(spark) branch master updated: [SPARK-46452][SQL] Add a new API in DataWriter to write an iterator of records

2023-12-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c9cfaac90fd4 [SPARK-46452][SQL] Add a new API in DataWriter to write 
an iterator of records
c9cfaac90fd4 is described below

commit c9cfaac90fd423c3a38e295234e24744b946cb02
Author: allisonwang-db 
AuthorDate: Wed Dec 20 19:17:21 2023 +0800

[SPARK-46452][SQL] Add a new API in DataWriter to write an iterator of 
records

### What changes were proposed in this pull request?

This PR proposes to add a new method in `DataWriter` that supports writing 
an iterator of records:
```java
void writeAll(Iterator<T> records) throws IOException
```

### Why are the changes needed?

To make the API more flexible and to support more use cases (e.g., Python data sources). See https://github.com/apache/spark/pull/43791
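
As an illustration of what the hook enables, a connector writer might override it to amortize per-record work. A minimal sketch (the class, buffer, and flush logic are hypothetical; only the `DataWriter` methods come from the interface shown below):

```scala
import java.util.{Iterator => JIterator}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}

// Hypothetical writer that buffers rows and flushes them in chunks.
class BufferedRowWriter(batchSize: Int) extends DataWriter[InternalRow] {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[InternalRow]

  override def write(record: InternalRow): Unit = {
    buffer += record.copy()
    if (buffer.size >= batchSize) flush()
  }

  // The new hook: consume the whole iterator at once instead of paying
  // the per-record call overhead of write() from the write task loop.
  override def writeAll(records: JIterator[InternalRow]): Unit = {
    while (records.hasNext) write(records.next())
    flush()
  }

  private def flush(): Unit = {
    // push buffered rows to the external sink (omitted)
    buffer.clear()
  }

  override def commit(): WriterCommitMessage = new WriterCommitMessage {}
  override def abort(): Unit = buffer.clear()
  override def close(): Unit = ()
}
```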

### Does this PR introduce _any_ user-facing change?

Yes. This PR introduces a new method in `DataWriter`.

### How was this patch tested?

Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44410 from allisonwang-db/spark-46452-dsv2-write-all.

Authored-by: allisonwang-db 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/connector/write/DataWriter.java  |  18 +++
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 121 -
 2 files changed, 88 insertions(+), 51 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
index 6a1cee181bc2..d6e94fe2ca8b 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/DataWriter.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.connector.write;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Iterator;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.metric.CustomTaskMetric;
@@ -74,6 +75,23 @@ public interface DataWriter<T> extends Closeable {
*/
   void write(T record) throws IOException;
 
+  /**
+   * Writes all records provided by the given iterator. By default, it calls 
the {@link #write}
+   * method for each record in the iterator.
+   * 
+   * If this method fails (by throwing an exception), {@link #abort()} will be 
called and this
+   * data writer is considered to have been failed.
+   *
+   * @throws IOException if failure happens during disk/network IO like 
writing files.
+   *
+   * @since 4.0.0
+   */
+  default void writeAll(Iterator<T> records) throws IOException {
+while (records.hasNext()) {
+  write(records.next());
+}
+  }
+
   /**
* Commits this writer after all records are written successfully, returns a 
commit message which
* will be sent back to driver side and passed to
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index 2527f201f3a8..97c1f7ced508 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -421,7 +421,7 @@ trait V2TableWriteExec extends V2CommandExec with 
UnaryExecNode {
 
 trait WritingSparkTask[W <: DataWriter[InternalRow]] extends Logging with 
Serializable {
 
-  protected def write(writer: W, row: InternalRow): Unit
+  protected def write(writer: W, iter: java.util.Iterator[InternalRow]): Unit
 
   def run(
   writerFactory: DataWriterFactory,
@@ -436,19 +436,11 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] 
extends Logging with Serial
 val attemptId = context.attemptNumber()
 val dataWriter = writerFactory.createWriter(partId, taskId).asInstanceOf[W]
 
-var count = 0L
+val iterWithMetrics = IteratorWithMetrics(iter, dataWriter, customMetrics)
+
 // write the data and commit this writer.
 Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-  while (iter.hasNext) {
-if (count % CustomMetrics.NUM_ROWS_PER_UPDATE == 0) {
-  CustomMetrics.updateMetrics(
-dataWriter.currentMetricsValues.toImmutableArraySeq, customMetrics)
-}
-
-// Count is here.
-count += 1
-write(dataWriter, iter.next())
-  }
+  write(dataWriter, iterWithMetrics)
 
   CustomMetrics.updateMetrics(
 dataWriter.currentMetricsValues.toImmutableArraySeq, customMetrics)
@@ -476,7 +468,7 @@ trait WritingSparkTask[W <: DataWriter[InternalRow]] 
extends 

(spark) branch branch-3.4 updated: [SPARK-46330] Loading of Spark UI blocks for a long time when HybridStore enabled

2023-12-20 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 254d634548a0 [SPARK-46330] Loading of Spark UI blocks for a long time 
when HybridStore enabled
254d634548a0 is described below

commit 254d634548a0966e3cb501cf580f14c6d2e24732
Author: zhouyifan279 
AuthorDate: Wed Dec 20 16:50:38 2023 +0800

[SPARK-46330] Loading of Spark UI blocks for a long time when HybridStore 
enabled

### What changes were proposed in this pull request?
Move `LoadedAppUI` invalidate operation out of `FsHistoryProvider` 
synchronized block.

### Why are the changes needed?
When closing a HybridStore of a `LoadedAppUI` with a lot of data waiting to 
be written to disk, loading of other Spark UIs will be blocked for a long time.

See more details at https://issues.apache.org/jira/browse/SPARK-46330

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44260 from zhouyifan279/SPARK-46330.

Authored-by: zhouyifan279 
Signed-off-by: Kent Yao 
(cherry picked from commit cf54e8f9a51bf54e8fa3e1011ac370e46134b134)
Signed-off-by: Kent Yao 
---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala   | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 49b479f3124e..387bc7d9e45b 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -925,11 +925,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
* UI lifecycle.
*/
   private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
-synchronized {
-  activeUIs.get((appId, attemptId)).foreach { ui =>
-ui.invalidate()
-ui.ui.store.close()
-  }
+val uiOption = synchronized {
+  activeUIs.get((appId, attemptId))
+}
+uiOption.foreach { ui =>
+  ui.invalidate()
+  ui.ui.store.close()
 }
   }
 





(spark) branch branch-3.5 updated: [SPARK-46330] Loading of Spark UI blocks for a long time when HybridStore enabled

2023-12-20 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 0c00c54583fe [SPARK-46330] Loading of Spark UI blocks for a long time 
when HybridStore enabled
0c00c54583fe is described below

commit 0c00c54583fe3e56f940425aac6e0e4f05c4b9db
Author: zhouyifan279 
AuthorDate: Wed Dec 20 16:50:38 2023 +0800

[SPARK-46330] Loading of Spark UI blocks for a long time when HybridStore 
enabled

### What changes were proposed in this pull request?
Move `LoadedAppUI` invalidate operation out of `FsHistoryProvider` 
synchronized block.

### Why are the changes needed?
When closing a HybridStore of a `LoadedAppUI` with a lot of data waiting to 
be written to disk, loading of other Spark UIs will be blocked for a long time.

See more details at https://issues.apache.org/jira/browse/SPARK-46330

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44260 from zhouyifan279/SPARK-46330.

Authored-by: zhouyifan279 
Signed-off-by: Kent Yao 
(cherry picked from commit cf54e8f9a51bf54e8fa3e1011ac370e46134b134)
Signed-off-by: Kent Yao 
---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala   | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 49b479f3124e..387bc7d9e45b 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -925,11 +925,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
* UI lifecycle.
*/
   private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
-synchronized {
-  activeUIs.get((appId, attemptId)).foreach { ui =>
-ui.invalidate()
-ui.ui.store.close()
-  }
+val uiOption = synchronized {
+  activeUIs.get((appId, attemptId))
+}
+uiOption.foreach { ui =>
+  ui.invalidate()
+  ui.ui.store.close()
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46330] Loading of Spark UI blocks for a long time when HybridStore enabled

2023-12-20 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cf54e8f9a51b [SPARK-46330] Loading of Spark UI blocks for a long time 
when HybridStore enabled
cf54e8f9a51b is described below

commit cf54e8f9a51bf54e8fa3e1011ac370e46134b134
Author: zhouyifan279 
AuthorDate: Wed Dec 20 16:50:38 2023 +0800

[SPARK-46330] Loading of Spark UI blocks for a long time when HybridStore 
enabled

### What changes were proposed in this pull request?
Move the `LoadedAppUI` invalidation out of the `FsHistoryProvider` synchronized block.

### Why are the changes needed?
When closing a HybridStore of a `LoadedAppUI` with a lot of data waiting to 
be written to disk, loading of other Spark UIs will be blocked for a long time.

See more details at https://issues.apache.org/jira/browse/SPARK-46330
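
For context, the fix is the usual lock-narrowing pattern: only the cheap map lookup stays inside `synchronized`, while the potentially slow invalidation and store close run outside the lock. A minimal, self-contained sketch of that pattern (the class and method names below are illustrative, not the actual `FsHistoryProvider` members):

```
import scala.collection.mutable

// Illustrative sketch of the lock-narrowing pattern used by this change.
class UiRegistry[K, V](closeEntry: V => Unit) {
  private val active = mutable.Map.empty[K, V]

  def register(key: K, value: V): Unit = synchronized { active.put(key, value) }

  def invalidate(key: K): Unit = {
    // Hold the lock only for the cheap lookup...
    val entry = synchronized { active.get(key) }
    // ...and do the potentially slow cleanup outside the lock, so other
    // callers (e.g. loading another UI) are not blocked in the meantime.
    entry.foreach(closeEntry)
  }
}
```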

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Passed existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44260 from zhouyifan279/SPARK-46330.

Authored-by: zhouyifan279 
Signed-off-by: Kent Yao 
---
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala   | 11 ++-
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 73fb0086b338..8f64de0847ec 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -926,11 +926,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
* UI lifecycle.
*/
   private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
-synchronized {
-  activeUIs.get((appId, attemptId)).foreach { ui =>
-ui.invalidate()
-ui.ui.store.close()
-  }
+val uiOption = synchronized {
+  activeUIs.get((appId, attemptId))
+}
+uiOption.foreach { ui =>
+  ui.invalidate()
+  ui.ui.store.close()
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING

2023-12-20 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9fdc3754d676 [SPARK-28386][SQL] Cannot resolve ORDER BY columns with 
GROUP BY and HAVING
9fdc3754d676 is described below

commit 9fdc3754d676bcb5de500c87025b9c571a7cf523
Author: Cheng Pan 
AuthorDate: Wed Dec 20 16:46:54 2023 +0800

[SPARK-28386][SQL] Cannot resolve ORDER BY columns with GROUP BY and HAVING

### What changes were proposed in this pull request?

This PR enhanced the analyzer to handle the following pattern properly.

```
Sort
 - Filter
   - Aggregate
```

### Why are the changes needed?

```
spark-sql (default)> CREATE TABLE t1 (flag BOOLEAN, dt STRING);

spark-sql (default)>   SELECT LENGTH(dt),
   >  COUNT(t1.flag)
   > FROM t1
   > GROUP BY LENGTH(dt)
   >   HAVING COUNT(t1.flag) > 1
   > ORDER BY LENGTH(dt);
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with 
name `dt` cannot be resolved. Did you mean one of the following? [`length(dt)`, 
`count(flag)`].; line 6 pos 16;
'Sort ['LENGTH('dt) ASC NULLS FIRST], true
+- Filter (count(flag)#60L > cast(1 as bigint))
   +- Aggregate [length(dt#9)], [length(dt#9) AS length(dt)#59, 
count(flag#8) AS count(flag)#60L]
  +- SubqueryAlias spark_catalog.default.t1
 +- Relation spark_catalog.default.t1[flag#8,dt#9] parquet
```

The above example demonstrates the failure case: the query fails during the 
analysis phase when both `HAVING` and `ORDER BY` clauses are present, but 
succeeds if only one of them is present.
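
For reference, a minimal reproduction (a sketch assuming a `SparkSession` named `spark`); after this change the query below is expected to analyze and run:

```
// Mirrors the example above; previously this failed with
// UNRESOLVED_COLUMN.WITH_SUGGESTION during analysis.
spark.sql("CREATE TABLE t1 (flag BOOLEAN, dt STRING)")
spark.sql(
  """SELECT LENGTH(dt), COUNT(t1.flag)
    |FROM t1
    |GROUP BY LENGTH(dt)
    |HAVING COUNT(t1.flag) > 1
    |ORDER BY LENGTH(dt)""".stripMargin
).show()
```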

### Does this PR introduce _any_ user-facing change?

Yes, this can be considered a bug fix.

### How was this patch tested?

New UTs are added

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44352 from pan3793/SPARK-28386.

Authored-by: Cheng Pan 
Signed-off-by: Kent Yao 
---
 .../spark/sql/catalyst/analysis/Analyzer.scala |  9 +++
 .../analysis/ResolveReferencesInSort.scala | 16 +++-
 .../sql-tests/analyzer-results/having.sql.out  | 29 ++
 .../udf/postgreSQL/udf-select_having.sql.out   | 11 
 .../src/test/resources/sql-tests/inputs/having.sql |  6 +
 .../resources/sql-tests/results/having.sql.out | 18 ++
 .../approved-plans-v2_7/q6.sf100/explain.txt   |  8 +++---
 .../approved-plans-v2_7/q6.sf100/simplified.txt|  2 +-
 .../approved-plans-v2_7/q6/explain.txt |  8 +++---
 .../approved-plans-v2_7/q6/simplified.txt  |  2 +-
 10 files changed, 87 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e3538647e375..94f6d3346265 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2690,6 +2690,15 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   }
   s.copy(order = newSortOrder, child = newChild)
 })
+
+  case s @ Sort(_, _, f @ Filter(cond, agg: Aggregate))
+  if agg.resolved && cond.resolved && s.order.forall(_.resolved) =>
+resolveOperatorWithAggregate(s.order.map(_.child), agg, (newExprs, 
newChild) => {
+  val newSortOrder = s.order.zip(newExprs).map {
+case (sortOrder, expr) => sortOrder.copy(child = expr)
+  }
+  s.copy(order = newSortOrder, child = f.copy(child = newChild))
+})
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
index 02583ebb8f6b..6fa723d4a75f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInSort.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LogicalPlan, Project, Sort}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 
 /**
@@ -28,10 +28,11 @@ import 

(spark) branch master updated: [SPARK-46399][CORE] Add exit status to the Application End event for the use of Spark Listener

2023-12-20 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f9e468eafdf0 [SPARK-46399][CORE] Add exit status to the Application 
End event for the use of Spark Listener
f9e468eafdf0 is described below

commit f9e468eafdf0682a2353897677d8487e1c2ab613
Author: Reza Safi 
AuthorDate: Wed Dec 20 02:20:56 2023 -0600

[SPARK-46399][CORE] Add exit status to the Application End event for the 
use of Spark Listener

### What changes were proposed in this pull request?
Currently, `SparkListenerApplicationEnd` only carries a timestamp value and no 
exit status is recorded with it.
This change adds an exit code to the `SparkListenerApplicationEnd` event.

### Why are the changes needed?
Without this, it is hard to tell from a Spark listener whether an application 
attempt failed or succeeded.
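
As an illustration of how a listener might consume the new field (the listener class below is hypothetical; only the `exitCode` field comes from this change):

```
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Hypothetical listener that reports the new optional exit code.
class ExitStatusListener extends SparkListener {
  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    event.exitCode match {
      case Some(code) => println(s"Application ended at ${event.time}, exit code $code")
      case None       => println(s"Application ended at ${event.time}, no exit code reported")
    }
  }
}
```

Such a listener can be registered as usual, e.g. via `SparkContext.addSparkListener` or the `spark.extraListeners` configuration.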

### Does this PR introduce _any_ user-facing change?
No, the added exitCode is an optional parameter of the event.

### How was this patch tested?
Locally, using the existing tests. A new test is also added.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44340 from rezasafi/rezaaddstatuslistener.

Authored-by: Reza Safi 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala |  6 +++---
 .../scala/org/apache/spark/scheduler/SparkListener.scala|  4 +++-
 .../src/main/scala/org/apache/spark/util/JsonProtocol.scala |  4 +++-
 .../apache/spark/scheduler/EventLoggingListenerSuite.scala  | 13 +++--
 project/MimaExcludes.scala  |  3 +++
 5 files changed, 23 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b43bb08d25a7..da37fa83254b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2273,7 +2273,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
 if (listenerBus != null) {
   Utils.tryLogNonFatalError {
-postApplicationEnd()
+postApplicationEnd(exitCode)
   }
 }
 Utils.tryLogNonFatalError {
@@ -2803,8 +2803,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /** Post the application end event */
-  private def postApplicationEnd(): Unit = {
-listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+  private def postApplicationEnd(exitCode: Int): Unit = {
+listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis, 
Some(exitCode)))
   }
 
   /** Post the environment update event once the task scheduler is ready */
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index fd846545d689..cc19b71bfc4d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -289,7 +289,9 @@ case class SparkListenerApplicationStart(
 driverAttributes: Option[Map[String, String]] = None) extends 
SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+case class SparkListenerApplicationEnd(
+time: Long,
+exitCode: Option[Int] = None) extends SparkListenerEvent
 
 /**
  * An internal class that describes the metadata of an event log.
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 987e5b4328f9..22dcf6c11e4b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -277,6 +277,7 @@ private[spark] object JsonProtocol extends JsonUtils {
 g.writeStartObject()
 g.writeStringField("Event", 
SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd)
 g.writeNumberField("Timestamp", applicationEnd.time)
+applicationEnd.exitCode.foreach(exitCode => g.writeNumberField("ExitCode", 
exitCode))
 g.writeEndObject()
   }
 
@@ -1065,7 +1066,8 @@ private[spark] object JsonProtocol extends JsonUtils {
   }
 
   def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
-SparkListenerApplicationEnd(json.get("Timestamp").extractLong)
+val exitCode = jsonOption(json.get("ExitCode")).map(_.extractInt)
+SparkListenerApplicationEnd(json.get("Timestamp").extractLong, exitCode)
   }
 
   def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 

(spark) branch master updated: [SPARK-46272][SQL] Support CTAS using DSv2 sources

2023-12-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cc9d9911d7ed [SPARK-46272][SQL] Support CTAS using DSv2 sources
cc9d9911d7ed is described below

commit cc9d9911d7eddfb3bdc7b1fa621687f42930e13a
Author: allisonwang-db 
AuthorDate: Wed Dec 20 16:17:45 2023 +0800

[SPARK-46272][SQL] Support CTAS using DSv2 sources

### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/43949 supports CREATE TABLE using DSv2 
sources. This PR supports CREATE TABLE AS SELECT (CTAS) using DSv2 sources. It 
turns out that we don't need additional code changes. This PR simply adds more 
test cases for CTAS queries.
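
As a hedged illustration of the query shape now covered by tests (the provider class name is a placeholder for any DSv2 `TableProvider` that supports batch writes, not a class shipped with Spark):

```
// Sketch only: replace com.example.MyV2Provider with a real DSv2 TableProvider.
spark.sql(
  """CREATE TABLE ctas_target
    |USING com.example.MyV2Provider
    |AS SELECT id, id * 2 AS doubled FROM range(5)""".stripMargin)
```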

### Why are the changes needed?

To add tests for CTAS for DSv2 sources.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

New tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44190 from allisonwang-db/spark-46272-ctas.

Authored-by: allisonwang-db 
Signed-off-by: Wenchen Fan 
---
 .../src/main/resources/error/error-classes.json|   6 +
 docs/sql-error-conditions.md   |   6 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |  13 +-
 .../datasources/v2/TableCapabilityCheck.scala  |   2 +-
 .../datasources/v2/V2SessionCatalog.scala  |  19 ++-
 .../DataSourceV2DataFrameSessionCatalogSuite.scala |   2 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |   2 +-
 .../spark/sql/connector/DataSourceV2Suite.scala| 154 -
 .../spark/sql/connector/FakeV2Provider.scala   |  22 +++
 .../connector/SupportsCatalogOptionsSuite.scala|   2 +-
 .../spark/sql/connector/V1WriteFallbackSuite.scala |   2 +-
 .../streaming/test/DataStreamTableAPISuite.scala   |   4 +-
 12 files changed, 217 insertions(+), 17 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 30aacc07d318..930042505379 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -893,6 +893,12 @@
 ],
 "sqlState" : "42K02"
   },
+  "DATA_SOURCE_TABLE_SCHEMA_MISMATCH" : {
+"message" : [
+  "The schema of the data source table  does not match the 
actual schema . If you are using the DataFrameReader.schema API 
or creating a table, avoid specifying the schema."
+],
+"sqlState" : "42K03"
+  },
   "DATETIME_OVERFLOW" : {
 "message" : [
   "Datetime operation overflow: ."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index e4b04ce02fe2..94c7c167e392 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -492,6 +492,12 @@ Data source '``' not found. Please make sure the 
data source is regist
 
 Failed to find the data source: ``. Please find packages at 
`https://spark.apache.org/third-party-projects.html`.
 
+### DATA_SOURCE_TABLE_SCHEMA_MISMATCH
+
+[SQLSTATE: 
42K03](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+The schema of the data source table `` does not match the actual 
schema ``. If you are using the DataFrameReader.schema API or 
creating a table, avoid specifying the schema.
+
 ### DATETIME_OVERFLOW
 
 [SQLSTATE: 22008](sql-error-conditions-sqlstates.html#class-22-data-exception)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index cdab854c004b..3fd1fe04aed6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -926,8 +926,8 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
 unsupportedTableOperationError(table.name(), "either micro-batch or 
continuous scan")
   }
 
-  def unsupportedAppendInBatchModeError(table: Table): Throwable = {
-unsupportedTableOperationError(table.name(), "append in batch mode")
+  def unsupportedAppendInBatchModeError(name: String): Throwable = {
+unsupportedTableOperationError(name, "append in batch mode")
   }
 
   def unsupportedDynamicOverwriteInBatchModeError(table: Table): Throwable = {
@@ -3924,4 +3924,13 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
   errorClass = "NESTED_EXECUTE_IMMEDIATE",
   messageParameters = Map("sqlString" -> toSQLStmt(queryString)))
   }
+
+  def dataSourceTableSchemaMismatchError(
+  tableSchema: StructType, actualSchema: StructType): Throwable = {
+

(spark) branch master updated: [SPARK-46069][SQL][FOLLOWUP] Simplify the algorithm and add comments

2023-12-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f555e9983f6d [SPARK-46069][SQL][FOLLOWUP] Simplify the algorithm and 
add comments
f555e9983f6d is described below

commit f555e9983f6dcea90066c3f4678fb11e11c6949e
Author: Wenchen Fan 
AuthorDate: Wed Dec 20 16:10:02 2023 +0800

[SPARK-46069][SQL][FOLLOWUP] Simplify the algorithm and add comments

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/43982 that 
simplifies the algorithm by removing the "add one day" operation, which makes 
the algorithm easier to document.
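
For intuition, a small sketch of the rewrite this rule performs on a date column compared against a timestamp literal (the table and column names are illustrative; the expected rewrites follow the comments in the diff below):

```
// Illustrative setup: a parquet table with a real DATE column named d.
spark.range(10).selectExpr("DATE'2023-12-01' + CAST(id AS INT) AS d")
  .write.mode("overwrite").saveAsTable("dates_tbl")

// Timestamp with a non-zero time part: the floor date itself cannot satisfy
// ">=", so the comparison should tighten to ">" on the date column.
spark.table("dates_tbl")
  .filter("CAST(d AS TIMESTAMP) >= TIMESTAMP'2023-12-05 01:02:03'")
  .explain(true)  // optimized filter expected to be equivalent to: d > DATE'2023-12-05'

// Timestamp exactly at midnight: the floor date still qualifies, so ">=" is kept.
spark.table("dates_tbl")
  .filter("CAST(d AS TIMESTAMP) >= TIMESTAMP'2023-12-05 00:00:00'")
  .explain(true)  // optimized filter expected to be equivalent to: d >= DATE'2023-12-05'
```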

### Why are the changes needed?

Code simplification.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

updated tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #44403 from cloud-fan/minor.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../optimizer/UnwrapCastInBinaryComparison.scala   | 50 --
 .../UnwrapCastInBinaryComparisonSuite.scala| 48 ++---
 2 files changed, 59 insertions(+), 39 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
index dd516afeb58c..19af9f5a2b55 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
@@ -341,36 +341,58 @@ object UnwrapCastInBinaryComparison extends 
Rule[LogicalPlan] {
   ts: Literal,
   tz: Option[String],
   evalMode: EvalMode.Value): Expression = {
-val floorDate = Cast(ts, fromExp.dataType, tz, evalMode)
-val dateAddOne = DateAdd(floorDate, Literal(1, IntegerType))
-val isStartOfDay =
-  EqualTo(ts, Cast(floorDate, ts.dataType, tz, 
evalMode)).eval(EmptyRow).asInstanceOf[Boolean]
+assert(fromExp.dataType == DateType)
+val floorDate = Literal(Cast(ts, DateType, tz, evalMode).eval(), DateType)
+val timePartsAllZero =
+  EqualTo(ts, Cast(floorDate, ts.dataType, tz, 
evalMode)).eval().asInstanceOf[Boolean]
 
 exp match {
   case _: GreaterThan =>
+// "CAST(date AS TIMESTAMP) > timestamp"  ==>  "date > floor_date", no 
matter the
+// timestamp has non-zero time part or not.
 GreaterThan(fromExp, floorDate)
+  case _: LessThanOrEqual =>
+// "CAST(date AS TIMESTAMP) <= timestamp"  ==>  "date <= floor_date", 
no matter the
+// timestamp has non-zero time part or not.
+LessThanOrEqual(fromExp, floorDate)
   case _: GreaterThanOrEqual =>
-if (isStartOfDay) {
+if (!timePartsAllZero) {
+  // "CAST(date AS TIMESTAMP) >= timestamp"  ==>  "date > floor_date", 
if the timestamp has
+  // non-zero time part.
+  GreaterThan(fromExp, floorDate)
+} else {
+  // If the timestamp's time parts are all zero, the date can also be 
the floor_date.
   GreaterThanOrEqual(fromExp, floorDate)
+}
+  case _: LessThan =>
+if (!timePartsAllZero) {
+  // "CAST(date AS TIMESTAMP) < timestamp"  ==>  "date <= floor_date", 
if the timestamp has
+  // non-zero time part.
+  LessThanOrEqual(fromExp, floorDate)
 } else {
-  GreaterThanOrEqual(fromExp, dateAddOne)
+  // If the timestamp's time parts are all zero, the date can not be 
the floor_date.
+  LessThan(fromExp, floorDate)
 }
   case _: EqualTo =>
-if (isStartOfDay) {
+if (timePartsAllZero) {
+  // "CAST(date AS TIMESTAMP) = timestamp"  ==>  "date = floor_date", 
if the timestamp's
+  // time parts are all zero
   EqualTo(fromExp, floorDate)
 } else {
+  // if the timestamp has non-zero time part, then we always get false 
unless the date is
+  // null, in which case the result is also null.
   falseIfNotNull(fromExp)
 }
   case _: EqualNullSafe =>
-if (isStartOfDay) EqualNullSafe(fromExp, floorDate) else FalseLiteral
-  case _: LessThan =>
-if (isStartOfDay) {
-  LessThan(fromExp, floorDate)
+if (timePartsAllZero) {
+  // "CAST(date AS TIMESTAMP) <=> timestamp"  ==>  "date <=> 
floor_date", if the timestamp's
+  // time parts are all zero
+  EqualNullSafe(fromExp, floorDate)
 } else {
-  LessThan(fromExp, dateAddOne)
+  // if the timestamp has non-zero time part, then we