[spark] branch master updated: [SPARK-43302][SQL][FOLLOWUP] Code cleanup for PythonUDAF

2023-05-16 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fddf25a4dd8 [SPARK-43302][SQL][FOLLOWUP] Code cleanup for PythonUDAF
fddf25a4dd8 is described below

commit fddf25a4dd8029db89287416de39adb27e8643c8
Author: Wenchen Fan 
AuthorDate: Wed May 17 13:39:27 2023 +0800

[SPARK-43302][SQL][FOLLOWUP] Code cleanup for PythonUDAF

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/40739 to do some code cleanup:
1. remove the pattern `PYTHON_UDAF`, as it is not used by any rule.
2. add `PythonFuncExpression.evalType` for convenience: catalyst rules (including third-party extensions) may want to get the eval type of a Python function, whether it is a UDF or a UDAF (see the sketch after this list).
3. update the Python profiler to use `PythonUDAF.resultId` instead of `AggregateExpression.resultId`, to be consistent with `PythonUDF`.
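
As a rough sketch of item 2, a hypothetical third-party catalyst rule could match on the shared parent trait instead of on `PythonUDF`/`PythonUDAF` separately. The rule name and logging are illustrative only, and the snippet assumes `PythonFuncExpression.evalType` is exposed exactly as described above.

```scala
import org.apache.spark.api.python.PythonEvalType
import org.apache.spark.sql.catalyst.expressions.PythonFuncExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical extension rule: it only inspects the plan and logs any pandas
// aggregate UDFs it finds, without caring whether the concrete node is a
// PythonUDF or a PythonUDAF.
object LogPandasAggregates extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    plan.foreach { node =>
      node.expressions.foreach { root =>
        root.foreach {
          case f: PythonFuncExpression
              if f.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF =>
            logInfo(s"Plan contains a Python UDAF: $f")
          case _ => // not a Python function expression, ignore
        }
      }
    }
    plan
  }
}
```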

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #41142 from cloud-fan/follow.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Kent Yao 
---
 python/pyspark/sql/column.py |  4 
 python/pyspark/sql/udf.py| 12 +++-
 .../apache/spark/sql/catalyst/expressions/PythonUDF.scala| 11 ++-
 .../org/apache/spark/sql/catalyst/trees/TreePatterns.scala   |  1 -
 .../sql/execution/python/UserDefinedPythonFunction.scala | 11 +--
 5 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 49a42406048..3cf59989965 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -69,6 +69,10 @@ def _to_java_column(col: "ColumnOrName") -> JavaObject:
 return jcol
 
 
+def _to_java_expr(col: "ColumnOrName") -> JavaObject:
+return _to_java_column(col).expr()
+
+
 def _to_seq(
 sc: SparkContext,
 cols: Iterable["ColumnOrName"],
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 45828187295..87d53266edf 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -30,7 +30,7 @@ from py4j.java_gateway import JavaObject
 from pyspark import SparkContext
 from pyspark.profiler import Profiler
 from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
-from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.column import Column, _to_java_column, _to_java_expr, _to_seq
 from pyspark.sql.types import (
 ArrayType,
 BinaryType,
@@ -419,8 +419,9 @@ class UserDefinedFunction:
 
 func.__signature__ = inspect.signature(f)  # type: 
ignore[attr-defined]
 judf = self._create_judf(func)
-jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
-id = jPythonUDF.expr().resultId().id()
+jUDFExpr = judf.builder(_to_seq(sc, cols, _to_java_expr))
+jPythonUDF = judf.fromUDFExpr(jUDFExpr)
+id = jUDFExpr.resultId().id()
 sc.profiler_collector.add_profiler(id, profiler)
 else:  # memory_profiler_enabled
 f = self.func
@@ -436,8 +437,9 @@ class UserDefinedFunction:
 
 func.__signature__ = inspect.signature(f)  # type: 
ignore[attr-defined]
 judf = self._create_judf(func)
-jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
-id = jPythonUDF.expr().resultId().id()
+jUDFExpr = judf.builder(_to_seq(sc, cols, _to_java_expr))
+jPythonUDF = judf.fromUDFExpr(jUDFExpr)
+id = jUDFExpr.resultId().id()
 sc.profiler_collector.add_profiler(id, memory_profiler)
 else:
 judf = self._judf
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index 08ffbea5510..8636eb61034 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -22,7 +22,7 @@ import org.apache.spark.api.python.{PythonEvalType, 
PythonFunction}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
-import org.apache.spark.sql.catalyst.trees.TreePattern.{PYTHON_UDAF, PYTHON_UDF, TreePattern}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PYTHON_UDF, TreePattern}

[spark] branch master updated (772ef41d6a4 -> 9cb3174a5ef)

2023-05-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 772ef41d6a4 [SPARK-43461][BUILD] Skip compiling useless files when 
making distribution
 add 9cb3174a5ef [SPARK-43532][BUILD][TESTS] Upgrade `jdbc` related test 
dependencies

No new revisions were added by this update.

Summary of changes:
 pom.xml | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)





[spark] branch master updated: [SPARK-43461][BUILD] Skip compiling useless files when making distribution

2023-05-16 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 772ef41d6a4 [SPARK-43461][BUILD] Skip compiling useless files when 
making distribution
772ef41d6a4 is described below

commit 772ef41d6a40ed219bcdf8a51691391fa6eba75d
Author: Yuming Wang 
AuthorDate: Wed May 17 11:33:26 2023 +0800

[SPARK-43461][BUILD] Skip compiling useless files when making distribution

### What changes were proposed in this pull request?

This PR adds more skip properties when making a distribution:
- `-Dmaven.javadoc.skip=true` to skip generating javadoc
- `-Dmaven.scaladoc.skip=true` to skip generating scaladoc. Please see: 
https://davidb.github.io/scala-maven-plugin/doc-jar-mojo.html#skip
- `-Dmaven.source.skip` to skip generating sources.jar
- `-Dcyclonedx.skip=true` to skip making bom. Please see: 
https://cyclonedx.github.io/cyclonedx-maven-plugin/makeBom-mojo.html#skip

### Why are the changes needed?

Reduce time spent on making distribution.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual test:
```sh
./dev/make-distribution.sh --tgz --pip -Phadoop-3 -Phive 
-Phive-thriftserver -Pyarn -Phadoop-provided
```

Before this PR | After this PR
-- | --
43 min total from scheduled to completion | 23 min total from scheduled to completion

Closes #41141 from wangyum/SPARK-43461.

Lead-authored-by: Yuming Wang 
Co-authored-by: Yuming Wang 
Signed-off-by: yangjie01 
---
 dev/make-distribution.sh | 8 +++-
 pom.xml  | 2 ++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index 948ee19fbac..ef7c010e930 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -166,7 +166,13 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xss128m -Xmx4g 
-XX:ReservedCodeCacheSize=128m}
 # Store the command as an array because $MVN variable might have spaces in it.
 # Normal quoting tricks don't work.
 # See: http://mywiki.wooledge.org/BashFAQ/050
-BUILD_COMMAND=("$MVN" clean package -DskipTests $@)
+BUILD_COMMAND=("$MVN" clean package \
+-DskipTests \
+-Dmaven.javadoc.skip=true \
+-Dmaven.scaladoc.skip=true \
+-Dmaven.source.skip \
+-Dcyclonedx.skip=true \
+$@)
 
 # Actually build the jar
 echo -e "\nBuilding with..."
diff --git a/pom.xml b/pom.xml
index 1f6305b77ab..7f37f8f1a3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
   See: SPARK-36547, SPARK-38394.
-->
 4.8.0
+false
 2.15.0
 
 true
@@ -2851,6 +2852,7 @@
 true
 true
 incremental
+${maven.scaladoc.skip}
 
   -unchecked
   -deprecation





[spark] branch master updated (b04e2004558 -> 5b8721b83b6)

2023-05-16 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b04e2004558 [SPARK-43525][BUILD] Import `scala.collection` instead of 
`collection`
 add 5b8721b83b6 [SPARK-43531][CONNECT][PYTHON][TESTS] Enable more parity 
tests for Pandas UDFs

No new revisions were added by this update.

Summary of changes:
 .../connect/test_parity_pandas_cogrouped_map.py|  45 ++
 .../connect/test_parity_pandas_grouped_map.py  |  54 ++-
 .../sql/tests/connect/test_parity_pandas_udf.py|   9 +-
 .../sql/tests/pandas/test_pandas_cogrouped_map.py  |  50 --
 .../sql/tests/pandas/test_pandas_grouped_map.py| 174 -
 python/pyspark/sql/tests/pandas/test_pandas_udf.py |  91 +--
 6 files changed, 210 insertions(+), 213 deletions(-)





[spark] branch master updated: [SPARK-43525][BUILD] Import `scala.collection` instead of `collection`

2023-05-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b04e2004558 [SPARK-43525][BUILD] Import `scala.collection` instead of 
`collection`
b04e2004558 is described below

commit b04e2004558550b39b0901c20b98af5998a7
Author: panbingkun 
AuthorDate: Wed May 17 09:24:10 2023 +0900

[SPARK-43525][BUILD] Import `scala.collection` instead of `collection`

### What changes were proposed in this pull request?
- The PR aims to add a check rule: imports written against `collection` (instead of `scala.collection`) are not allowed.
- Adjust existing imports according to the above rule.

### Why are the changes needed?
I found that some developers write `collection.JavaConverters._` when importing `JavaConverters._`, while others write `scala.collection.JavaConverters._`. Both forms resolve to the same `scala` package, so it is necessary to specify a clearer rule to keep the code style consistent.
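
A minimal illustration of the convention being enforced (the surrounding object is a hypothetical example; only the import style matters):

```scala
// Disallowed after this change: relative import from the `collection` package.
// import collection.JavaConverters._

// Required form: always spell out the full `scala.` prefix.
import scala.collection.JavaConverters._

import java.util.{ArrayList => JArrayList}

object JavaConvertersExample {
  def main(args: Array[String]): Unit = {
    val javaList = new JArrayList[String]()
    javaList.add("spark")
    // .asScala behaves the same either way; the rule is purely about import style.
    println(javaList.asScala.mkString(", "))
  }
}
```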

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #41185 from panbingkun/SPARK-43525.

Authored-by: panbingkun 
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala  | 3 ++-
 .../scala/org/apache/spark/sql/connect/service/SessionHolder.scala | 2 +-
 .../spark/sql/connect/service/AddArtifactsHandlerSuite.scala   | 7 ---
 .../org/apache/spark/streaming/kinesis/KinesisInputDStream.scala   | 2 +-
 .../spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala  | 3 ++-
 core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala | 2 +-
 .../protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala | 2 +-
 .../spark/status/protobuf/ApplicationInfoWrapperSerializer.scala   | 2 +-
 .../spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala   | 2 +-
 .../apache/spark/status/protobuf/JobDataWrapperSerializer.scala| 2 +-
 .../apache/spark/status/protobuf/KVStoreProtobufSerializer.scala   | 2 +-
 .../apache/spark/status/protobuf/StageDataWrapperSerializer.scala  | 3 ++-
 .../test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala| 2 +-
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala| 4 ++--
 scalastyle-config.xml  | 5 +
 .../spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala   | 2 +-
 .../status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala  | 2 +-
 .../src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala   | 3 ++-
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala  | 2 +-
 .../sql/execution/datasources/parquet/ParquetVectorizedSuite.scala | 2 +-
 20 files changed, 32 insertions(+), 22 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index 5db40806d18..506ad3625b0 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -20,7 +20,8 @@ import java.io.InputStream
 import java.nio.file.{Files, Path, Paths}
 import java.util.concurrent.TimeUnit
 
-import collection.JavaConverters._
+import scala.collection.JavaConverters._
+
 import com.google.protobuf.ByteString
 import io.grpc.{ManagedChannel, Server}
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 613d7a38e9e..ca7fa0d42c5 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connect.service
 import java.util.UUID
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 
-import collection.JavaConverters._
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.connect.proto
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
index 4a4e00ad997..9a4c029bb74 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
+++ 

[spark] branch master updated: [SPARK-43528][SQL][PYTHON] Support duplicated field names in createDataFrame with pandas DataFrame

2023-05-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 71bac156b0d [SPARK-43528][SQL][PYTHON] Support duplicated field names 
in createDataFrame with pandas DataFrame
71bac156b0d is described below

commit 71bac156b0df2ce9e56c1c1690806a4b40ffd615
Author: Takuya UESHIN 
AuthorDate: Wed May 17 09:12:16 2023 +0900

[SPARK-43528][SQL][PYTHON] Support duplicated field names in 
createDataFrame with pandas DataFrame

### What changes were proposed in this pull request?

Support duplicated field names in `createDataFrame` with pandas DataFrame.

This now behaves the same with Arrow, without Arrow, and in Spark Connect:

```py
>>> spark.createDataFrame(pdf, schema).show()
+--------+---------------+
|struct_0|       struct_1|
+--------+---------------+
|  {a, 1}|{2, 3, b, 4, c}|
|  {x, 6}|{7, 8, y, 9, z}|
+--------+---------------+
```

### Why are the changes needed?

If there are duplicated field names, `createDataFrame` with a pandas DataFrame falls back to the non-Arrow path, or fails in Spark Connect.

```py
>>> import pandas as pd
>>> from pyspark.sql.types import *
>>>
>>> schema = (
... StructType()
... .add("struct_0", StructType().add("x", StringType()).add("x", 
IntegerType()))
... .add(
... "struct_1",
... StructType()
... .add("a", IntegerType())
... .add("x", IntegerType())
... .add("x", StringType())
... .add("y", IntegerType())
... .add("y", StringType()),
... )
... )
>>>
>>> data = [Row(Row("a", 1), Row(2, 3, "b", 4, "c")), Row(Row("x", 6), 
Row(7, 8, "y", 9, "z"))]
>>> pdf = pd.DataFrame.from_records(data, columns=schema.names)
```

- Without Arrow:

Works fine.

```py
>>> spark.createDataFrame(pdf, schema).show()
+--------+---------------+
|struct_0|       struct_1|
+--------+---------------+
|  {a, 1}|{2, 3, b, 4, c}|
|  {x, 6}|{7, 8, y, 9, z}|
+--------+---------------+
```

- With Arrow:

Works with fallback.

```py
>>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
>>> spark.createDataFrame(pdf, schema).show()
/.../pyspark/sql/pandas/conversion.py:347: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
  [DUPLICATED_FIELD_NAME_IN_ARROW_STRUCT] Duplicated field names in Arrow Struct are not allowed, got [x, x].
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
+--------+---------------+
|struct_0|       struct_1|
+--------+---------------+
|  {a, 1}|{2, 3, b, 4, c}|
|  {x, 6}|{7, 8, y, 9, z}|
+--------+---------------+
```

- Spark Connect

Fails.

```py
>>> spark.createDataFrame(pdf, schema).show()
...
Traceback (most recent call last):
...
pyspark.errors.exceptions.connect.IllegalArgumentException: not all nodes and buffers were consumed. ...
```

### Does this PR introduce _any_ user-facing change?

The duplicated field names will work.

### How was this patch tested?

Added the related test.

Closes #41190 from ueshin/issues/SPARK-43528/from_pandas.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/conversion.py   |  28 +
 python/pyspark/sql/connect/session.py  |   8 +-
 python/pyspark/sql/pandas/conversion.py|  14 ++-
 python/pyspark/sql/pandas/serializers.py   |  88 ---
 python/pyspark/sql/pandas/types.py | 125 +
 .../pyspark/sql/tests/connect/test_parity_arrow.py |   3 +
 python/pyspark/sql/tests/test_arrow.py |  28 +
 .../org/apache/spark/sql/api/r/SQLUtils.scala  |   2 +-
 .../sql/execution/arrow/ArrowConverters.scala  |   8 +-
 .../sql/execution/arrow/ArrowConvertersSuite.scala |   4 +-
 10 files changed, 254 insertions(+), 54 deletions(-)

diff --git a/python/pyspark/sql/connect/conversion.py 
b/python/pyspark/sql/connect/conversion.py
index a7ea88fb007..3cc301c38ea 100644
--- a/python/pyspark/sql/connect/conversion.py
+++ b/python/pyspark/sql/connect/conversion.py
@@ -44,7 +44,7 @@ from pyspark.sql.types import (
 from pyspark.storagelevel import StorageLevel
 from pyspark.sql.connect.types import to_arrow_schema
 import pyspark.sql.connect.proto as pb2
-from pyspark.sql.pandas.types import _dedup_names
+from pyspark.sql.pandas.types import _dedup_names, _deduplicate_field_names
 
 from typing import (
 

[spark] branch master updated (55735055d6c -> a232083f50d)

2023-05-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 55735055d6c [SPARK-43360][SS][CONNECT] Scala client 
StreamingQueryManager
 add a232083f50d [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/catalog.py | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)





[spark] branch branch-3.4 updated: [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark

2023-05-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 7b148f0f7cc [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark
7b148f0f7cc is described below

commit 7b148f0f7ccdecc3d23ecf5ec8bb6818a9a5d3ac
Author: Ruifeng Zheng 
AuthorDate: Wed May 17 08:31:28 2023 +0900

[SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark

### What changes were proposed in this pull request?
Fix `catalog.listCatalogs` in PySpark

### Why are the changes needed?
The existing implementation outputs incorrect results.

### Does this PR introduce _any_ user-facing change?
yes

before this PR:
```
In [1]: spark.catalog.listCatalogs()
Out[1]: [CatalogMetadata(name=, description=)]
```

after this PR:
```
In [1]: spark.catalog.listCatalogs()
Out[1]: [CatalogMetadata(name='spark_catalog', description=None)]
```

### How was this patch tested?
added doctest

Closes #41186 from zhengruifeng/py_list_catalog.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit a232083f50ddfdc81f2027fd3ffa89dfaa3ba199)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/catalog.py | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index ca93a4faec6..17d3fad9c9e 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -126,12 +126,19 @@ class Catalog:
 ---
 list
 A list of :class:`CatalogMetadata`.
+
+Examples
+
+>>> spark.catalog.listCatalogs()
+[CatalogMetadata(name='spark_catalog', description=None)]
 """
 iter = self._jcatalog.listCatalogs().toLocalIterator()
 catalogs = []
 while iter.hasNext():
 jcatalog = iter.next()
-catalogs.append(CatalogMetadata(name=jcatalog.name, 
description=jcatalog.description))
+catalogs.append(
+CatalogMetadata(name=jcatalog.name(), 
description=jcatalog.description())
+)
 return catalogs
 
 def currentDatabase(self) -> str:





[spark] branch master updated (56cfd40e74d -> 55735055d6c)

2023-05-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 56cfd40e74d [SPARK-42958][CONNECT] Refactor 
`connect-jvm-client-mima-check` to support mima check with avro module
 add 55735055d6c [SPARK-43360][SS][CONNECT] Scala client 
StreamingQueryManager

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/SparkSession.scala  |   3 +
 .../spark/sql/streaming/StreamingQuery.scala   |  17 +++
 .../sql/streaming/StreamingQueryManager.scala  | 147 +
 .../CheckConnectJvmClientCompatibility.scala   |   4 +
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  30 +
 .../src/main/protobuf/spark/connect/commands.proto |   2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  40 +++---
 python/pyspark/sql/connect/proto/commands_pb2.py   |  22 +--
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  13 +-
 9 files changed, 241 insertions(+), 37 deletions(-)
 create mode 100644 
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala





[spark] branch master updated: [SPARK-42958][CONNECT] Refactor `connect-jvm-client-mima-check` to support mima check with avro module

2023-05-16 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 56cfd40e74d [SPARK-42958][CONNECT] Refactor 
`connect-jvm-client-mima-check` to support mima check with avro module
56cfd40e74d is described below

commit 56cfd40e74d56362a425c5e5d5d9e7260176
Author: YangJie 
AuthorDate: Tue May 16 15:50:12 2023 -0400

[SPARK-42958][CONNECT] Refactor `connect-jvm-client-mima-check` to support 
mima check with avro module

### What changes were proposed in this pull request?
This PR refactors `connect-jvm-client-mima-check` and `CheckConnectJvmClientCompatibility` to support MiMa checks between the `connect-client-jvm` and `avro` modules.
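
A rough, self-contained sketch of the check that the script drives, based on the MiMaLib calls visible in the diff below; the jar paths are placeholders, and the real checker also applies include/exclude filter rules before reporting:

```scala
import java.io.File

import com.typesafe.tools.mima.core.Problem
import com.typesafe.tools.mima.lib.MiMaLib

// Run MiMa with an "old" jar (sql or avro) against the "new" connect client
// jar and print any binary-compatibility problems that were collected.
object MimaCheckSketch {
  def main(args: Array[String]): Unit = {
    val clientJar = new File("spark-connect-client-jvm.jar") // placeholder path
    val avroJar = new File("spark-avro.jar")                 // placeholder path
    val mima = new MiMaLib(Seq(clientJar, avroJar))
    val problems: List[Problem] = mima.collectProblems(avroJar, clientJar, List.empty)
    problems.foreach(p => println(p.description("client")))
  }
}
```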

### Why are the changes needed?
Enable MiMa checks between the `connect-client-jvm` and `avro` modules.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #40605 from LuciferYang/SPARK-42958.

Lead-authored-by: YangJie 
Co-authored-by: yangjie01 
Signed-off-by: Herman van Hovell 
---
 .../CheckConnectJvmClientCompatibility.scala   | 109 +++--
 dev/connect-jvm-client-mima-check  |  10 +-
 2 files changed, 83 insertions(+), 36 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index f9674ac38cd..ad99342e6e6 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -62,27 +62,25 @@ object CheckConnectJvmClientCompatibility {
   "spark-connect-client-jvm-assembly",
   "spark-connect-client-jvm")
   val sqlJar: File = findJar("sql/core", "spark-sql", "spark-sql")
-  val problems = checkMiMaCompatibility(clientJar, sqlJar)
-  if (problems.nonEmpty) {
-resultWriter.write(s"ERROR: Comparing client jar: $clientJar and and 
sql jar: $sqlJar \n")
-resultWriter.write(s"problems: \n")
-resultWriter.write(s"${problems.map(p => 
p.description("client")).mkString("\n")}")
-resultWriter.write("\n")
-resultWriter.write(
-  "Exceptions to binary compatibility can be added in " +
-"'CheckConnectJvmClientCompatibility#checkMiMaCompatibility'\n")
-  }
+  val problemsWithSqlModule = 
checkMiMaCompatibilityWithSqlModule(clientJar, sqlJar)
+  appendMimaCheckErrorMessageIfNeeded(
+resultWriter,
+problemsWithSqlModule,
+clientJar,
+sqlJar,
+"Sql")
+
+  val avroJar: File = findJar("connector/avro", "spark-avro", "spark-avro")
+  val problemsWithAvroModule = 
checkMiMaCompatibilityWithAvroModule(clientJar, sqlJar)
+  appendMimaCheckErrorMessageIfNeeded(
+resultWriter,
+problemsWithAvroModule,
+clientJar,
+avroJar,
+"Avro")
+
   val incompatibleApis = checkDatasetApiCompatibility(clientJar, sqlJar)
-  if (incompatibleApis.nonEmpty) {
-resultWriter.write(
-  "ERROR: The Dataset apis only exist in the connect client " +
-"module and not belong to the sql module include: \n")
-resultWriter.write(incompatibleApis.mkString("\n"))
-resultWriter.write("\n")
-resultWriter.write(
-  "Exceptions can be added to exceptionMethods in " +
-
"'CheckConnectJvmClientCompatibility#checkDatasetApiCompatibility'\n")
-  }
+  appendIncompatibleDatasetApisErrorMessageIfNeeded(resultWriter, 
incompatibleApis)
 } catch {
   case e: Throwable =>
 println(e.getMessage)
@@ -94,16 +92,17 @@ object CheckConnectJvmClientCompatibility {
 }
   }
 
-  /**
-   * MiMa takes an old jar (sql jar) and a new jar (client jar) as inputs and 
then reports all
-   * incompatibilities found in the new jar. The incompatibility result is 
then filtered using
-   * include and exclude rules. Include rules are first applied to find all 
client classes that
-   * need to be checked. Then exclude rules are applied to filter out all 
unsupported methods in
-   * the client classes.
-   */
-  private def checkMiMaCompatibility(clientJar: File, sqlJar: File): 
List[Problem] = {
-val mima = new MiMaLib(Seq(clientJar, sqlJar))
-val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty)
+  private def checkMiMaCompatibilityWithAvroModule(
+  clientJar: File,
+  avroJar: File): List[Problem] = {
+val includedRules = 

[spark] branch branch-3.4 updated: [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput

2023-05-16 Thread jiangxb1987
This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new f68ece9e607 [SPARK-43043][CORE] Improve the performance of 
MapOutputTracker.updateMapOutput
f68ece9e607 is described below

commit f68ece9e6074cecdaf74ad9b39eae3c7dc2cfaf1
Author: Xingbo Jiang 
AuthorDate: Tue May 16 11:34:30 2023 -0700

[SPARK-43043][CORE] Improve the performance of 
MapOutputTracker.updateMapOutput

### What changes were proposed in this pull request?

The PR changes the implementation of MapOutputTracker.updateMapOutput() to look up the MapStatus with the help of a mapping from mapId to mapIndex. Previously it performed a linear search, which could become a performance bottleneck when a large proportion of the blocks in the map are migrated.
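
A simplified, self-contained sketch of the idea, with hypothetical names (Spark's real implementation lives in `ShuffleStatus`, uses its internal `OpenHashMap[Long, Int]`, and guards these structures with read/write locks):

```scala
import scala.collection.mutable

// Keep an auxiliary mapId -> mapIndex map next to the status array so that
// location updates can do a constant-time lookup instead of scanning.
final case class Status(mapId: Long, var location: String)

final class StatusRegistry(numPartitions: Int) {
  private val statuses = new Array[Status](numPartitions)
  private val mapIdToMapIndex = mutable.HashMap.empty[Long, Int]

  def register(mapIndex: Int, status: Status): Unit = {
    statuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex // keep the index in sync
  }

  // Before: statuses.find(s => s != null && s.mapId == mapId)  -- linear scan.
  // After: O(1) lookup through the auxiliary index.
  def getStatus(mapId: Long): Option[Status] =
    mapIdToMapIndex.get(mapId).map(statuses(_)).flatMap(Option(_))

  def updateLocation(mapId: Long, newLocation: String): Unit =
    getStatus(mapId).foreach(_.location = newLocation)
}

object StatusRegistryDemo extends App {
  val registry = new StatusRegistry(2)
  registry.register(0, Status(mapId = 42L, location = "host-a"))
  registry.updateLocation(42L, "host-b")
  println(registry.getStatus(42L)) // Some(Status(42,host-b))
}
```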

### Why are the changes needed?

To avoid a performance bottleneck when block decommissioning is enabled and many blocks are migrated within a short time window.

### Does this PR introduce _any_ user-facing change?

No, it's pure performance improvement.

### How was this patch tested?

Manually test.

Closes #40690 from jiangxb1987/SPARK-43043.

Lead-authored-by: Xingbo Jiang 
Co-authored-by: Jiang Xingbo 
Signed-off-by: Xingbo Jiang 
(cherry picked from commit 66a2eb8f8957c22c69519b39be59beaaf931822b)
Signed-off-by: Xingbo Jiang 
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 26 +-
 .../apache/spark/util/collection/OpenHashMap.scala | 18 +++
 .../spark/util/collection/OpenHashMapSuite.scala   | 18 +++
 3 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index fade0b86dd8..2dd3a903ee2 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -42,6 +42,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, 
ShuffleOutputStatus}
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 /**
@@ -147,6 +148,12 @@ private class ShuffleStatus(
 
   private[this] var shufflePushMergerLocations: Seq[BlockManagerId] = Seq.empty
 
+  /**
+   * Mapping from a mapId to the mapIndex, this is required to reduce the 
searching overhead within
+   * the function updateMapOutput(mapId, bmAddress).
+   */
+  private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]()
+
   /**
* Register a map output. If there is already a registered location for the 
map output then it
* will be replaced by the new location.
@@ -157,6 +164,14 @@ private class ShuffleStatus(
   invalidateSerializedMapOutputStatusCache()
 }
 mapStatuses(mapIndex) = status
+mapIdToMapIndex(status.mapId) = mapIndex
+  }
+
+  /**
+   * Get the map output that corresponding to a given mapId.
+   */
+  def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock {
+mapIdToMapIndex.get(mapId).map(mapStatuses(_))
   }
 
   /**
@@ -164,15 +179,16 @@ private class ShuffleStatus(
*/
   def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = 
withWriteLock {
 try {
-  val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
+  val mapIndex = mapIdToMapIndex.get(mapId)
+  val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
   mapStatusOpt match {
 case Some(mapStatus) =>
   logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
   mapStatus.updateLocation(bmAddress)
   invalidateSerializedMapOutputStatusCache()
 case None =>
-  val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId 
== mapId)
-  if (index >= 0 && mapStatuses(index) == null) {
+  if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) {
+val index = mapIndex.get
 val mapStatus = mapStatusesDeleted(index)
 mapStatus.updateLocation(bmAddress)
 mapStatuses(index) = mapStatus
@@ -1133,9 +1149,7 @@ private[spark] class MapOutputTrackerMaster(
*/
   def getMapOutputLocation(shuffleId: Int, mapId: Long): 
Option[BlockManagerId] = {
 shuffleStatuses.get(shuffleId).flatMap { shuffleStatus =>
-  shuffleStatus.withMapStatuses { mapStatues =>
-mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location)
-  }
+  shuffleStatus.getMapStatus(mapId).map(_.location)
 }
   }
 

[spark] branch master updated: [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput

2023-05-16 Thread jiangxb1987
This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 66a2eb8f895 [SPARK-43043][CORE] Improve the performance of 
MapOutputTracker.updateMapOutput
66a2eb8f895 is described below

commit 66a2eb8f8957c22c69519b39be59beaaf931822b
Author: Xingbo Jiang 
AuthorDate: Tue May 16 11:34:30 2023 -0700

[SPARK-43043][CORE] Improve the performance of 
MapOutputTracker.updateMapOutput

### What changes were proposed in this pull request?

The PR changes the implementation of MapOutputTracker.updateMapOutput() to look up the MapStatus with the help of a mapping from mapId to mapIndex. Previously it performed a linear search, which could become a performance bottleneck when a large proportion of the blocks in the map are migrated.

### Why are the changes needed?

To avoid a performance bottleneck when block decommissioning is enabled and many blocks are migrated within a short time window.

### Does this PR introduce _any_ user-facing change?

No, it's pure performance improvement.

### How was this patch tested?

Manually test.

Closes #40690 from jiangxb1987/SPARK-43043.

Lead-authored-by: Xingbo Jiang 
Co-authored-by: Jiang Xingbo 
Signed-off-by: Xingbo Jiang 
---
 .../scala/org/apache/spark/MapOutputTracker.scala  | 26 +-
 .../apache/spark/util/collection/OpenHashMap.scala | 18 +++
 .../spark/util/collection/OpenHashMapSuite.scala   | 18 +++
 3 files changed, 56 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 5ad62159d24..9a5cf1da9e4 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -42,6 +42,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, 
ShuffleOutputStatus}
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, 
ShuffleMergedBlockId}
 import org.apache.spark.util._
+import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.io.{ChunkedByteBuffer, 
ChunkedByteBufferOutputStream}
 
 /**
@@ -147,6 +148,12 @@ private class ShuffleStatus(
 
   private[this] var shufflePushMergerLocations: Seq[BlockManagerId] = Seq.empty
 
+  /**
+   * Mapping from a mapId to the mapIndex, this is required to reduce the 
searching overhead within
+   * the function updateMapOutput(mapId, bmAddress).
+   */
+  private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]()
+
   /**
* Register a map output. If there is already a registered location for the 
map output then it
* will be replaced by the new location.
@@ -157,6 +164,14 @@ private class ShuffleStatus(
   invalidateSerializedMapOutputStatusCache()
 }
 mapStatuses(mapIndex) = status
+mapIdToMapIndex(status.mapId) = mapIndex
+  }
+
+  /**
+   * Get the map output that corresponding to a given mapId.
+   */
+  def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock {
+mapIdToMapIndex.get(mapId).map(mapStatuses(_))
   }
 
   /**
@@ -164,15 +179,16 @@ private class ShuffleStatus(
*/
   def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = 
withWriteLock {
 try {
-  val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId)
+  val mapIndex = mapIdToMapIndex.get(mapId)
+  val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_))
   mapStatusOpt match {
 case Some(mapStatus) =>
   logInfo(s"Updating map output for ${mapId} to ${bmAddress}")
   mapStatus.updateLocation(bmAddress)
   invalidateSerializedMapOutputStatusCache()
 case None =>
-  val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId 
== mapId)
-  if (index >= 0 && mapStatuses(index) == null) {
+  if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) {
+val index = mapIndex.get
 val mapStatus = mapStatusesDeleted(index)
 mapStatus.updateLocation(bmAddress)
 mapStatuses(index) = mapStatus
@@ -1137,9 +1153,7 @@ private[spark] class MapOutputTrackerMaster(
*/
   def getMapOutputLocation(shuffleId: Int, mapId: Long): 
Option[BlockManagerId] = {
 shuffleStatuses.get(shuffleId).flatMap { shuffleStatus =>
-  shuffleStatus.withMapStatuses { mapStatues =>
-mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location)
-  }
+  shuffleStatus.getMapStatus(mapId).map(_.location)
 }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala 

[spark] branch master updated: [SPARK-43359][SQL] Delete from Hive table should throw "UNSUPPORTED_FEATURE.TABLE_OPERATION"

2023-05-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 530dea61c88 [SPARK-43359][SQL] Delete from Hive table should throw 
"UNSUPPORTED_FEATURE.TABLE_OPERATION"
530dea61c88 is described below

commit 530dea61c88780c82b2e3e624fbe2a405e602731
Author: panbingkun 
AuthorDate: Tue May 16 11:21:42 2023 -0700

[SPARK-43359][SQL] Delete from Hive table should throw 
"UNSUPPORTED_FEATURE.TABLE_OPERATION"

### What changes were proposed in this pull request?
The PR aims to fix the exception thrown for 'DELETE FROM' on a Hive table.

### Why are the changes needed?
Proper names of error classes should improve user experience with Spark SQL.

**BEFORE**
```
scala> sql("delete from t")
org.apache.spark.SparkException: [INTERNAL_ERROR] Unexpected table relation: HiveTableRelation [`spark_catalog`.`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#0], Partition Cols: []]
```

**AFTER**
```
scala> sql("delete from t")
org.apache.spark.sql.AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `spark_catalog`.`default`.`t` does not support DELETE. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA & Add new UT.

Closes #41172 from panbingkun/SPARK-43359.

Authored-by: panbingkun 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/sql/hive/HiveStrategies.scala |  7 ++-
 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala   | 16 
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6c5646a2416..b2438d38520 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, 
InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, 
Statistics, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution._
@@ -178,6 +178,11 @@ object HiveAnalysis extends Rule[LogicalPlan] {
   if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
 
   InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, 
child.output.map(_.name))
+
+case DeleteFromTable(SubqueryAlias(_, HiveTableRelation(table, _, _, _, 
_)), _) =>
+  throw QueryCompilationErrors.unsupportedTableOperationError(
+table.identifier,
+"DELETE")
   }
 }
 
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f17ec922b9b..6fd17ed5d9e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -3098,4 +3098,20 @@ class HiveDDLSuite
   "CREATE TABLE tab (c1 int) PARTITIONED BY (c1) STORED AS PARQUET",
   "Cannot use all columns for partition columns")
   }
+
+  test("SPARK-43359: Delete table not allowed") {
+val tbl = "t1"
+withTable(tbl) {
+  sql(s"CREATE TABLE $tbl(c1 INT)")
+  val e = intercept[AnalysisException] {
+sql(s"DELETE FROM $tbl WHERE c1 = 1")
+  }
+  checkError(e,
+errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+parameters = Map(
+  "tableName" -> s"`spark_catalog`.`default`.`$tbl`",
+  "operation" -> "DELETE")
+  )
+}
+  }
 }





[spark] branch master updated (6f377efd3f3 -> 60aa1127c51)

2023-05-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6f377efd3f3 [SPARK-38469][CORE] Use error class in 
org.apache.spark.network
 add 60aa1127c51 [SPARK-43520][BUILD][TESTS] Upgrade `mysql-connector-java` 
to 8.0.33

No new revisions were added by this update.

Summary of changes:
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-38469][CORE] Use error class in org.apache.spark.network

2023-05-16 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f377efd3f3 [SPARK-38469][CORE] Use error class in 
org.apache.spark.network
6f377efd3f3 is described below

commit 6f377efd3f3b8db1909349a7c134929a2ec0bf60
Author: Bo Zhang 
AuthorDate: Tue May 16 19:13:20 2023 +0300

[SPARK-38469][CORE] Use error class in org.apache.spark.network

### What changes were proposed in this pull request?
This PR aims to change exceptions created in the package org.apache.spark.network to use error classes. It also adds an error class INTERNAL_ERROR_NETWORK and uses it for the internal errors in the package.
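
A minimal sketch of how the new error class surfaces, assuming `SparkException.internalError(msg, category = "NETWORK")` tags the exception with `INTERNAL_ERROR_NETWORK` as this PR describes (the block-id string is a placeholder):

```scala
import org.apache.spark.SparkException

object InternalNetworkErrorExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical call site, mirroring what NettyBlockRpcServer now does.
    val e = SparkException.internalError(
      "Upload block for shuffle_0_0_0 failed.", category = "NETWORK")
    // With this PR, the category is reflected in the error class.
    println(e.getErrorClass) // expected: INTERNAL_ERROR_NETWORK
  }
}
```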

### Why are the changes needed?
This is to move exceptions created in the package org.apache.spark.network to error classes.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #41140 from bozhang2820/spark-38469.

Lead-authored-by: Bo Zhang 
Co-authored-by: Bo Zhang 
Signed-off-by: Max Gekk 
---
 .../main/scala/org/apache/spark/SparkException.scala  |  3 ++-
 core/src/main/resources/error/error-classes.json  |  6 ++
 .../spark/network/netty/NettyBlockRpcServer.scala | 19 ---
 .../network/netty/NettyBlockTransferService.scala |  2 +-
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala 
b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 4abf0fdf498..feb7bf5b66e 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -118,7 +118,8 @@ private[spark] case class SparkUserAppException(exitCode: 
Int)
  * Exception thrown when the relative executor to access is dead.
  */
 private[spark] case class ExecutorDeadException(message: String)
-  extends SparkException(message)
+  extends SparkException(errorClass = "INTERNAL_ERROR_NETWORK",
+messageParameters = Map("message" -> message), cause = null)
 
 /**
  * Exception thrown when Spark returns different result after upgrading to a 
new version.
diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index edc5a5a66e5..24f972a5006 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -830,6 +830,12 @@
 ],
 "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_NETWORK" : {
+"message" : [
+  ""
+],
+"sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
 "message" : [
   "."
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
index f2a1fe49fcf..16ad848a326 100644
--- 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.BlockDataManager
 import org.apache.spark.network.buffer.NioManagedBuffer
@@ -93,8 +94,8 @@ class NettyBlockRpcServer(
   } else {
 val startAndEndId = fetchShuffleBlocks.reduceIds(index)
 if (startAndEndId.length != 2) {
-  throw new IllegalStateException(s"Invalid shuffle fetch request 
when batch mode " +
-s"is enabled: $fetchShuffleBlocks")
+  throw SparkException.internalError("Invalid shuffle fetch 
request when batch mode " +
+s"is enabled: $fetchShuffleBlocks", category = "NETWORK")
 }
 Array(blockManager.getLocalBlockData(
   ShuffleBlockBatchId(
@@ -125,8 +126,10 @@ class NettyBlockRpcServer(
 if (blockStored) {
   responseContext.onSuccess(ByteBuffer.allocate(0))
 } else {
-  val exception = new Exception(s"Upload block for $blockId failed. 
This mostly happens " +
-s"when there is not sufficient space available to store the 
block.")
+  val exception = SparkException.internalError(
+s"Upload block for $blockId failed. This mostly happens " +
+"when there is not sufficient space available to store the block.",
+category = "NETWORK")
   responseContext.onFailure(exception)
 }
 
@@ -137,13 +140,15 @@ class NettyBlockRpcServer(
   val errorMsg = "Invalid GetLocalDirsForExecutors request: " +
 s"${if (isIncorrectAppId) s"incorrect application id: 

[spark] branch master updated: [SPARK-43512][SS][TESTS] Update StateStoreOperationsBenchmark to reflect updates to RocksDB usage as state store provider

2023-05-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 28a2a2e08ee [SPARK-43512][SS][TESTS] Update 
StateStoreOperationsBenchmark to reflect updates to RocksDB usage as state 
store provider
28a2a2e08ee is described below

commit 28a2a2e08ee90f6ca633fb95a7c44721e331d17b
Author: Anish Shrigondekar 
AuthorDate: Tue May 16 08:55:58 2023 -0700

[SPARK-43512][SS][TESTS] Update StateStoreOperationsBenchmark to reflect 
updates to RocksDB usage as state store provider

### What changes were proposed in this pull request?
Update StateStoreOperationsBenchmark to reflect updates to RocksDB usage as 
state store provider

### Why are the changes needed?
Need the changes to unblock RocksDB JNI upgrade

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Ran it locally and via GitHub Actions. The run now takes ~40-50 minutes.

Closes #41175 from anishshri-db/task/SPARK-43512.

Authored-by: Anish Shrigondekar 
Signed-off-by: Dongjoon Hyun 
---
 ...StoreBasicOperationsBenchmark-jdk11-results.txt | 169 +
 ...StoreBasicOperationsBenchmark-jdk17-results.txt | 199 +++--
 .../StateStoreBasicOperationsBenchmark-results.txt | 199 +++--
 .../StateStoreBasicOperationsBenchmark.scala   |  12 +-
 4 files changed, 180 insertions(+), 399 deletions(-)

diff --git 
a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt 
b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt
index f75157f59ba..5ba8d824b71 100644
--- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt
+++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt
@@ -2,182 +2,109 @@
 put rows
 ================================================================================

-OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1035-azure
+OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
 Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 putting 10000 rows (10000 rows to overwrite - rate 100):  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ----------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                             9             11           1        1.2        869.5       1.0X
-RocksDB (trackTotalNumberOfRows: true)                               55             59           1        0.2       5544.5       0.2X
-RocksDB (trackTotalNumberOfRows: false)                              15             17           1        0.7       1466.2       0.6X
+In-memory                                                             9             10           1        1.1        894.3       1.0X
+RocksDB (trackTotalNumberOfRows: true)                               70             75           3        0.1       6964.6       0.1X
+RocksDB (trackTotalNumberOfRows: false)                              23             26           1        0.4       2342.7       0.4X

-OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1035-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
-putting 10000 rows (7500 rows to overwrite - rate 75):  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                           9             10           1        1.2        851.0       1.0X
-RocksDB (trackTotalNumberOfRows: true)                             52             55           1        0.2       5168.1       0.2X
-RocksDB (trackTotalNumberOfRows: false)                            15             17           1        0.7       1521.9       0.6X
-
-OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1035-azure
+OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure
 Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
 putting 10000 rows (5000 rows to overwrite - rate 50):  Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
 ----------------------------------------------------------------------------------------------------------------------------------------
-In-memory                                                           9             10           1        1.2        864.1       1.0X
-RocksDB (trackTotalNumberOfRows: true)                             46             49           1        0.2       4568.5       0.2X
-RocksDB (trackTotalNumberOfRows: false)                            16             17           1        0.6

[spark] branch master updated: [SPARK-42604][CONNECT][FOLLOWUP] Remove `typedlit/typedLit` `ProblemFilters.exclude` rule from mima check

2023-05-16 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 606193301bf [SPARK-42604][CONNECT][FOLLOWUP] Remove 
`typedlit/typedLit` `ProblemFilters.exclude` rule from mima check
606193301bf is described below

commit 606193301bf361fc6a2b763fab074d766aeb52a4
Author: yangjie01 
AuthorDate: Tue May 16 21:34:15 2023 +0800

[SPARK-42604][CONNECT][FOLLOWUP] Remove `typedlit/typedLit` 
`ProblemFilters.exclude` rule from mima check

### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/40355 implemented `functions#typedlit` and `functions#typedLit`, so this PR removes the corresponding `ProblemFilters.exclude` rules from `CheckConnectJvmClientCompatibility`.

### Why are the changes needed?
Remove the now-unnecessary `ProblemFilters.exclude` rules.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually verified that `dev/connect-jvm-client-mima-check` passes.

Closes #41183 from LuciferYang/SPARK-42604-FOLLOWUP.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .../spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala   | 2 --
 1 file changed, 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 32a44c350d9..f9674ac38cd 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -177,8 +177,6 @@ object CheckConnectJvmClientCompatibility {
   
ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.callUDF"),
   
ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"),
   ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"),
-  
ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedlit"),
-  
ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedLit"),
 
   // KeyValueGroupedDataset
   ProblemFilters.exclude[Problem](





[spark] branch master updated: [SPARK-39281][SQL] Speed up Timestamp type inference with legacy format in JSON/CSV data source

2023-05-16 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3192bbd2958 [SPARK-39281][SQL] Speed up Timestamp type inference with 
legacy format  in JSON/CSV data source
3192bbd2958 is described below

commit 3192bbd29585607d43d0819c6c2d3ac00180261a
Author: Jia Fan 
AuthorDate: Tue May 16 15:59:01 2023 +0300

[SPARK-39281][SQL] Speed up Timestamp type inference with legacy format  in 
JSON/CSV data source

### What changes were proposed in this pull request?
Follow-up to https://github.com/apache/spark/pull/36562: a performance improvement for Timestamp type inference with the legacy format.

In the current implementation of the CSV/JSON data sources, schema inference with the legacy format relies on methods that throw exceptions if the fields cannot be converted to certain data types.

Throwing and catching exceptions can be slow. We can improve it by creating 
methods that return optional results instead.

The optimization of DefaultTimestampFormatter was implemented in https://github.com/apache/spark/pull/36562; this PR adds the same optimization for the legacy formatters. The basic idea is to keep the formatter from throwing exceptions and instead return an optional result that indicates whether parsing succeeded (see the sketch below).
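
A simplified sketch of the pattern (not Spark's actual `TimestampFormatter` code), assuming a legacy `SimpleDateFormat`-based parser like the one touched in this PR:

```scala
import java.text.{ParsePosition, SimpleDateFormat}
import java.util.Date

// Schema inference probes many candidate formats, so instead of parse()
// throwing on every mismatch, parseOptional() returns None and the hot path
// never touches exception handling.
class LegacyParser(pattern: String) {
  private val sdf = new SimpleDateFormat(pattern)

  def parse(s: String): Date = sdf.parse(s) // throws ParseException on mismatch

  def parseOptional(s: String): Option[Date] = {
    // SimpleDateFormat.parse(String, ParsePosition) returns null instead of
    // throwing, which is what makes the exception-free path possible.
    Option(sdf.parse(s, new ParsePosition(0)))
  }
}

object LegacyParserDemo extends App {
  val parser = new LegacyParser("yyyy-MM-dd HH:mm:ss")
  println(parser.parseOptional("2023-05-16 12:00:00")) // Some(...)
  println(parser.parseOptional("not a timestamp"))     // None, no exception thrown
}
```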

### Why are the changes needed?

Performance improvement when Timestamp type inference with legacy format.

With the JSON data source, the speedup is about 67%. The CSV data source also speeds up, though less noticeably.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add new test

Closes #41091 from Hisoka-X/SPARK-39281_legacy_format.

Lead-authored-by: Jia Fan 
Co-authored-by: Hisoka 
Signed-off-by: Max Gekk 
---
 .../sql/catalyst/util/TimestampFormatter.scala |  22 
 .../catalyst/util/TimestampFormatterSuite.scala|  19 
 sql/core/benchmarks/CSVBenchmark-jdk11-results.txt |  82 +++---
 sql/core/benchmarks/CSVBenchmark-jdk17-results.txt |  82 +++---
 sql/core/benchmarks/CSVBenchmark-results.txt   |  82 +++---
 .../benchmarks/JsonBenchmark-jdk11-results.txt |  98 -
 .../benchmarks/JsonBenchmark-jdk17-results.txt | 122 ++---
 sql/core/benchmarks/JsonBenchmark-results.txt  | 122 ++---
 8 files changed, 335 insertions(+), 294 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 2a8283bde1d..aab90ec3844 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -407,6 +407,19 @@ class LegacyFastTimestampFormatter(
 if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
   throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
 }
+extractMicros(cal)
+  }
+
+  override def parseOptional(s: String): Option[Long] = {
+cal.clear() // Clear the calendar because it can be re-used many times
+if (fastDateFormat.parse(s, new ParsePosition(0), cal)) {
+  Some(extractMicros(cal))
+} else {
+  None
+}
+  }
+
+  private def extractMicros(cal: MicrosCalendar): Long = {
 val micros = cal.getMicros()
 cal.set(Calendar.MILLISECOND, 0)
 val julianMicros = Math.addExact(millisToMicros(cal.getTimeInMillis), 
micros)
@@ -451,6 +464,15 @@ class LegacySimpleTimestampFormatter(
 fromJavaTimestamp(new Timestamp(sdf.parse(s).getTime))
   }
 
+  override def parseOptional(s: String): Option[Long] = {
+val date = sdf.parse(s, new ParsePosition(0))
+if (date == null) {
+  None
+} else {
+  Some(fromJavaTimestamp(new Timestamp(date.getTime)))
+}
+  }
+
   override def format(us: Long): String = {
 sdf.format(toJavaTimestamp(us))
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
index 10553d421ea..8f6099e96ef 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
@@ -489,4 +489,23 @@ class TimestampFormatterSuite extends 
DatetimeFormatterSuite {
 assert(formatter.parseWithoutTimeZoneOptional("2012-00-65 23:59:59.9990", 
false)
   .isEmpty)
   }
+
+  test("SPARK-39281: support returning optional parse results in the legacy 
formatter") {
+val fastFormatter = new LegacyFastTimestampFormatter(
+  "-MM-dd 

[spark] branch master updated: [SPARK-43518][SQL] Convert `_LEGACY_ERROR_TEMP_2029` to INTERNAL_ERROR

2023-05-16 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dbadb5f275c [SPARK-43518][SQL] Convert `_LEGACY_ERROR_TEMP_2029` to 
INTERNAL_ERROR
dbadb5f275c is described below

commit dbadb5f275cf0519b8b1ed78decfe4ce83934825
Author: panbingkun 
AuthorDate: Tue May 16 12:47:15 2023 +0300

[SPARK-43518][SQL] Convert `_LEGACY_ERROR_TEMP_2029` to INTERNAL_ERROR

### What changes were proposed in this pull request?
This PR aims to convert `_LEGACY_ERROR_TEMP_2029` to `INTERNAL_ERROR`.

### Why are the changes needed?
1. This error can only be triggered with the rounding modes UP, DOWN, HALF_DOWN, or UNNECESSARY, which users cannot actually reach (the internal code limits the mode to HALF_UP, HALF_EVEN, CEILING, or FLOOR), so it should be converted to an internal error.
2. The changes improve the error framework.
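
As a minimal sketch of this distinction (plain Python, not Spark's error framework; `change_precision` and `_SUPPORTED` are invented names for this example): modes the engine actually passes are handled, while any other mode is treated as a broken internal invariant rather than bad user input.

```python
from decimal import Decimal, ROUND_CEILING, ROUND_FLOOR, ROUND_HALF_EVEN, ROUND_HALF_UP

# The only rounding modes the engine is expected to pass to this helper.
_SUPPORTED = {
    "HALF_UP": ROUND_HALF_UP,
    "HALF_EVEN": ROUND_HALF_EVEN,
    "CEILING": ROUND_CEILING,
    "FLOOR": ROUND_FLOOR,
}


def change_precision(value: str, scale: int, mode: str) -> Decimal:
    rounding = _SUPPORTED.get(mode)
    if rounding is None:
        # Reaching this branch means engine code passed a mode it promised not
        # to: a broken internal invariant, not bad user input, so it surfaces
        # as an internal error instead of a user-facing error class.
        raise AssertionError(f"[INTERNAL_ERROR] Not supported rounding mode: {mode}.")
    return Decimal(value).quantize(Decimal(1).scaleb(-scale), rounding=rounding)


print(change_precision("2.345", 2, "HALF_UP"))  # 2.35
# change_precision("2.345", 2, "HALF_DOWN") would raise the internal error above.
```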

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
1. Updated an existing unit test.
2. Passed GA.

Closes #41179 from panbingkun/SPARK-43518.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json  |  5 -
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala|  6 ++
 .../test/scala/org/apache/spark/sql/types/DecimalSuite.scala  | 11 +++
 3 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index fa838a6da76..edc5a5a66e5 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -3873,11 +3873,6 @@
   "This line should be unreachable."
 ]
   },
-  "_LEGACY_ERROR_TEMP_2029" : {
-"message" : [
-  "Not supported rounding mode: ."
-]
-  },
   "_LEGACY_ERROR_TEMP_2030" : {
 "message" : [
   "Can not handle nested schema yet...  plan ."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1f3ee517dd2..52e8c7df91e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -514,10 +514,8 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
   messageParameters = Map("err" -> err))
   }
 
-  def unsupportedRoundingMode(roundMode: BigDecimal.RoundingMode.Value): 
SparkRuntimeException = {
-new SparkRuntimeException(
-  errorClass = "_LEGACY_ERROR_TEMP_2029",
-  messageParameters = Map("roundMode" -> roundMode.toString()))
+  def unsupportedRoundingMode(roundMode: BigDecimal.RoundingMode.Value): 
SparkException = {
+SparkException.internalError(s"Not supported rounding mode: 
${roundMode.toString}.")
   }
 
   def resolveCannotHandleNestedSchema(plan: LogicalPlan): 
SparkRuntimeException = {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 465c25118fa..ab3f831fbcb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -303,6 +303,17 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester with SQLHelper
 }
   }
 
+  test("Not supported rounding mode: HALF_DOWN") {
+val d = Decimal(1L, 100, 80)
+checkError(
+  exception = intercept[SparkException] {
+d.toPrecision(5, 50, BigDecimal.RoundingMode.HALF_DOWN)
+  },
+  errorClass = "INTERNAL_ERROR",
+  parameters = Map("message" -> "Not supported rounding mode: HALF_DOWN.")
+)
+  }
+
   test("SPARK-20341: support BigInt's value does not fit in long value range") 
{
 val bigInt = scala.math.BigInt("9223372036854775808")
 val decimal = Decimal.apply(bigInt)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43457][CONNECT][PYTHON] Augument user agent with OS, Python and Spark versions

2023-05-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6172615f707 [SPARK-43457][CONNECT][PYTHON] Augument user agent with 
OS, Python and Spark versions
6172615f707 is described below

commit 6172615f70785b71224ecbc797de2f679ab0d593
Author: Niranjan Jayakar 
AuthorDate: Tue May 16 17:36:02 2023 +0900

[SPARK-43457][CONNECT][PYTHON] Augument user agent with OS, Python and 
Spark versions

### What changes were proposed in this pull request?

Augment the user agent string sent over the service to include the
operating system, Python, and Spark versions.
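
A small self-contained sketch of the resulting format (the helper name `build_user_agent` and the `3.5.0` version string are made up for illustration; the real logic lives in `ChannelBuilder.userAgent`, shown in the diff below), validated with the same regex shape the updated tests use:

```python
import platform
import re

def build_user_agent(base: str, spark_version: str) -> str:
    # Append spark/<version>, os/<system> and python/<version> to whatever the
    # user supplied (or to the _SPARK_CONNECT_PYTHON default).
    return " ".join(
        [
            base,
            f"spark/{spark_version}",
            f"os/{platform.uname().system.lower()}",
            f"python/{platform.python_version()}",
        ]
    )

ua = build_user_agent("_SPARK_CONNECT_PYTHON", "3.5.0")
# e.g. "_SPARK_CONNECT_PYTHON spark/3.5.0 os/linux python/3.11.4"
assert re.match(r"^_SPARK_CONNECT_PYTHON spark/[^ ]+ os/[^ ]+ python/[^ ]+$", ua)
print(ua)
```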

### Why are the changes needed?

Including OS, Python and Spark versions in the user agent improves
tracking to see how Spark Connect is used across Python versions
and platforms.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests attached.

Closes #41138 from nija-at/user-agent-info.

Lead-authored-by: Niranjan Jayakar 
Co-authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client.py   | 11 ++-
 python/pyspark/sql/tests/connect/test_client.py|  6 --
 python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/connect/client.py 
b/python/pyspark/sql/connect/client.py
index a2a2cc4cf5e..c1675eac9e1 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -25,6 +25,7 @@ check_dependencies(__name__)
 
 import logging
 import os
+import platform
 import random
 import time
 import urllib.parse
@@ -57,6 +58,7 @@ import grpc
 from google.protobuf import text_format
 from google.rpc import error_details_pb2
 
+from pyspark.version import __version__
 from pyspark.resource.information import ResourceInformation
 from pyspark.sql.connect.conversion import storage_level_to_proto, 
proto_to_storage_level
 import pyspark.sql.connect.proto as pb2
@@ -299,7 +301,14 @@ class ChannelBuilder:
 raise SparkConnectException(
 f"'user_agent' parameter should not exceed 2048 characters, 
found {len} characters."
 )
-return user_agent
+return " ".join(
+[
+user_agent,
+f"spark/{__version__}",
+f"os/{platform.uname().system.lower()}",
+f"python/{platform.python_version()}",
+]
+)
 
 def get(self, key: str) -> Any:
 """
diff --git a/python/pyspark/sql/tests/connect/test_client.py 
b/python/pyspark/sql/tests/connect/test_client.py
index 191a5204bf3..722be1e2882 100644
--- a/python/pyspark/sql/tests/connect/test_client.py
+++ b/python/pyspark/sql/tests/connect/test_client.py
@@ -37,7 +37,7 @@ class SparkConnectClientTestCase(unittest.TestCase):
 client.execute_command(command)
 
 self.assertIsNotNone(mock.req, "ExecutePlan API was not called when 
expected")
-self.assertEqual(mock.req.client_type, "bar")
+self.assertRegex(mock.req.client_type, r"^bar spark/[^ ]+ os/[^ ]+ 
python/[^ ]+$")
 
 def test_user_agent_default(self):
 client = SparkConnectClient("sc://foo/")
@@ -48,7 +48,9 @@ class SparkConnectClientTestCase(unittest.TestCase):
 client.execute_command(command)
 
 self.assertIsNotNone(mock.req, "ExecutePlan API was not called when 
expected")
-self.assertEqual(mock.req.client_type, "_SPARK_CONNECT_PYTHON")
+self.assertRegex(
+mock.req.client_type, r"^_SPARK_CONNECT_PYTHON spark/[^ ]+ os/[^ 
]+ python/[^ ]+$"
+)
 
 def test_properties(self):
 client = SparkConnectClient("sc://foo/;token=bar")
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index b0bc2cba78e..8a83d040207 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -3404,13 +3404,15 @@ class ChannelBuilderTests(unittest.TestCase):
 
 chan = ChannelBuilder("sc://host/;token=abcs")
 self.assertTrue(chan.secure, "specifying a token must set the channel 
to secure")
-self.assertEqual(chan.userAgent, "_SPARK_CONNECT_PYTHON")
+self.assertRegex(
+chan.userAgent, r"^_SPARK_CONNECT_PYTHON spark/[^ ]+ os/[^ ]+ 
python/[^ ]+$"
+)
 chan = ChannelBuilder("sc://host/;use_ssl=abcs")
 self.assertFalse(chan.secure, "Garbage in, false out")
 
 def test_user_agent(self):
 chan = ChannelBuilder("sc://host/;user_agent=Agent123%20%2F3.4")
-self.assertEqual("Agent123 /3.4", chan.userAgent)
+