(spark) branch master updated: [SPARK-45938][INFRA] Add `utils` to the dependencies of the `core/unsafe/network_common` module in `module.py`

2023-11-15 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 22f3221f5db [SPARK-45938][INFRA] Add `utils` to the dependencies of 
the `core/unsafe/network_common` module in `module.py`
22f3221f5db is described below

commit 22f3221f5db88769ae5f999cc4be01cd69a6172e
Author: yangjie01 
AuthorDate: Thu Nov 16 15:51:26 2023 +0800

[SPARK-45938][INFRA] Add `utils` to the dependencies of the 
`core/unsafe/network_common` module in `module.py`

### What changes were proposed in this pull request?
This pr add `utils` to the dependencies of the `core/unsafe/network_common` 
module in `module.py` due to  `utils` is direct dependency of 
`core/unsafe/network_common`:


https://github.com/apache/spark/blob/7120e6b88f2327ffb71c4bca14b10b15aeb26c32/core/pom.xml#L102-L106


https://github.com/apache/spark/blob/ef93dc247844aca778deda06897a2759ad5eeea4/common/unsafe/pom.xml#L44-L48


https://github.com/apache/spark/blob/ef93dc247844aca778deda06897a2759ad5eeea4/common/network-common/pom.xml#L177-L181

### Why are the changes needed?
All UTs of `utils` module are still in the `core` module now. This pr is 
used to avoid the issue of not running unit tests in the `core` module when 
changing the `utils` module.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43818 from LuciferYang/common-utils-test.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Signed-off-by: yangjie01 
---
 dev/sparktestsupport/modules.py | 22 +++---
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 01757ba28dd..8aa93821637 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -113,6 +113,14 @@ tags = Module(
 ],
 )
 
+utils = Module(
+name="utils",
+dependencies=[tags],
+source_file_regexes=[
+"common/utils/",
+],
+)
+
 kvstore = Module(
 name="kvstore",
 dependencies=[tags],
@@ -126,7 +134,7 @@ kvstore = Module(
 
 network_common = Module(
 name="network-common",
-dependencies=[tags],
+dependencies=[tags, utils],
 source_file_regexes=[
 "common/network-common/",
 ],
@@ -148,7 +156,7 @@ network_shuffle = Module(
 
 unsafe = Module(
 name="unsafe",
-dependencies=[tags],
+dependencies=[tags, utils],
 source_file_regexes=[
 "common/unsafe",
 ],
@@ -157,14 +165,6 @@ unsafe = Module(
 ],
 )
 
-utils = Module(
-name="utils",
-dependencies=[tags],
-source_file_regexes=[
-"common/utils/",
-],
-)
-
 launcher = Module(
 name="launcher",
 dependencies=[tags],
@@ -178,7 +178,7 @@ launcher = Module(
 
 core = Module(
 name="core",
-dependencies=[kvstore, network_common, network_shuffle, unsafe, launcher],
+dependencies=[kvstore, network_common, network_shuffle, unsafe, launcher, 
utils],
 source_file_regexes=[
 "core/",
 ],


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2

2023-11-15 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c91dbde2f94 [SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2
c91dbde2f94 is described below

commit c91dbde2f94c15d2007dccaeec5d72a159e9f4e2
Author: panbingkun 
AuthorDate: Thu Nov 16 13:15:27 2023 +0800

[SPARK-33393][SQL] Support SHOW TABLE EXTENDED in v2

### What changes were proposed in this pull request?
The pr aim to implement v2 SHOW TABLE EXTENDED as `ShowTableExec`

### Why are the changes needed?
To have feature parity with the datasource V1.

### Does this PR introduce _any_ user-facing change?
Yes, Support SHOW TABLE EXTENDED in v2.

### How was this patch tested?
Add new UT.
By running the unified tests for v2 implementation:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *ShowTablesSuite"
$ build/sbt "test:testOnly *ShowTablesSuite"
```

Closes #37588 from panbingkun/v2_SHOW_TABLE_EXTENDED.

Authored-by: panbingkun 
Signed-off-by: Wenchen Fan 
---
 .../src/main/resources/error/error-classes.json|   5 -
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   4 -
 .../sql/catalyst/analysis/ResolveCatalogs.scala|   2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala  |  19 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala |  30 +-
 .../sql/catalyst/plans/logical/v2Commands.scala|  23 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  19 +-
 .../datasources/v2/DataSourceV2Implicits.scala |   5 +
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  22 +-
 .../datasources/v2/DataSourceV2Strategy.scala  |  10 +
 .../datasources/v2/ShowTablesExtendedExec.scala| 189 
 .../sql-tests/analyzer-results/show-tables.sql.out |  13 +-
 .../sql-tests/results/show-tables.sql.out  |  13 +-
 .../execution/command/ShowTablesParserSuite.scala  |  34 +--
 .../execution/command/ShowTablesSuiteBase.scala| 334 -
 .../sql/execution/command/v1/ShowTablesSuite.scala |  70 -
 .../sql/execution/command/v2/ShowTablesSuite.scala |  71 ++---
 .../hive/execution/command/ShowTablesSuite.scala   |  77 +
 18 files changed, 815 insertions(+), 125 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index b60d70cefc9..afcd841a2ce 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -4776,11 +4776,6 @@
   "Invalid bound function ': there are  arguments but 
 parameters returned from 'inputTypes()'."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1200" : {
-"message" : [
-  " is not supported for v2 tables."
-]
-  },
   "_LEGACY_ERROR_TEMP_1201" : {
 "message" : [
   "Cannot resolve column name \"\" among ()."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 176a45a6f8e..3843901a2e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -271,10 +271,6 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
   case _ =>
 }
 
-  // `ShowTableExtended` should have been converted to the v1 command if 
the table is v1.
-  case _: ShowTableExtended =>
-throw QueryCompilationErrors.commandUnsupportedInV2TableError("SHOW 
TABLE EXTENDED")
-
   case operator: LogicalPlan =>
 operator transformExpressionsDown {
   // Check argument data types of higher-order functions downwards 
first.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 253c8eb190f..b0df76068ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -50,7 +50,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
   s.copy(namespace = ResolvedNamespace(currentCatalog,
 catalogManager.currentNamespace.toImmutableArraySeq))
-case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
+case s @ ShowTablesExtended(UnresolvedNamespace(Seq()), _, _) =>
   s.copy(namespace = ResolvedNamespace(currentCatalog,
 catalogManager.currentNamespace.toImmutableArraySeq))
 case s @ ShowViews(Unre

(spark) branch master updated: [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation

2023-11-15 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bd14136b397 [SPARK-45747][SS] Use prefix key information in state 
metadata to handle reading state for session window aggregation
bd14136b397 is described below

commit bd14136b39784038c3cef7dc3cafac2b07024a92
Author: Chaoqin Li 
AuthorDate: Thu Nov 16 12:29:10 2023 +0900

[SPARK-45747][SS] Use prefix key information in state metadata to handle 
reading state for session window aggregation

### What changes were proposed in this pull request?
Currently reading state for session window aggregation operator is not 
supported because the numColPrefixKey is unknown. We can read the operator 
state metadata introduced in SPARK-45558 to determine the number of prefix 
columns and load the state of session window correctly.

### Why are the changes needed?
To support reading state for session window aggregation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add integration test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43788 from chaoqin-li1123/session_window_agg.

Authored-by: Chaoqin Li 
Signed-off-by: Jungtaek Lim 
---
 .../v2/state/StatePartitionReader.scala| 30 +++
 .../v2/state/metadata/StateMetadataSource.scala|  2 +-
 .../v2/state/StateDataSourceReadSuite.scala| 16 ++
 .../v2/state/StateDataSourceTestBase.scala | 34 ++
 4 files changed, 75 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
index 824034f42ea..1e5f7216e8b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2.state
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
JoinedRow, UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
@@ -47,7 +48,7 @@ class StatePartitionReader(
 storeConf: StateStoreConf,
 hadoopConf: SerializableConfiguration,
 partition: StateStoreInputPartition,
-schema: StructType) extends PartitionReader[InternalRow] {
+schema: StructType) extends PartitionReader[InternalRow] with Logging {
 
   private val keySchema = SchemaUtil.getSchemaAsDataType(schema, 
"key").asInstanceOf[StructType]
   private val valueSchema = SchemaUtil.getSchemaAsDataType(schema, 
"value").asInstanceOf[StructType]
@@ -57,12 +58,29 @@ class StatePartitionReader(
   partition.sourceOptions.operatorId, partition.partition, 
partition.sourceOptions.storeName)
 val stateStoreProviderId = StateStoreProviderId(stateStoreId, 
partition.queryId)
 
-// TODO: This does not handle the case of session window aggregation; we 
don't have an
-//  information whether the state store uses prefix scan or not. We will 
have to add such
-//  information to determine the right encoder/decoder for the data.
+val allStateStoreMetadata = new StateMetadataPartitionReader(
+  partition.sourceOptions.stateCheckpointLocation.getParent.toString, 
hadoopConf)
+  .stateMetadata.toArray
+
+val stateStoreMetadata = allStateStoreMetadata.filter { entry =>
+  entry.operatorId == partition.sourceOptions.operatorId &&
+entry.stateStoreName == partition.sourceOptions.storeName
+}
+val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
+  logWarning("Metadata for state store not found, possible cause is this 
checkpoint " +
+"is created by older version of spark. If the query has session window 
aggregation, " +
+"the state can't be read correctly and runtime exception will be 
thrown. " +
+"Run the streaming query in newer spark version to generate state 
metadata " +
+"can fix the issue.")
+  0
+} else {
+  require(stateStoreMetadata.length == 1)
+  stateStoreMetadata.head.numColsPrefixKey
+}
+
 StateStore.getReadOnly(stateStoreProviderId, keySchema, valueSchema,
-  numColsPrefixKey = 0, version = partition.sourceOptions.batchId + 1, 
storeConf = storeConf,
-  hadoopConf = hadoopConf.value)
+  numColsPrefixKey = numColsPrefixKey, version = 
partition.sourceOptions.batchId + 1,
+  storeConf = storeConf, hadoopConf = hadoopCon

(spark) branch master updated: [SPARK-45827][SQL] Fix variant parquet reader

2023-11-15 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f7d56e2827c [SPARK-45827][SQL] Fix variant parquet reader
f7d56e2827c is described below

commit f7d56e2827c4c04c065c0cf04f23084f3f8594ad
Author: Chenhao Li 
AuthorDate: Thu Nov 16 11:23:11 2023 +0800

[SPARK-45827][SQL] Fix variant parquet reader

## What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/43707. The 
previous PR missed a piece in the variant parquet reader: we are treating the 
variant type as `struct`, so it also needs a 
similar `assembleStruct` process in the Parquet reader to correctly set the 
nullness of variant values from def/rep levels.

## How was this patch tested?

Extend the existing unit test. It would fail without the change.

Closes #43825 from chenhao-db/fix_variant_parquet_reader.

Authored-by: Chenhao Li 
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/datasources/parquet/ParquetColumnVector.java | 3 ++-
 sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala| 7 +++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index f00b5b3a88b..5198096fe01 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.MapType;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.VariantType;
 
 /**
  * Contains necessary information representing a Parquet column, either of 
primitive or nested type.
@@ -175,7 +176,7 @@ final class ParquetColumnVector {
 child.assemble();
   }
   assembleCollection();
-} else if (type instanceof StructType) {
+} else if (type instanceof StructType || type instanceof VariantType) {
   for (ParquetColumnVector child : children) {
 child.assemble();
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
index dde986c555b..58e0d7eeef3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/VariantSuite.scala
@@ -73,5 +73,12 @@ class VariantSuite extends QueryTest with SharedSparkSession 
{
   values.map(v => if (v == null) "null" else v.debugString()).sorted
 }
 assert(prepareAnswer(input) == prepareAnswer(result))
+
+withTempDir { dir =>
+  val tempDir = new File(dir, "files").getCanonicalPath
+  df.write.parquet(tempDir)
+  val readResult = 
spark.read.parquet(tempDir).collect().map(_.get(0).asInstanceOf[VariantVal])
+  assert(prepareAnswer(input) == prepareAnswer(readResult))
+}
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated (1a1d3034b1d -> ef93dc24784)

2023-11-15 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 1a1d3034b1d [SPARK-45930][SQL] Support non-deterministic UDFs in 
MapInPandas/MapInArrow
 add ef93dc24784 [MINOR] Fix some typo

No new revisions were added by this update.

Summary of changes:
 binder/postBuild| 4 ++--
 common/utils/src/main/resources/error/error-classes.json| 6 +++---
 .../scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala| 2 +-
 .../scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala | 2 +-
 .../scala/org/apache/spark/sql/jdbc/v2/OracleNamespaceSuite.scala   | 2 +-
 docs/cloud-integration.md   | 2 +-
 docs/sql-error-conditions-unsupported-feature-error-class.md| 2 +-
 .../scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala| 4 ++--
 8 files changed, 12 insertions(+), 12 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45930][SQL] Support non-deterministic UDFs in MapInPandas/MapInArrow

2023-11-15 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1a1d3034b1d [SPARK-45930][SQL] Support non-deterministic UDFs in 
MapInPandas/MapInArrow
1a1d3034b1d is described below

commit 1a1d3034b1d7d3c457ef0b1b5693698c1c5e77d8
Author: allisonwang-db 
AuthorDate: Thu Nov 16 11:50:43 2023 +0900

[SPARK-45930][SQL] Support non-deterministic UDFs in MapInPandas/MapInArrow

### What changes were proposed in this pull request?

This PR supports non-deterministic UDFs in MapInPandas and MapInArrow.

### Why are the changes needed?

Currently, MapInPandas and MapInArrow do not support non-deterministic 
UDFs. The analyzer will fail with this error:
`org.apache.spark.sql.AnalysisException: 
[INVALID_NON_DETERMINISTIC_EXPRESSIONS] The operator expects a deterministic 
expression, but the actual expression is "pyUDF()"`.

This is needed for https://github.com/apache/spark/pull/43791.

### Does this PR introduce _any_ user-facing change?

No. Users cannot directly create a non-deterministic UDF in PySpark to be 
used in MapInPandas/MapInArrow.

### How was this patch tested?

New unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43810 from allisonwang-db/spark-45930-map-in-pandas-non-det.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  2 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala  | 32 ++
 2 files changed, 34 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index d41345f38c2..176a45a6f8e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -746,6 +746,8 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 !o.isInstanceOf[Expand] &&
 !o.isInstanceOf[Generate] &&
 !o.isInstanceOf[CreateVariable] &&
+!o.isInstanceOf[MapInPandas] &&
+!o.isInstanceOf[PythonMapInArrow] &&
 // Lateral join is checked in checkSubqueryExpression.
 !o.isInstanceOf[LateralJoin] =>
 // The rule above is used to check Aggregate operator.
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 8e514e245cb..441b5fb6ca6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -711,6 +711,38 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   Project(Seq(UnresolvedAttribute("temp0.a"), 
UnresolvedAttribute("temp1.a")), join))
   }
 
+  test("SPARK-45930: MapInPandas with non-deterministic UDF") {
+val pythonUdf = PythonUDF("pyUDF", null,
+  StructType(Seq(StructField("a", LongType))),
+  Seq.empty,
+  PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+  false)
+val output = 
DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType])
+val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
+val mapInPandas = MapInPandas(
+  pythonUdf,
+  output,
+  project,
+  false)
+assertAnalysisSuccess(mapInPandas)
+  }
+
+  test("SPARK-45930: MapInArrow with non-deterministic UDF") {
+val pythonUdf = PythonUDF("pyUDF", null,
+  StructType(Seq(StructField("a", LongType))),
+  Seq.empty,
+  PythonEvalType.SQL_MAP_ARROW_ITER_UDF,
+  false)
+val output = 
DataTypeUtils.toAttributes(pythonUdf.dataType.asInstanceOf[StructType])
+val project = Project(Seq(UnresolvedAttribute("a")), testRelation)
+val mapInArrow = PythonMapInArrow(
+  pythonUdf,
+  output,
+  project,
+  false)
+assertAnalysisSuccess(mapInArrow)
+  }
+
   test("SPARK-34741: Avoid ambiguous reference in MergeIntoTable") {
 val cond = $"a" > 1
 assertAnalysisErrorClass(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45931][PYTHON][DOCS] Refine docstring of mapInPandas

2023-11-15 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e795dc12833 [SPARK-45931][PYTHON][DOCS] Refine docstring of mapInPandas
e795dc12833 is described below

commit e795dc12833a3e2cf6996fb1b649c27bb0c85a28
Author: allisonwang-db 
AuthorDate: Thu Nov 16 10:42:03 2023 +0900

[SPARK-45931][PYTHON][DOCS] Refine docstring of mapInPandas

### What changes were proposed in this pull request?

This PR improves the docstring of the dataframe function `mapInPandas`.

### Why are the changes needed?

To improve PySpark documentation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

doctest + manual verification

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43811 from allisonwang-db/spark-45931-refine-mapinpandas.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/pandas/map_ops.py | 34 +-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/pandas/map_ops.py 
b/python/pyspark/sql/pandas/map_ops.py
index 710fc8a9a37..55aa3249530 100644
--- a/python/pyspark/sql/pandas/map_ops.py
+++ b/python/pyspark/sql/pandas/map_ops.py
@@ -67,8 +67,10 @@ class PandasMapOpsMixin:
 
 Examples
 
->>> from pyspark.sql.functions import pandas_udf
 >>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+
+Filter rows with id equal to 1:
+
 >>> def filter_func(iterator):
 ... for pdf in iterator:
 ... yield pdf[pdf.id == 1]
@@ -80,6 +82,36 @@ class PandasMapOpsMixin:
 |  1| 21|
 +---+---+
 
+Compute the mean age for each id:
+
+>>> def mean_age(iterator):
+... for pdf in iterator:
+... yield pdf.groupby("id").mean().reset_index()
+...
+>>> df.mapInPandas(mean_age, "id: bigint, age: double").show()  # 
doctest: +SKIP
++---++
+| id| age|
++---++
+|  1|21.0|
+|  2|30.0|
++---++
+
+Add a new column with the double of the age:
+
+>>> def double_age(iterator):
+... for pdf in iterator:
+... pdf["double_age"] = pdf["age"] * 2
+... yield pdf
+...
+>>> df.mapInPandas(
+... double_age, "id: bigint, age: bigint, double_age: 
bigint").show()  # doctest: +SKIP
++---+---+--+
+| id|age|double_age|
++---+---+--+
+|  1| 21|42|
+|  2| 30|60|
++---+---+--+
+
 Set ``barrier`` to ``True`` to force the ``mapInPandas`` stage running 
in the
 barrier mode, it ensures all Python workers in the stage will be
 launched concurrently.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45936][PS] Optimize `Index.symmetric_difference`

2023-11-15 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2bb36fb271b [SPARK-45936][PS] Optimize `Index.symmetric_difference`
2bb36fb271b is described below

commit 2bb36fb271b60dda68567b92613a3664a7aae2b8
Author: Ruifeng Zheng 
AuthorDate: Thu Nov 16 10:40:05 2023 +0900

[SPARK-45936][PS] Optimize `Index.symmetric_difference`

### What changes were proposed in this pull request?
Add a helper function for `XOR`, and use it to optimize 
`Index.symmetric_difference`

### Why are the changes needed?
the old plan is too complex

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #43816 from zhengruifeng/ps_base_diff.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/indexes/base.py  |  4 ++--
 python/pyspark/pandas/indexes/multi.py | 15 ++-
 python/pyspark/pandas/utils.py | 17 +
 3 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/pandas/indexes/base.py 
b/python/pyspark/pandas/indexes/base.py
index 6c6ee9ae0d7..a515a79dcd7 100644
--- a/python/pyspark/pandas/indexes/base.py
+++ b/python/pyspark/pandas/indexes/base.py
@@ -72,6 +72,7 @@ from pyspark.pandas.utils import (
 validate_index_loc,
 ERROR_MESSAGE_CANNOT_COMBINE,
 log_advice,
+xor,
 )
 from pyspark.pandas.internal import (
 InternalField,
@@ -1468,8 +1469,7 @@ class Index(IndexOpsMixin):
 
 sdf_self = 
self._psdf._internal.spark_frame.select(self._internal.index_spark_columns)
 sdf_other = 
other._psdf._internal.spark_frame.select(other._internal.index_spark_columns)
-
-sdf_symdiff = 
sdf_self.union(sdf_other).subtract(sdf_self.intersect(sdf_other))
+sdf_symdiff = xor(sdf_self, sdf_other)
 
 if sort:
 sdf_symdiff = 
sdf_symdiff.sort(*self._internal.index_spark_column_names)
diff --git a/python/pyspark/pandas/indexes/multi.py 
b/python/pyspark/pandas/indexes/multi.py
index 62b42c1fcd0..7d2712cbb53 100644
--- a/python/pyspark/pandas/indexes/multi.py
+++ b/python/pyspark/pandas/indexes/multi.py
@@ -38,6 +38,7 @@ from pyspark.pandas.utils import (
 scol_for,
 verify_temp_column_name,
 validate_index_loc,
+xor,
 )
 from pyspark.pandas.internal import (
 InternalField,
@@ -809,19 +810,7 @@ class MultiIndex(Index):
 
 sdf_self = 
self._psdf._internal.spark_frame.select(self._internal.index_spark_columns)
 sdf_other = 
other._psdf._internal.spark_frame.select(other._internal.index_spark_columns)
-
-tmp_tag_col = verify_temp_column_name(sdf_self, "__multi_index_tag__")
-tmp_max_col = verify_temp_column_name(sdf_self, 
"__multi_index_max_tag__")
-tmp_min_col = verify_temp_column_name(sdf_self, 
"__multi_index_min_tag__")
-
-sdf_symdiff = (
-sdf_self.withColumn(tmp_tag_col, F.lit(0))
-.union(sdf_other.withColumn(tmp_tag_col, F.lit(1)))
-.groupBy(*self._internal.index_spark_column_names)
-.agg(F.min(tmp_tag_col).alias(tmp_min_col), 
F.max(tmp_tag_col).alias(tmp_max_col))
-.where(F.col(tmp_min_col) == F.col(tmp_max_col))
-.select(*self._internal.index_spark_column_names)
-)
+sdf_symdiff = xor(sdf_self, sdf_other)
 
 if sort:
 sdf_symdiff = 
sdf_symdiff.sort(*self._internal.index_spark_column_names)
diff --git a/python/pyspark/pandas/utils.py b/python/pyspark/pandas/utils.py
index 9f372a53079..57c1ddbe6ae 100644
--- a/python/pyspark/pandas/utils.py
+++ b/python/pyspark/pandas/utils.py
@@ -1033,6 +1033,23 @@ def validate_index_loc(index: "Index", loc: int) -> None:
 )
 
 
+def xor(df1: PySparkDataFrame, df2: PySparkDataFrame) -> PySparkDataFrame:
+colNames = df1.columns
+
+tmp_tag_col = verify_temp_column_name(df1, "__temporary_tag__")
+tmp_max_col = verify_temp_column_name(df1, "__temporary_max_tag__")
+tmp_min_col = verify_temp_column_name(df1, "__temporary_min_tag__")
+
+return (
+df1.withColumn(tmp_tag_col, F.lit(0))
+.union(df2.withColumn(tmp_tag_col, F.lit(1)))
+.groupBy(*colNames)
+.agg(F.min(tmp_tag_col).alias(tmp_min_col), 
F.max(tmp_tag_col).alias(tmp_max_col))
+.where(F.col(tmp_min_col) == F.col(tmp_max_col))
+.select(*colNames)
+)
+
+
 def _test() -> None:
 import os
 import doctest


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-45934][DOCS] Fix `Spark Standalone` documentation table layout

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new a9f95e8203b [SPARK-45934][DOCS] Fix `Spark Standalone` documentation 
table layout
a9f95e8203b is described below

commit a9f95e8203bede86462e681bb7a3e6123b8c00a2
Author: Dongjoon Hyun 
AuthorDate: Wed Nov 15 14:12:36 2023 -0800

[SPARK-45934][DOCS] Fix `Spark Standalone` documentation table layout

This PR fixes `Spark Standalone` documentation table layout.

**BEFORE**
- https://spark.apache.org/docs/3.5.0/spark-standalone.html

**AFTER**
- Spark Standalone
https://github.com/apache/spark/assets/9700541/281ca898-f252-47c2-8cf3-0504bcdcbfb3";>

No.

Manual review.

No.

Closes #43814 from dongjoon-hyun/SPARK-45934.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit e8c2a590f99d8c87968c79960e6b69191f28b420)
Signed-off-by: Dongjoon Hyun 
---
 docs/spark-standalone.md | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 3e87edad0aa..ebda8d897ea 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -254,7 +254,7 @@ SPARK_MASTER_OPTS supports the following system properties:
   0.6.2
 
 
-  spark.worker.resource.{resourceName}.amount
+  spark.worker.resource.{name}.amount
   (none)
   
 Amount of a particular resource to use on the worker.
@@ -262,7 +262,7 @@ SPARK_MASTER_OPTS supports the following system properties:
   3.0.0
 
 
-  spark.worker.resource.{resourceName}.discoveryScript
+  spark.worker.resource.{name}.discoveryScript
   (none)
   
 Path to resource discovery script, which is used to find a particular 
resource while worker starting up.
@@ -275,8 +275,10 @@ SPARK_MASTER_OPTS supports the following system properties:
   (none)
   
 Path to resources file which is used to find various resources while 
worker starting up.
-The content of resources file should be formatted like 
-[{"id":{"componentName": 
"spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}].
+The content of resources file should be formatted like
+[{"id":{"componentName":
+"spark.worker", "resourceName":"gpu"},
+"addresses":["0","1","2"]}].
 If a particular resource is not found in the resources file, the discovery 
script would be used to
 find that resource. If the discovery script also does not find the 
resources, the worker will fail
 to start up.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45934][DOCS] Fix `Spark Standalone` documentation table layout

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e8c2a590f99 [SPARK-45934][DOCS] Fix `Spark Standalone` documentation 
table layout
e8c2a590f99 is described below

commit e8c2a590f99d8c87968c79960e6b69191f28b420
Author: Dongjoon Hyun 
AuthorDate: Wed Nov 15 14:12:36 2023 -0800

[SPARK-45934][DOCS] Fix `Spark Standalone` documentation table layout

### What changes were proposed in this pull request?

This PR fixes `Spark Standalone` documentation table layout.

### Why are the changes needed?

**BEFORE**
- https://spark.apache.org/docs/3.5.0/spark-standalone.html

**AFTER**
- Spark Standalone
https://github.com/apache/spark/assets/9700541/281ca898-f252-47c2-8cf3-0504bcdcbfb3";>

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43814 from dongjoon-hyun/SPARK-45934.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 docs/spark-standalone.md | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index bc13693e280..c96839c6e95 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -299,7 +299,7 @@ SPARK_MASTER_OPTS supports the following system properties:
   0.8.0
 
 
-  spark.worker.resource.{resourceName}.amount
+  spark.worker.resource.{name}.amount
   (none)
   
 Amount of a particular resource to use on the worker.
@@ -307,7 +307,7 @@ SPARK_MASTER_OPTS supports the following system properties:
   3.0.0
 
 
-  spark.worker.resource.{resourceName}.discoveryScript
+  spark.worker.resource.{name}.discoveryScript
   (none)
   
 Path to resource discovery script, which is used to find a particular 
resource while worker starting up.
@@ -320,8 +320,10 @@ SPARK_MASTER_OPTS supports the following system properties:
   (none)
   
 Path to resources file which is used to find various resources while 
worker starting up.
-The content of resources file should be formatted like 
-[{"id":{"componentName": 
"spark.worker","resourceName":"gpu"},"addresses":["0","1","2"]}].
+The content of resources file should be formatted like
+[{"id":{"componentName":
+"spark.worker", "resourceName":"gpu"},
+"addresses":["0","1","2"]}].
 If a particular resource is not found in the resources file, the discovery 
script would be used to
 find that resource. If the discovery script also does not find the 
resources, the worker will fail
 to start up.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-45719][K8S][TESTS] Upgrade AWS SDK to v2 for Kubernetes IT

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f6a59edc79a [SPARK-45719][K8S][TESTS] Upgrade AWS SDK to v2 for 
Kubernetes IT
f6a59edc79a is described below

commit f6a59edc79ab16556093b1f86cbf890717687e1a
Author: Junyu Chen 
AuthorDate: Wed Nov 15 14:09:10 2023 -0800

[SPARK-45719][K8S][TESTS] Upgrade AWS SDK to v2 for Kubernetes IT

### What changes were proposed in this pull request?
As Spark is moving to 4.0, one of the major improvement is to upgrade AWS 
SDK to v2,
as tracked in this parent Jira: 
https://issues.apache.org/jira/browse/SPARK-44124.

Currently, some tests in this module (i.e. DepsTestsSuite) uses S3 client 
which requires
AWS credentials during initialization.

As part of the SDK upgrade, the main purpose of this PR is to upgrading AWS 
SDK to v2
for the Kubernetes integration tests module.

### Why are the changes needed?

As the GA of AWS SDK v2, the SDKv1 has entered maintenance mode where its 
future
release are only limited to address critical bug and security issues. More 
details
about the SDK maintenance policy can be found here: 
https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html.
To keep Spark’s dependent softwares up to date, we should consider 
upgrading the SDK to v2.

### Does this PR introduce _any_ user-facing change?

No because this change only impacts the integration tests codes.

### How was this patch tested?

The existing integration tests in the k8s integration test module passed

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43510 from junyuc25/junyuc25/k8s-test.

Authored-by: Junyu Chen 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml   |  2 +-
 pom.xml|  1 +
 .../kubernetes/integration-tests/pom.xml   |  6 ++--
 .../k8s/integrationtest/DepsTestsSuite.scala   | 35 +++---
 4 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 7c23980d281..25af93af280 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -1065,7 +1065,7 @@ jobs:
   kubectl create clusterrolebinding serviceaccounts-cluster-admin 
--clusterrole=cluster-admin --group=system:serviceaccounts || true
   kubectl apply -f 
https://raw.githubusercontent.com/volcano-sh/volcano/v1.8.1/installer/volcano-development.yaml
 || true
   eval $(minikube docker-env)
-  build/sbt -Psparkr -Pkubernetes -Pvolcano 
-Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 
-Dspark.kubernetes.test.executorRequestCores=0.2 
-Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local 
"kubernetes-integration-tests/test"
+  build/sbt -Phadoop-3 -Psparkr -Pkubernetes -Pvolcano 
-Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 
-Dspark.kubernetes.test.executorRequestCores=0.2 
-Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.exclude.tags=local 
"kubernetes-integration-tests/test"
   - name: Upload Spark on K8S integration tests log files
 if: ${{ !success() }}
 uses: actions/upload-artifact@v3
diff --git a/pom.xml b/pom.xml
index 14754c0bcaa..f4aeb5d935b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -162,6 +162,7 @@
 1.12.0
 
 1.11.655
+2.20.160
 
 0.12.8
 hadoop3-2.2.17
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml 
b/resource-managers/kubernetes/integration-tests/pom.xml
index c5f55c52d0b..518c5bc2170 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -210,9 +210,9 @@
   
   
 
-  com.amazonaws
-  aws-java-sdk-bundle
-  1.11.375
+  software.amazon.awssdk
+  bundle
+  ${aws.java.sdk.v2.version}
   test
 
   
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
index e4650479b2c..f4e23cf839c 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -17,22 +17,25 @@
 package org.apache.spark.deploy.k8s.integrationtest
 
 im

(spark) branch master updated: [SPARK-45810][PYTHON] Create Python UDTF API to stop consuming rows from the input table

2023-11-15 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 36983443112 [SPARK-45810][PYTHON] Create Python UDTF API to stop 
consuming rows from the input table
36983443112 is described below

commit 36983443112799dc2ee4462828e7c0552a63a229
Author: Daniel Tenedorio 
AuthorDate: Wed Nov 15 13:47:04 2023 -0800

[SPARK-45810][PYTHON] Create Python UDTF API to stop consuming rows from 
the input table

### What changes were proposed in this pull request?

This PR creates a Python UDTF API to stop consuming rows from the input 
table.

If the UDTF raises a `SkipRestOfInputTableException` exception in the 
`eval` method, then the UDTF stops consuming rows from the input table for that 
input partition, and finally calls the `terminate` method (if any) to represent 
a successful UDTF call.

For example:

```
udtf(returnType="total: int")
class TestUDTF:
def __init__(self):
self._total = 0

def eval(self, _: Row):
self._total += 1
if self._total >= 3:
raise SkipRestOfInputTableException("Stop at self._total >= 3")

def terminate(self):
yield self._total,
```

### Why are the changes needed?

This is useful when the UDTF logic knows that we don't have to scan the 
input table anymore, and skip the rest of the I/O for that case.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds test coverage.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43682 from dtenedor/udtf-api-stop-consuming-input-rows.

Authored-by: Daniel Tenedorio 
Signed-off-by: Takuya UESHIN 
---
 python/docs/source/user_guide/sql/python_udtf.rst | 38 -
 python/pyspark/sql/functions.py   |  1 +
 python/pyspark/sql/tests/test_udtf.py | 51 +++
 python/pyspark/sql/udtf.py| 19 -
 python/pyspark/worker.py  | 30 ++---
 5 files changed, 123 insertions(+), 16 deletions(-)

diff --git a/python/docs/source/user_guide/sql/python_udtf.rst 
b/python/docs/source/user_guide/sql/python_udtf.rst
index 0e0c6e28578..3e3c7634438 100644
--- a/python/docs/source/user_guide/sql/python_udtf.rst
+++ b/python/docs/source/user_guide/sql/python_udtf.rst
@@ -65,8 +65,8 @@ To implement a Python UDTF, you first need to define a class 
implementing the me
 
 def analyze(self, *args: Any) -> AnalyzeResult:
 """
-Computes the output schema of a particular call to this function 
in response to the
-arguments provided.
+Static method to compute the output schema of a particular call to 
this function in
+response to the arguments provided.
 
 This method is optional and only needed if the registration of the 
UDTF did not provide
 a static output schema to be use for all calls to the function. In 
this context,
@@ -101,12 +101,20 @@ To implement a Python UDTF, you first need to define a 
class implementing the me
 partitionBy: Sequence[PartitioningColumn] = 
field(default_factory=tuple)
 orderBy: Sequence[OrderingColumn] = 
field(default_factory=tuple)
 
+Notes
+-
+- It is possible for the `analyze` method to accept the exact 
arguments expected,
+  mapping 1:1 with the arguments provided to the UDTF call.
+- The `analyze` method can instead choose to accept positional 
arguments if desired
+  (using `*args`) or keyword arguments (using `**kwargs`).
+
 Examples
 
-analyze implementation that returns one output column for each 
word in the input string
-argument.
+This is an `analyze` implementation that returns one output column 
for each word in the
+input string argument.
 
->>> def analyze(self, text: str) -> AnalyzeResult:
+>>> @staticmethod
+... def analyze(text: str) -> AnalyzeResult:
 ... schema = StructType()
 ... for index, word in enumerate(text.split(" ")):
 ... schema = schema.add(f"word_{index}")
@@ -114,7 +122,8 @@ To implement a Python UDTF, you first need to define a 
class implementing the me
 
 Same as above, but using *args to accept the arguments.
 
->>> def analyze(self, *args) -> AnalyzeResult:
+>>> @staticmethod
+... def analyze(*args) -> AnalyzeResult:
 ... assert len(args) == 1, "This function

(spark) branch master updated: [SPARK-45868][CONNECT] Make sure `spark.table` use the same parser with vanilla spark

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d378b35f54b [SPARK-45868][CONNECT] Make sure `spark.table` use the 
same parser with vanilla spark
d378b35f54b is described below

commit d378b35f54b853d91e13e5def8a5bf2c7c06ff32
Author: Ruifeng Zheng 
AuthorDate: Wed Nov 15 13:43:55 2023 -0800

[SPARK-45868][CONNECT] Make sure `spark.table` use the same parser with 
vanilla spark

### What changes were proposed in this pull request?
Make sure spark.table use the same parser with vanilla spark

### Why are the changes needed?
to be consistent with the vanilla spark:


https://github.com/apache/spark/blob/9d93b7112a31965447a34301889f90d14578e628/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L714-L720

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #43741 from zhengruifeng/connect_read_table_parser.

Authored-by: Ruifeng Zheng 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  5 +-
 .../SparkConnectWithSessionExtensionSuite.scala| 82 ++
 2 files changed, 85 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 637ed09798a..d4e5e34c61a 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -50,7 +50,7 @@ import 
org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, ExpressionEncode
 import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.UnboundRowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
-import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, 
ParseException, ParserUtils}
+import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, JoinType, 
LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.{AppendColumns, CoGroup, 
CollectMetrics, CommandResult, Deduplicate, DeduplicateWithinWatermark, 
DeserializeToObject, Except, FlatMapGroupsWithState, Intersect, JoinWith, 
LocalRelation, LogicalGroupState, LogicalPlan, MapGroups, MapPartitions, 
Project, Sample, SerializeFromObject, Sort, SubqueryAlias, TypedFilter, Union, 
Unpivot, UnresolvedHint}
@@ -1227,7 +1227,8 @@ class SparkConnectPlanner(
 rel.getReadTypeCase match {
   case proto.Read.ReadTypeCase.NAMED_TABLE =>
 val multipartIdentifier =
-  
CatalystSqlParser.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier)
+  session.sessionState.sqlParser
+.parseMultipartIdentifier(rel.getNamedTable.getUnparsedIdentifier)
 UnresolvedRelation(
   multipartIdentifier,
   new CaseInsensitiveStringMap(rel.getNamedTable.getOptionsMap),
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectWithSessionExtensionSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectWithSessionExtensionSuite.scala
new file mode 100644
index 000..37c7fe25097
--- /dev/null
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectWithSessionExtensionSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.planner
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.connect.pr

(spark) branch master updated: [SPARK-45941][PS] Upgrade `pandas` to version 2.1.3

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new aa646d30500 [SPARK-45941][PS] Upgrade `pandas` to version 2.1.3
aa646d30500 is described below

commit aa646d3050028272f7333deaef52f20e6975e0ed
Author: Bjørn Jørgensen 
AuthorDate: Wed Nov 15 13:20:27 2023 -0800

[SPARK-45941][PS] Upgrade `pandas` to version 2.1.3

### What changes were proposed in this pull request?
Upgrade pandas from 2.1.2 to 2.1.3

### Why are the changes needed?
Fixed infinite recursion from operations that return a new object on some 
DataFrame subclasses ([GH 
55763](https://github.com/pandas-dev/pandas/issues/55763))
and Fix 
[read_parquet()](https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html#pandas.read_parquet)
 and 
[read_feather()](https://pandas.pydata.org/docs/reference/api/pandas.read_feather.html#pandas.read_feather)
 for [CVE-2023-47248](https://www.cve.org/CVERecord?id=CVE-2023-47248) ([GH 
55894](https://github.com/pandas-dev/pandas/issues/55894))

[Release notes for 
2.1.3](https://pandas.pydata.org/docs/whatsnew/v2.1.3.html)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43822 from bjornjorgensen/pandas-2_1_3.

Authored-by: Bjørn Jørgensen 
Signed-off-by: Dongjoon Hyun 
---
 dev/infra/Dockerfile   | 4 ++--
 python/pyspark/pandas/supported_api_gen.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile
index e6a58cc3fc7..b433faa14c8 100644
--- a/dev/infra/Dockerfile
+++ b/dev/infra/Dockerfile
@@ -84,8 +84,8 @@ RUN Rscript -e "devtools::install_version('roxygen2', 
version='7.2.0', repos='ht
 # See more in SPARK-39735
 ENV R_LIBS_SITE 
"/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
 
-RUN pypy3 -m pip install numpy 'pandas<=2.1.2' scipy coverage matplotlib
-RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.2' scipy 
unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 
'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
+RUN pypy3 -m pip install numpy 'pandas<=2.1.3' scipy coverage matplotlib
+RUN python3.9 -m pip install numpy pyarrow 'pandas<=2.1.3' scipy 
unittest-xml-reporting plotly>=4.8 'mlflow>=2.3.1' coverage matplotlib openpyxl 
'memory-profiler==0.60.0' 'scikit-learn==1.1.*'
 
 # Add Python deps for Spark Connect.
 RUN python3.9 -m pip install 'grpcio>=1.48,<1.57' 'grpcio-status>=1.48,<1.57' 
'protobuf==3.20.3' 'googleapis-common-protos==1.56.4'
diff --git a/python/pyspark/pandas/supported_api_gen.py 
b/python/pyspark/pandas/supported_api_gen.py
index 8d49fef2799..a83731db8fc 100644
--- a/python/pyspark/pandas/supported_api_gen.py
+++ b/python/pyspark/pandas/supported_api_gen.py
@@ -98,7 +98,7 @@ def generate_supported_api(output_rst_file_path: str) -> None:
 
 Write supported APIs documentation.
 """
-pandas_latest_version = "2.1.2"
+pandas_latest_version = "2.1.3"
 if LooseVersion(pd.__version__) != LooseVersion(pandas_latest_version):
 msg = (
 "Warning: Latest version of pandas (%s) is required to generate 
the documentation; "


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.4 updated: [SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with InMemoryTableScanExec

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new b53c170679f [SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE 
with InMemoryTableScanExec
b53c170679f is described below

commit b53c170679f5a9a0214095cd345da6f33e09a239
Author: Maryann Xue 
AuthorDate: Tue Nov 14 08:51:26 2023 -0800

[SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with 
InMemoryTableScanExec

This PR fixes an correctness issue while enabling AQE for SQL Cache. This 
issue was caused by AQE coalescing the top-level shuffle in the physical plan 
of InMemoryTableScan and wrongfully reported the output partitioning of that 
InMemoryTableScan as HashPartitioning as if it had not been coalesced. The 
caller query of that InMemoryTableScan in turn failed to align the partitions 
correctly and output incorrect join results.

The fix addresses the issue by disabling coalescing in InMemoryTableScan 
for shuffles in the final stage. This fix also guarantees that AQE enabled for 
SQL cache vs. disabled would always be a performance win, since AQE 
optimizations are applied to all non-top-level stages and meanwhile no extra 
shuffle would be introduced between the parent query and the cached relation 
(if coalescing in top-level shuffles of InMemoryTableScan was not disabled, an 
extra shuffle would end up being add [...]

To fix correctness issue and to avoid potential AQE perf regressions in 
queries using SQL cache.

No.

Added UTs.

No.

Closes #43760 from maryannxue/spark-45592.

Authored-by: Maryann Xue 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 128f5523194d5241c7b0f08b5be183288128ba16)
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|  9 
 .../apache/spark/sql/execution/CacheManager.scala  |  3 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  8 +++-
 .../org/apache/spark/sql/CachedTableSuite.scala| 52 +++---
 4 files changed, 55 insertions(+), 17 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6578fa16439..951a54a15cb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -646,6 +646,15 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS =
+buildConf("spark.sql.adaptive.applyFinalStageShuffleOptimizations")
+  .internal()
+  .doc("Configures whether adaptive query execution (if enabled) should 
apply shuffle " +
+"coalescing and local shuffle read optimization for the final query 
stage.")
+  .version("3.4.2")
+  .booleanConf
+  .createWithDefault(true)
+
   val ADAPTIVE_EXECUTION_LOG_LEVEL = buildConf("spark.sql.adaptive.logLevel")
 .internal()
 .doc("Configures the log level for adaptive execution logging of plan 
changes. The value " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b1153d7a1e8..4a2fe74b853 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -379,7 +379,8 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
*/
   private def getOrCloneSessionWithConfigsOff(session: SparkSession): 
SparkSession = {
 if (session.conf.get(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING)) {
-  session
+  SparkSession.getOrCloneSessionWithConfigsOff(session,
+SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMIZATIONS :: 
Nil)
 } else {
   SparkSession.getOrCloneSessionWithConfigsOff(session, 
forceDisableConfigs)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 395e5468b64..f859e17d937 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -153,7 +153,13 @@ case class AdaptiveSparkPlanExec(
   )
 
   private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): 
SparkPlan = {
-val optimized = queryStageOptimizerRules.foldLeft(plan) { case 
(latestPlan, rule) =>
+val rules = if (isFinalStage &&
+
!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_APPLY_FINAL_STAGE_SHUFFLE_OPTIMI

(spark) branch branch-3.5 updated: Revert "[SPARK-43393][SQL] Address sequence expression overflow bug"

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new e38310c74e6 Revert "[SPARK-43393][SQL] Address sequence expression 
overflow bug"
e38310c74e6 is described below

commit e38310c74e6cae8c8c8489ffcbceb80ed37a7cae
Author: Dongjoon Hyun 
AuthorDate: Wed Nov 15 09:12:42 2023 -0800

Revert "[SPARK-43393][SQL] Address sequence expression overflow bug"

This reverts commit 41a7a4a3233772003aef380428acd9eaf39b9a93.
---
 .../expressions/collectionOperations.scala | 48 ++-
 .../expressions/CollectionExpressionsSuite.scala   | 56 ++
 2 files changed, 20 insertions(+), 84 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index c3c235fba67..ade4a6c5be7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -22,8 +22,6 @@ import java.util.Comparator
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
-import org.apache.spark.QueryContext
-import org.apache.spark.SparkException.internalError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, 
UnresolvedAttribute, UnresolvedSeed}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -42,6 +40,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SQLOpenHashSet
 import org.apache.spark.unsafe.UTF8StringBuilder
 import org.apache.spark.unsafe.array.ByteArrayMethods
+import org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
 import org.apache.spark.unsafe.types.{ByteArray, CalendarInterval, UTF8String}
 
 /**
@@ -3081,34 +3080,6 @@ case class Sequence(
 }
 
 object Sequence {
-  private def prettyName: String = "sequence"
-
-  def sequenceLength(start: Long, stop: Long, step: Long): Int = {
-try {
-  val delta = Math.subtractExact(stop, start)
-  if (delta == Long.MinValue && step == -1L) {
-// We must special-case division of Long.MinValue by -1 to catch 
potential unchecked
-// overflow in next operation. Division does not have a builtin 
overflow check. We
-// previously special-case div-by-zero.
-throw new ArithmeticException("Long overflow (Long.MinValue / -1)")
-  }
-  val len = if (stop == start) 1L else Math.addExact(1L, (delta / step))
-  if (len > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-throw 
QueryExecutionErrors.createArrayWithElementsExceedLimitError(prettyName, len)
-  }
-  len.toInt
-} catch {
-  // We handle overflows in the previous try block by raising an 
appropriate exception.
-  case _: ArithmeticException =>
-val safeLen =
-  BigInt(1) + (BigInt(stop) - BigInt(start)) / BigInt(step)
-if (safeLen > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-  throw 
QueryExecutionErrors.createArrayWithElementsExceedLimitError(prettyName, 
safeLen)
-}
-throw internalError("Unreachable code reached.")
-  case e: Exception => throw e
-}
-  }
 
   private type LessThanOrEqualFn = (Any, Any) => Boolean
 
@@ -3480,7 +3451,13 @@ object Sequence {
 || (estimatedStep == num.zero && start == stop),
   s"Illegal sequence boundaries: $start to $stop by $step")
 
-sequenceLength(start.toLong, stop.toLong, estimatedStep.toLong)
+val len = if (start == stop) 1L else 1L + (stop.toLong - start.toLong) / 
estimatedStep.toLong
+
+require(
+  len <= MAX_ROUNDED_ARRAY_LENGTH,
+  s"Too long sequence: $len. Should be <= $MAX_ROUNDED_ARRAY_LENGTH")
+
+len.toInt
   }
 
   private def genSequenceLengthCode(
@@ -3490,7 +3467,7 @@ object Sequence {
   step: String,
   estimatedStep: String,
   len: String): String = {
-val calcFn = classOf[Sequence].getName + ".sequenceLength"
+val longLen = ctx.freshName("longLen")
 s"""
|if (!(($estimatedStep > 0 && $start <= $stop) ||
|  ($estimatedStep < 0 && $start >= $stop) ||
@@ -3498,7 +3475,12 @@ object Sequence {
|  throw new IllegalArgumentException(
|"Illegal sequence boundaries: " + $start + " to " + $stop + " by " 
+ $step);
|}
-   |int $len = $calcFn((long) $start, (long) $stop, (long) $estimatedStep);
+   |long $longLen = $stop == $start ? 1L : 1L + ((long) $stop - $start) / 
$estimatedStep;
+   |if ($longLen > $MAX_ROUNDED_ARRAY_LENGTH) {
+   |  throw new IllegalArgumentException(
+   |"Too long sequence: " + $longLen + ". 

(spark) branch master updated: [SPARK-45562][DOCS] Regenerate `docs/sql-error-conditions.md` and add `42KDF` to `SQLSTATE table` in `error/README.md`

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 39fc6108bfa [SPARK-45562][DOCS] Regenerate 
`docs/sql-error-conditions.md` and add `42KDF` to `SQLSTATE table` in 
`error/README.md`
39fc6108bfa is described below

commit 39fc6108bfaaa0ce471f6460880109f948ba5c62
Author: yangjie01 
AuthorDate: Wed Nov 15 09:04:25 2023 -0800

[SPARK-45562][DOCS] Regenerate `docs/sql-error-conditions.md` and add 
`42KDF` to `SQLSTATE table` in `error/README.md`

### What changes were proposed in this pull request?
This pr re-generate `docs/sql-error-conditions.md` and add `42KDF` to 
`SQLSTATE table` in `error/README.md` refer to 
https://docs.gcp.databricks.com/error-messages/sqlstates.html

https://github.com/apache/spark/assets/1475305/6e302ea5-fa61-40a3-99ef-7c6d63334194";>

### Why are the changes needed?
Make GA pass

https://github.com/apache/spark/actions/runs/6874376893/job/18695941494

```
[info] - SQLSTATE invariants *** FAILED *** (30 milliseconds)
[info]   fx.apply(s) was false 42KDF (SparkThrowableSuite.scala:74)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at 
org.apache.spark.SparkThrowableSuite.$anonfun$checkCondition$1(SparkThrowableSuite.scala:74)
[info]   at scala.collection.immutable.List.foreach(List.scala:333)
[info]   at 
org.apache.spark.SparkThrowableSuite.checkCondition(SparkThrowableSuite.scala:73)
[info]   at 
org.apache.spark.SparkThrowableSuite.$anonfun$new$6(SparkThrowableSuite.scala:138)
[info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at 
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)

[info] - Error classes match with document *** FAILED *** (71 milliseconds)
[info]   "...gs-error-class.html)[
[info]
[info]   ### XML_ROW_TAG_MISSING
[info]
[info]   [SQLSTATE: 
42KDF](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
[info]
[info]   `` option is required for reading files in XML format.]" 
did not equal "...gs-error-class.html)[]" The error class document is not up to 
date. Please regenerate it. (SparkThrowableSuite.scala:346)
[info]   Analysis:
[info]   "...gs-error-class.html)[
[info]
[info] ### XML_ROW_TAG_MISSING
[info]
[info] [SQLSTATE: 
42KDF](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
[info]
[info] `` option is required for reading files in XML format.]" -> 
"...gs-error-class.html)[]"
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GitHub Actions
- Manual check `build/sbt   "core/testOnly *SparkThrowableSuite"`

**Before**

```
[info] Run completed in 1 second, 205 milliseconds.
[info] Total number of tests run: 19
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 17, failed 2, canceled 0, ignored 0, pending 0
[info] *** 2 TESTS FAILED ***
[error] Failed tests:
[error] org.apache.spark.SparkThrowableSuite
```
**After**

```
[info] Run completed in 1 second, 131 milliseconds.
[info] Total number of tests run: 19
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 19, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43817 from LuciferYang/SPARK-45562-FOLLOWUP.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 common/utils/src/main/resources/error/README.md | 1 +
 docs/sql-error-conditions.md| 6 ++
 2 files changed, 7 insertions(+)

diff --git a/common/utils/src/main/resources/error/README.md 
b/common/utils/src/main/resources/error/README.md
index 8d8529bea56..7b1b9038aeb 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -895,6 +895,7 @@ The following SQLSTATEs are collated from:
 |42KDC|42   |Syntax error or Access Rule violation |KDC 
|Archived file reference.|Databricks |N 
  |Databricks   
   |
 |42KDD|42   |Syntax error or Access Rule violati

(spark) branch master updated: [SPARK-45905][SQL] Least common type between decimal types should retain integral digits first

2023-11-15 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7120e6b88f23 [SPARK-45905][SQL] Least common type between decimal 
types should retain integral digits first
7120e6b88f23 is described below

commit 7120e6b88f2327ffb71c4bca14b10b15aeb26c32
Author: Wenchen Fan 
AuthorDate: Wed Nov 15 20:39:38 2023 +0800

[SPARK-45905][SQL] Least common type between decimal types should retain 
integral digits first

### What changes were proposed in this pull request?

This is kind of a followup of https://github.com/apache/spark/pull/20023 .

It's simply wrong to cut the decimal precision to 38 if a wider decimal 
type exceeds the max precision, which drops the integral digits and makes the 
decimal value very likely to overflow.

In https://github.com/apache/spark/pull/20023 , we fixed this issue for 
arithmetic operations, but many other operations suffer from the same issue: 
Union, binary comparison, in subquery, create_array, coalesce, etc.

This PR fixes all the remaining operators, without the min scale 
limitation, which should be applied to division and multiple only according to 
the SQL server doc: 
https://learn.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver15

### Why are the changes needed?

To produce reasonable wider decimal type.

### Does this PR introduce _any_ user-facing change?

Yes, the final data type of these operators will be changed if it's decimal 
type and its precision exceeds the max and the scale is not 0.

### How was this patch tested?

updated tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43781 from cloud-fan/decimal.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 docs/sql-ref-ansi-compliance.md|  19 ++
 .../org/apache/spark/sql/types/DecimalType.scala   |  12 ++
 .../sql/catalyst/analysis/DecimalPrecision.scala   |   7 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|  12 +-
 .../catalyst/analysis/AnsiTypeCoercionSuite.scala  |  10 +-
 .../sql/catalyst/analysis/TypeCoercionSuite.scala  |  18 +-
 .../typeCoercion/native/mapZipWith.sql.out |  50 ++---
 .../results/typeCoercion/native/mapZipWith.sql.out |  42 +
 .../approved-plans-v2_7/q36a.sf100/explain.txt |   6 +-
 .../approved-plans-v2_7/q36a/explain.txt   |   6 +-
 .../tpcds-query-results/v2_7/q36a.sql.out  | 202 ++---
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |  21 +--
 12 files changed, 191 insertions(+), 214 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 4729db16d63f..90e65e5ce36e 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -240,6 +240,25 @@ The least common type resolution is used to:
 - Derive the result type for expressions such as the case expression.
 - Derive the element, key, or value types for array and map constructors.
 Special rules are applied if the least common type resolves to FLOAT. With 
float type values, if any of the types is INT, BIGINT, or DECIMAL the least 
common type is pushed to DOUBLE to avoid potential loss of digits.
+
+Decimal type is a bit more complicated here, as it's not a simple type but has 
parameters: precision and scale.
+A `decimal(precision, scale)` means the value can have at most `precision - 
scale` digits in the integral part and `scale` digits in the fractional part.
+A least common type between decimal types should have enough digits in both 
integral and fractional parts to represent all values.
+More precisely, a least common type between `decimal(p1, s1)` and `decimal(p2, 
s2)` has the scale of `max(s1, s2)` and precision of `max(s1, s2) + max(p1 - 
s1, p2 - s2)`.
+However, decimal types in Spark have a maximum precision: 38. If the final 
decimal type need more precision, we must do truncation.
+Since the digits in the integral part are more significant, Spark truncates 
the digits in the fractional part first. For example, `decimal(48, 20)` will be 
reduced to `decimal(38, 10)`.
+
+Note, arithmetic operations have special rules to calculate the least common 
type for decimal inputs:
+
+| Operation  | Result precision | Result scale|
+||--|-|
+| e1 + e2| max(s1, s2) + max(p1 - s1, p2 - s2) + 1  | max(s1, s2) |
+| e1 - e2| max(s1, s2) + max(p1 - s1, p2 - s2) + 1 | max(s1, s2) |
+| e1 * e2| p1 + p2 + 1 | s1 + s2 |
+| e1 / e2| p1 - s1 + s2 + max(6, s1 + p2 + 1)  

(spark) branch master updated: [SPARK-45915][SQL] Treat decimal(x, 0) the same as IntegralType in `PromoteStrings`

2023-11-15 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6c8bd94c5ed5 [SPARK-45915][SQL] Treat decimal(x, 0) the same as 
IntegralType in `PromoteStrings`
6c8bd94c5ed5 is described below

commit 6c8bd94c5ed5aa806fc6cbf66ac83b1641f78694
Author: Yuming Wang 
AuthorDate: Wed Nov 15 02:43:37 2023 -0800

[SPARK-45915][SQL] Treat decimal(x, 0) the same as IntegralType in 
`PromoteStrings`

### What changes were proposed in this pull request?

The common type of decimal(x, 0) and string is double. But the common type 
of int/bigint and string are int/bigint.

This PR updates `PromoteStrings` make the common type of decimal(x, 0) and 
string is decimal(x, 0).

### Why are the changes needed?

1. Make decimal(x, 0) behave the same as int/bigint in `PromoteStrings`.
2. Reduce one cast in binary comparison so we may use bucket read. For 
example: `cast(stringCol as double) = cast(decimalCol as double)` vs 
`cast(stringCol as decimal(x, 0)) = decimalCol`.

### Does this PR introduce _any_ user-facing change?

Yes. The result type of decimal(x, 0) and string is decimal(x, 0) in binary 
comparison.

### How was this patch tested?

Unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43812 from wangyum/SPARK-45915.

Authored-by: Yuming Wang 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/analysis/TypeCoercion.scala |  4 +-
 .../sql/catalyst/analysis/TypeCoercionSuite.scala  |  3 ++
 .../typeCoercion/native/binaryComparison.sql.out   | 48 +--
 .../typeCoercion/native/decimalPrecision.sql.out   | 56 +++---
 .../typeCoercion/native/promoteStrings.sql.out | 28 +--
 5 files changed, 71 insertions(+), 68 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index b34fd8736217..c5e98683c749 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -934,8 +934,8 @@ object TypeCoercion extends TypeCoercionBase {
 // There is no proper decimal type we can pick,
 // using double type is the best we can do.
 // See SPARK-22469 for details.
-case (n: DecimalType, s: StringType) => Some(DoubleType)
-case (s: StringType, n: DecimalType) => Some(DoubleType)
+case (DecimalType.Fixed(_, s), _: StringType) if s > 0 => Some(DoubleType)
+case (_: StringType, DecimalType.Fixed(_, s)) if s > 0 => Some(DoubleType)
 
 case (l: StringType, r: AtomicType) if canPromoteAsInBinaryComparison(r) 
=> Some(r)
 case (l: AtomicType, r: StringType) if canPromoteAsInBinaryComparison(l) 
=> Some(l)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
index 765920a1d00e..db86e7131446 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala
@@ -1611,6 +1611,9 @@ class TypeCoercionSuite extends TypeCoercionSuiteBase {
   GreaterThan(Literal("1.5"), Literal(BigDecimal("0.5"))),
   GreaterThan(Cast(Literal("1.5"), DoubleType), 
Cast(Literal(BigDecimal("0.5")),
 DoubleType)))
+ruleTest(rule,
+  GreaterThan(Literal("1.0"), Literal(BigDecimal("1"))),
+  GreaterThan(Cast(Literal("1.0"), DecimalType(1, 0)), 
Literal(BigDecimal("1"
 // Checks that dates/timestamps are not promoted to strings
 val date0301 = Literal(java.sql.Date.valueOf("2017-03-01"))
 val timestamp030100 = Literal(Timestamp.valueOf("2017-03-01 00:00:00"))
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/typeCoercion/native/binaryComparison.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/typeCoercion/native/binaryComparison.sql.out
index 6df9a4c21a9f..c2dfe61b259d 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/typeCoercion/native/binaryComparison.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/typeCoercion/native/binaryComparison.sql.out
@@ -1330,7 +1330,7 @@ Project [NOT (cast(cast(null as string) as bigint) = 
cast(1 as bigint)) AS (NOT
 -- !query
 SELECT cast(1 as decimal(10, 0)) = '1' FROM t
 -- !query analysis
-Project [(cast(cast(1 as decimal(10,0)) as double) = cast(1 as double)) AS 
(CAST(1 AS DECIMAL(10,0)) = 1)#x]
+Project [(cast(1 as decimal(10,0)) = cast