[spark] branch master updated: [SPARK-43302][SQL][FOLLOWUP] Code cleanup for PythonUDAF
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new fddf25a4dd8 [SPARK-43302][SQL][FOLLOWUP] Code cleanup for PythonUDAF

fddf25a4dd8 is described below

commit fddf25a4dd8029db89287416de39adb27e8643c8
Author: Wenchen Fan
AuthorDate: Wed May 17 13:39:27 2023 +0800

    [SPARK-43302][SQL][FOLLOWUP] Code cleanup for PythonUDAF

    ### What changes were proposed in this pull request?

    This is a followup of https://github.com/apache/spark/pull/40739 to do some code cleanup:
    1. Remove the pattern `PYTHON_UDAF`, as it is not used by any rule.
    2. Add `PythonFuncExpression.evalType` for convenience: catalyst rules (including third-party extensions) may want to get the eval type of a Python function, whether it is a UDF or a UDAF.
    3. Update the Python profiler to use `PythonUDAF.resultId` instead of `AggregateExpression.resultId`, to be consistent with `PythonUDF`.

    ### Why are the changes needed?

    Code cleanup.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #41142 from cloud-fan/follow.

    Lead-authored-by: Wenchen Fan
    Co-authored-by: Wenchen Fan
    Signed-off-by: Kent Yao
---
 python/pyspark/sql/column.py                                |  4
 python/pyspark/sql/udf.py                                   | 12 +++-
 .../apache/spark/sql/catalyst/expressions/PythonUDF.scala   | 11 ++-
 .../org/apache/spark/sql/catalyst/trees/TreePatterns.scala  |  1 -
 .../sql/execution/python/UserDefinedPythonFunction.scala    | 11 +--
 5 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 49a42406048..3cf59989965 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -69,6 +69,10 @@ def _to_java_column(col: "ColumnOrName") -> JavaObject:
     return jcol


+def _to_java_expr(col: "ColumnOrName") -> JavaObject:
+    return _to_java_column(col).expr()
+
+
 def _to_seq(
     sc: SparkContext,
     cols: Iterable["ColumnOrName"],
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 45828187295..87d53266edf 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -30,7 +30,7 @@ from py4j.java_gateway import JavaObject
 from pyspark import SparkContext
 from pyspark.profiler import Profiler
 from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType
-from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.column import Column, _to_java_column, _to_java_expr, _to_seq
 from pyspark.sql.types import (
     ArrayType,
     BinaryType,
@@ -419,8 +419,9 @@ class UserDefinedFunction:
                 func.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]

                 judf = self._create_judf(func)
-                jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
-                id = jPythonUDF.expr().resultId().id()
+                jUDFExpr = judf.builder(_to_seq(sc, cols, _to_java_expr))
+                jPythonUDF = judf.fromUDFExpr(jUDFExpr)
+                id = jUDFExpr.resultId().id()
                 sc.profiler_collector.add_profiler(id, profiler)
             else:  # memory_profiler_enabled
                 f = self.func
@@ -436,8 +437,9 @@ class UserDefinedFunction:
                 func.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]

                 judf = self._create_judf(func)
-                jPythonUDF = judf.apply(_to_seq(sc, cols, _to_java_column))
-                id = jPythonUDF.expr().resultId().id()
+                jUDFExpr = judf.builder(_to_seq(sc, cols, _to_java_expr))
+                jPythonUDF = judf.fromUDFExpr(jUDFExpr)
+                id = jUDFExpr.resultId().id()
                 sc.profiler_collector.add_profiler(id, memory_profiler)
         else:
             judf = self._judf
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index 08ffbea5510..8636eb61034 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -22,7 +22,7 @@ import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.trees.TreePattern.{PYTHON_UDAF, PYTHON_UDF, TreePattern}
+import
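For readers skimming the digest, point 2 above is the interesting bit: with a shared `evalType` on a common parent, a catalyst rule no longer has to match `PythonUDF` and `PythonUDAF` separately. A minimal, self-contained Scala sketch of that shape (the class bodies and constant values are invented for illustration; Spark keeps the real constants in `PythonEvalType`):

```scala
object EvalTypeSketch extends App {
  // Illustrative stand-ins only -- not the actual catalyst classes.
  object EvalTypes {
    val SQL_BATCHED_UDF = 100            // illustrative values
    val SQL_GROUPED_AGG_PANDAS_UDF = 202
  }

  // The shared parent described in point 2, exposing evalType.
  sealed trait PythonFuncExpression { def evalType: Int }
  case class PythonUDF(name: String, evalType: Int) extends PythonFuncExpression
  case class PythonUDAF(name: String, evalType: Int) extends PythonFuncExpression

  // A rule (or third-party extension) can branch on the eval type without
  // caring whether the expression is a UDF or a UDAF.
  def isGroupedAgg(e: PythonFuncExpression): Boolean =
    e.evalType == EvalTypes.SQL_GROUPED_AGG_PANDAS_UDF

  assert(isGroupedAgg(PythonUDAF("my_agg", EvalTypes.SQL_GROUPED_AGG_PANDAS_UDF)))
  assert(!isGroupedAgg(PythonUDF("my_udf", EvalTypes.SQL_BATCHED_UDF)))
}
```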
[spark] branch master updated (772ef41d6a4 -> 9cb3174a5ef)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 772ef41d6a4 [SPARK-43461][BUILD] Skip compiling useless files when making distribution
     add 9cb3174a5ef [SPARK-43532][BUILD][TESTS] Upgrade `jdbc` related test dependencies

No new revisions were added by this update.

Summary of changes:
 pom.xml | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)
[spark] branch master updated: [SPARK-43461][BUILD] Skip compiling useless files when making distribution
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 772ef41d6a4 [SPARK-43461][BUILD] Skip compiling useless files when making distribution

772ef41d6a4 is described below

commit 772ef41d6a40ed219bcdf8a51691391fa6eba75d
Author: Yuming Wang
AuthorDate: Wed May 17 11:33:26 2023 +0800

    [SPARK-43461][BUILD] Skip compiling useless files when making distribution

    ### What changes were proposed in this pull request?

    This PR adds more skip properties when making a distribution:
    - `-Dmaven.javadoc.skip=true` to skip generating javadoc.
    - `-Dmaven.scaladoc.skip=true` to skip generating scaladoc. Please see: https://davidb.github.io/scala-maven-plugin/doc-jar-mojo.html#skip
    - `-Dmaven.source.skip` to skip generating sources.jar.
    - `-Dcyclonedx.skip=true` to skip making the BOM. Please see: https://cyclonedx.github.io/cyclonedx-maven-plugin/makeBom-mojo.html#skip

    ### Why are the changes needed?

    Reduce the time spent on making a distribution.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manual test:
    ```sh
    ./dev/make-distribution.sh --tgz --pip -Phadoop-3 -Phive -Phive-thriftserver -Pyarn -Phadoop-provided
    ```

    | Before this PR | After this PR |
    | -- | -- |
    | 43 min total from scheduled to completion | 23 min total from scheduled to completion |

    Closes #41141 from wangyum/SPARK-43461.

    Lead-authored-by: Yuming Wang
    Co-authored-by: Yuming Wang
    Signed-off-by: yangjie01
---
 dev/make-distribution.sh | 8 +++-
 pom.xml                  | 2 ++
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh
index 948ee19fbac..ef7c010e930 100755
--- a/dev/make-distribution.sh
+++ b/dev/make-distribution.sh
@@ -166,7 +166,13 @@ export MAVEN_OPTS="${MAVEN_OPTS:--Xss128m -Xmx4g -XX:ReservedCodeCacheSize=128m}
 # Store the command as an array because $MVN variable might have spaces in it.
 # Normal quoting tricks don't work.
 # See: http://mywiki.wooledge.org/BashFAQ/050
-BUILD_COMMAND=("$MVN" clean package -DskipTests $@)
+BUILD_COMMAND=("$MVN" clean package \
+    -DskipTests \
+    -Dmaven.javadoc.skip=true \
+    -Dmaven.scaladoc.skip=true \
+    -Dmaven.source.skip \
+    -Dcyclonedx.skip=true \
+    $@)

 # Actually build the jar
 echo -e "\nBuilding with..."
diff --git a/pom.xml b/pom.xml
index 1f6305b77ab..7f37f8f1a3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,6 +175,7 @@
          See: SPARK-36547, SPARK-38394. -->
 4.8.0
+false
 2.15.0
 true
@@ -2851,6 +2852,7 @@
 true
 true
 incremental
+${maven.scaladoc.skip}
 -unchecked
 -deprecation
[spark] branch master updated (b04e2004558 -> 5b8721b83b6)
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from b04e2004558 [SPARK-43525][BUILD] Import `scala.collection` instead of `collection`
     add 5b8721b83b6 [SPARK-43531][CONNECT][PYTHON][TESTS] Enable more parity tests for Pandas UDFs

No new revisions were added by this update.

Summary of changes:
 .../connect/test_parity_pandas_cogrouped_map.py    |  45 ++
 .../connect/test_parity_pandas_grouped_map.py      |  54 ++-
 .../sql/tests/connect/test_parity_pandas_udf.py    |   9 +-
 .../sql/tests/pandas/test_pandas_cogrouped_map.py  |  50 --
 .../sql/tests/pandas/test_pandas_grouped_map.py    | 174 -
 python/pyspark/sql/tests/pandas/test_pandas_udf.py |  91 +--
 6 files changed, 210 insertions(+), 213 deletions(-)
[spark] branch master updated: [SPARK-43525][BUILD] Import `scala.collection` instead of `collection`
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b04e2004558 [SPARK-43525][BUILD] Import `scala.collection` instead of `collection`

b04e2004558 is described below

commit b04e2004558550b39b0901c20b98af5998a7
Author: panbingkun
AuthorDate: Wed May 17 09:24:10 2023 +0900

    [SPARK-43525][BUILD] Import `scala.collection` instead of `collection`

    ### What changes were proposed in this pull request?

    - This PR adds a check rule that disallows imports written in the `collection` form (instead of the `scala.collection` form).
    - It also adjusts existing imports according to the rule above.

    ### Why are the changes needed?

    Some developers write `collection.JavaConverters._` when importing `JavaConverters._`, while others write `scala.collection.JavaConverters._`. Both resolve to the same `scala` package, so specifying a clearer rule makes the code style more consistent.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass GA.

    Closes #41185 from panbingkun/SPARK-43525.

    Authored-by: panbingkun
    Signed-off-by: Hyukjin Kwon
---
 .../scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala  | 3 ++-
 .../scala/org/apache/spark/sql/connect/service/SessionHolder.scala | 2 +-
 .../spark/sql/connect/service/AddArtifactsHandlerSuite.scala       | 7 ---
 .../org/apache/spark/streaming/kinesis/KinesisInputDStream.scala   | 2 +-
 .../spark/streaming/kinesis/KinesisInputDStreamBuilderSuite.scala  | 3 ++-
 core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala     | 2 +-
 .../protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala     | 2 +-
 .../spark/status/protobuf/ApplicationInfoWrapperSerializer.scala   | 2 +-
 .../spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala   | 2 +-
 .../apache/spark/status/protobuf/JobDataWrapperSerializer.scala    | 2 +-
 .../apache/spark/status/protobuf/KVStoreProtobufSerializer.scala   | 2 +-
 .../apache/spark/status/protobuf/StageDataWrapperSerializer.scala  | 3 ++-
 .../test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala    | 2 +-
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala            | 4 ++--
 scalastyle-config.xml                                              | 5 +
 .../spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala   | 2 +-
 .../status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala      | 2 +-
 .../src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala   | 3 ++-
 .../org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala  | 2 +-
 .../sql/execution/datasources/parquet/ParquetVectorizedSuite.scala | 2 +-
 20 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index 5db40806d18..506ad3625b0 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -20,7 +20,8 @@ import java.io.InputStream
 import java.nio.file.{Files, Path, Paths}
 import java.util.concurrent.TimeUnit

-import collection.JavaConverters._
+import scala.collection.JavaConverters._
+
 import com.google.protobuf.ByteString
 import io.grpc.{ManagedChannel, Server}
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 613d7a38e9e..ca7fa0d42c5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.connect.service
 import java.util.UUID
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}

-import collection.JavaConverters._
+import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

 import org.apache.spark.connect.proto
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
index 4a4e00ad997..9a4c029bb74 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/AddArtifactsHandlerSuite.scala
+++
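A small self-contained example of the two spellings the new scalastyle rule distinguishes (my own illustration, not part of the patch):

```scala
// Disallowed by the new rule: the bare `collection` form. It compiles,
// because it resolves to scala.collection, but it reads as if a local
// package named `collection` were being imported.
// import collection.JavaConverters._

// Required form: fully qualified under `scala`.
import scala.collection.JavaConverters._

object ImportStyleExample extends App {
  // asScala comes from JavaConverters and bridges Java and Scala collections.
  val javaList = java.util.Arrays.asList(1, 2, 3)
  println(javaList.asScala.map(_ * 2)) // prints Buffer(2, 4, 6)
}
```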
[spark] branch master updated: [SPARK-43528][SQL][PYTHON] Support duplicated field names in createDataFrame with pandas DataFrame
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 71bac156b0d [SPARK-43528][SQL][PYTHON] Support duplicated field names in createDataFrame with pandas DataFrame 71bac156b0d is described below commit 71bac156b0df2ce9e56c1c1690806a4b40ffd615 Author: Takuya UESHIN AuthorDate: Wed May 17 09:12:16 2023 +0900 [SPARK-43528][SQL][PYTHON] Support duplicated field names in createDataFrame with pandas DataFrame ### What changes were proposed in this pull request? Support duplicated field names in `createDataFrame` with pandas DataFrame. For with Arrow, without Arrow, and Spark Connect: ```py >>> spark.createDataFrame(pdf, schema).show() ++---+ |struct_0| struct_1| ++---+ | {a, 1}|{2, 3, b, 4, c}| | {x, 6}|{7, 8, y, 9, z}| ++---+ ``` ### Why are the changes needed? If there are duplicated field names, `createDataFrame` with pandas DataFrame fallbacks to without Arrow, or fails in Spark Connect. ```py >>> import pandas as pd >>> from pyspark.sql.types import * >>> >>> schema = ( ... StructType() ... .add("struct_0", StructType().add("x", StringType()).add("x", IntegerType())) ... .add( ... "struct_1", ... StructType() ... .add("a", IntegerType()) ... .add("x", IntegerType()) ... .add("x", StringType()) ... .add("y", IntegerType()) ... .add("y", StringType()), ... ) ... ) >>> >>> data = [Row(Row("a", 1), Row(2, 3, "b", 4, "c")), Row(Row("x", 6), Row(7, 8, "y", 9, "z"))] >>> pdf = pd.DataFrame.from_records(data, columns=schema.names) ``` - Without Arrow: Works fine. ```py >>> spark.createDataFrame(pdf, schema).show() ++---+ |struct_0| struct_1| ++---+ | {a, 1}|{2, 3, b, 4, c}| | {x, 6}|{7, 8, y, 9, z}| ++---+ ``` - With Arrow: Works with fallback. ```py >>> spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True) >>> spark.createDataFrame(pdf, schema).show() /.../pyspark/sql/pandas/conversion.py:347: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below: [DUPLICATED_FIELD_NAME_IN_ARROW_STRUCT] Duplicated field names in Arrow Struct are not allowed, got [x, x]. Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true. warn(msg) ++---+ |struct_0| struct_1| ++---+ | {a, 1}|{2, 3, b, 4, c}| | {x, 6}|{7, 8, y, 9, z}| ++---+ ``` - Spark Connect Fails. ```py >>> spark.createDataFrame(pdf, schema).show() ... Traceback (most recent call last): ... pyspark.errors.exceptions.connect.IllegalArgumentException: not all nodes and buffers were consumed. ... ``` ### Does this PR introduce _any_ user-facing change? The duplicated field names will work. ### How was this patch tested? Added the related test. Closes #41190 from ueshin/issues/SPARK-43528/from_pandas. 
Authored-by: Takuya UESHIN Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/conversion.py | 28 + python/pyspark/sql/connect/session.py | 8 +- python/pyspark/sql/pandas/conversion.py| 14 ++- python/pyspark/sql/pandas/serializers.py | 88 --- python/pyspark/sql/pandas/types.py | 125 + .../pyspark/sql/tests/connect/test_parity_arrow.py | 3 + python/pyspark/sql/tests/test_arrow.py | 28 + .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../sql/execution/arrow/ArrowConverters.scala | 8 +- .../sql/execution/arrow/ArrowConvertersSuite.scala | 4 +- 10 files changed, 254 insertions(+), 54 deletions(-) diff --git a/python/pyspark/sql/connect/conversion.py b/python/pyspark/sql/connect/conversion.py index a7ea88fb007..3cc301c38ea 100644 --- a/python/pyspark/sql/connect/conversion.py +++ b/python/pyspark/sql/connect/conversion.py @@ -44,7 +44,7 @@ from pyspark.sql.types import ( from pyspark.storagelevel import StorageLevel from pyspark.sql.connect.types import to_arrow_schema import pyspark.sql.connect.proto as pb2 -from pyspark.sql.pandas.types import _dedup_names +from pyspark.sql.pandas.types import _dedup_names, _deduplicate_field_names from typing import (
[spark] branch master updated (55735055d6c -> a232083f50d)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 55735055d6c [SPARK-43360][SS][CONNECT] Scala client StreamingQueryManager
     add a232083f50d [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/catalog.py | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
[spark] branch branch-3.4 updated: [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 7b148f0f7cc [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark 7b148f0f7cc is described below commit 7b148f0f7ccdecc3d23ecf5ec8bb6818a9a5d3ac Author: Ruifeng Zheng AuthorDate: Wed May 17 08:31:28 2023 +0900 [SPARK-43527][PYTHON] Fix `catalog.listCatalogs` in PySpark ### What changes were proposed in this pull request? Fix `catalog.listCatalogs` in PySpark ### Why are the changes needed? existing implementation outputs incorrect results ### Does this PR introduce _any_ user-facing change? yes before this PR: ``` In [1]: spark.catalog.listCatalogs() Out[1]: [CatalogMetadata(name=, description=)] ``` after this PR: ``` In [1]: spark.catalog.listCatalogs() Out[1]: [CatalogMetadata(name='spark_catalog', description=None)] ``` ### How was this patch tested? added doctest Closes #41186 from zhengruifeng/py_list_catalog. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon (cherry picked from commit a232083f50ddfdc81f2027fd3ffa89dfaa3ba199) Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/catalog.py | 9 - 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index ca93a4faec6..17d3fad9c9e 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -126,12 +126,19 @@ class Catalog: --- list A list of :class:`CatalogMetadata`. + +Examples + +>>> spark.catalog.listCatalogs() +[CatalogMetadata(name='spark_catalog', description=None)] """ iter = self._jcatalog.listCatalogs().toLocalIterator() catalogs = [] while iter.hasNext(): jcatalog = iter.next() -catalogs.append(CatalogMetadata(name=jcatalog.name, description=jcatalog.description)) +catalogs.append( +CatalogMetadata(name=jcatalog.name(), description=jcatalog.description()) +) return catalogs def currentDatabase(self) -> str: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (56cfd40e74d -> 55735055d6c)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 56cfd40e74d [SPARK-42958][CONNECT] Refactor `connect-jvm-client-mima-check` to support mima check with avro module
     add 55735055d6c [SPARK-43360][SS][CONNECT] Scala client StreamingQueryManager

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/SparkSession.scala  |   3 +
 .../spark/sql/streaming/StreamingQuery.scala       |  17 +++
 .../sql/streaming/StreamingQueryManager.scala      | 147 +
 .../CheckConnectJvmClientCompatibility.scala       |   4 +
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  30 +
 .../src/main/protobuf/spark/connect/commands.proto |   2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  40 +++---
 python/pyspark/sql/connect/proto/commands_pb2.py   |  22 +--
 python/pyspark/sql/connect/proto/commands_pb2.pyi  |  13 +-
 9 files changed, 241 insertions(+), 37 deletions(-)
 create mode 100644 connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
[spark] branch master updated: [SPARK-42958][CONNECT] Refactor `connect-jvm-client-mima-check` to support mima check with avro module
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 56cfd40e74d [SPARK-42958][CONNECT] Refactor `connect-jvm-client-mima-check` to support mima check with avro module 56cfd40e74d is described below commit 56cfd40e74d56362a425c5e5d5d9e7260176 Author: YangJie AuthorDate: Tue May 16 15:50:12 2023 -0400 [SPARK-42958][CONNECT] Refactor `connect-jvm-client-mima-check` to support mima check with avro module ### What changes were proposed in this pull request? This pr refactor `connect-jvm-client-mima-check` and `CheckConnectJvmClientCompatibility` to support mima check between `connect-client-jvm` and `avro` module. ### Why are the changes needed? Do mima check between `connect-client-jvm` and `avro` module. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #40605 from LuciferYang/SPARK-42958. Lead-authored-by: YangJie Co-authored-by: yangjie01 Signed-off-by: Herman van Hovell --- .../CheckConnectJvmClientCompatibility.scala | 109 +++-- dev/connect-jvm-client-mima-check | 10 +- 2 files changed, 83 insertions(+), 36 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala index f9674ac38cd..ad99342e6e6 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala @@ -62,27 +62,25 @@ object CheckConnectJvmClientCompatibility { "spark-connect-client-jvm-assembly", "spark-connect-client-jvm") val sqlJar: File = findJar("sql/core", "spark-sql", "spark-sql") - val problems = checkMiMaCompatibility(clientJar, sqlJar) - if (problems.nonEmpty) { -resultWriter.write(s"ERROR: Comparing client jar: $clientJar and and sql jar: $sqlJar \n") -resultWriter.write(s"problems: \n") -resultWriter.write(s"${problems.map(p => p.description("client")).mkString("\n")}") -resultWriter.write("\n") -resultWriter.write( - "Exceptions to binary compatibility can be added in " + -"'CheckConnectJvmClientCompatibility#checkMiMaCompatibility'\n") - } + val problemsWithSqlModule = checkMiMaCompatibilityWithSqlModule(clientJar, sqlJar) + appendMimaCheckErrorMessageIfNeeded( +resultWriter, +problemsWithSqlModule, +clientJar, +sqlJar, +"Sql") + + val avroJar: File = findJar("connector/avro", "spark-avro", "spark-avro") + val problemsWithAvroModule = checkMiMaCompatibilityWithAvroModule(clientJar, sqlJar) + appendMimaCheckErrorMessageIfNeeded( +resultWriter, +problemsWithAvroModule, +clientJar, +avroJar, +"Avro") + val incompatibleApis = checkDatasetApiCompatibility(clientJar, sqlJar) - if (incompatibleApis.nonEmpty) { -resultWriter.write( - "ERROR: The Dataset apis only exist in the connect client " + -"module and not belong to the sql module include: \n") -resultWriter.write(incompatibleApis.mkString("\n")) -resultWriter.write("\n") -resultWriter.write( - "Exceptions can be added to exceptionMethods in " + - "'CheckConnectJvmClientCompatibility#checkDatasetApiCompatibility'\n") - } + appendIncompatibleDatasetApisErrorMessageIfNeeded(resultWriter, incompatibleApis) } catch { 
case e: Throwable => println(e.getMessage) @@ -94,16 +92,17 @@ object CheckConnectJvmClientCompatibility { } } - /** - * MiMa takes an old jar (sql jar) and a new jar (client jar) as inputs and then reports all - * incompatibilities found in the new jar. The incompatibility result is then filtered using - * include and exclude rules. Include rules are first applied to find all client classes that - * need to be checked. Then exclude rules are applied to filter out all unsupported methods in - * the client classes. - */ - private def checkMiMaCompatibility(clientJar: File, sqlJar: File): List[Problem] = { -val mima = new MiMaLib(Seq(clientJar, sqlJar)) -val allProblems = mima.collectProblems(sqlJar, clientJar, List.empty) + private def checkMiMaCompatibilityWithAvroModule( + clientJar: File, + avroJar: File): List[Problem] = { +val includedRules =
[spark] branch branch-3.4 updated: [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput
This is an automated email from the ASF dual-hosted git repository. jiangxb1987 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new f68ece9e607 [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput f68ece9e607 is described below commit f68ece9e6074cecdaf74ad9b39eae3c7dc2cfaf1 Author: Xingbo Jiang AuthorDate: Tue May 16 11:34:30 2023 -0700 [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput ### What changes were proposed in this pull request? The PR changes the implementation of MapOutputTracker.updateMapOutput() to search for the MapStatus under the help of a mapping from mapId to mapIndex, previously it was performing a linear search, which would become performance bottleneck if a large proportion of all blocks in the map are migrated. ### Why are the changes needed? To avoid performance bottleneck when block decommission is enabled and a lot of blocks are migrated within a short time window. ### Does this PR introduce _any_ user-facing change? No, it's pure performance improvement. ### How was this patch tested? Manually test. Closes #40690 from jiangxb1987/SPARK-43043. Lead-authored-by: Xingbo Jiang Co-authored-by: Jiang Xingbo Signed-off-by: Xingbo Jiang (cherry picked from commit 66a2eb8f8957c22c69519b39be59beaaf931822b) Signed-off-by: Xingbo Jiang --- .../scala/org/apache/spark/MapOutputTracker.scala | 26 +- .../apache/spark/util/collection/OpenHashMap.scala | 18 +++ .../spark/util/collection/OpenHashMapSuite.scala | 18 +++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index fade0b86dd8..2dd3a903ee2 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -42,6 +42,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus} import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId} import org.apache.spark.util._ +import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} /** @@ -147,6 +148,12 @@ private class ShuffleStatus( private[this] var shufflePushMergerLocations: Seq[BlockManagerId] = Seq.empty + /** + * Mapping from a mapId to the mapIndex, this is required to reduce the searching overhead within + * the function updateMapOutput(mapId, bmAddress). + */ + private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]() + /** * Register a map output. If there is already a registered location for the map output then it * will be replaced by the new location. @@ -157,6 +164,14 @@ private class ShuffleStatus( invalidateSerializedMapOutputStatusCache() } mapStatuses(mapIndex) = status +mapIdToMapIndex(status.mapId) = mapIndex + } + + /** + * Get the map output that corresponding to a given mapId. 
+ */ + def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock { +mapIdToMapIndex.get(mapId).map(mapStatuses(_)) } /** @@ -164,15 +179,16 @@ private class ShuffleStatus( */ def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock { try { - val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId) + val mapIndex = mapIdToMapIndex.get(mapId) + val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_)) mapStatusOpt match { case Some(mapStatus) => logInfo(s"Updating map output for ${mapId} to ${bmAddress}") mapStatus.updateLocation(bmAddress) invalidateSerializedMapOutputStatusCache() case None => - val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId) - if (index >= 0 && mapStatuses(index) == null) { + if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) { +val index = mapIndex.get val mapStatus = mapStatusesDeleted(index) mapStatus.updateLocation(bmAddress) mapStatuses(index) = mapStatus @@ -1133,9 +1149,7 @@ private[spark] class MapOutputTrackerMaster( */ def getMapOutputLocation(shuffleId: Int, mapId: Long): Option[BlockManagerId] = { shuffleStatuses.get(shuffleId).flatMap { shuffleStatus => - shuffleStatus.withMapStatuses { mapStatues => -mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location) - } + shuffleStatus.getMapStatus(mapId).map(_.location) } }
[spark] branch master updated: [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput
This is an automated email from the ASF dual-hosted git repository. jiangxb1987 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 66a2eb8f895 [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput 66a2eb8f895 is described below commit 66a2eb8f8957c22c69519b39be59beaaf931822b Author: Xingbo Jiang AuthorDate: Tue May 16 11:34:30 2023 -0700 [SPARK-43043][CORE] Improve the performance of MapOutputTracker.updateMapOutput ### What changes were proposed in this pull request? The PR changes the implementation of MapOutputTracker.updateMapOutput() to search for the MapStatus under the help of a mapping from mapId to mapIndex, previously it was performing a linear search, which would become performance bottleneck if a large proportion of all blocks in the map are migrated. ### Why are the changes needed? To avoid performance bottleneck when block decommission is enabled and a lot of blocks are migrated within a short time window. ### Does this PR introduce _any_ user-facing change? No, it's pure performance improvement. ### How was this patch tested? Manually test. Closes #40690 from jiangxb1987/SPARK-43043. Lead-authored-by: Xingbo Jiang Co-authored-by: Jiang Xingbo Signed-off-by: Xingbo Jiang --- .../scala/org/apache/spark/MapOutputTracker.scala | 26 +- .../apache/spark/util/collection/OpenHashMap.scala | 18 +++ .../spark/util/collection/OpenHashMapSuite.scala | 18 +++ 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 5ad62159d24..9a5cf1da9e4 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -42,6 +42,7 @@ import org.apache.spark.scheduler.{MapStatus, MergeStatus, ShuffleOutputStatus} import org.apache.spark.shuffle.MetadataFetchFailedException import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockId, ShuffleMergedBlockId} import org.apache.spark.util._ +import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream} /** @@ -147,6 +148,12 @@ private class ShuffleStatus( private[this] var shufflePushMergerLocations: Seq[BlockManagerId] = Seq.empty + /** + * Mapping from a mapId to the mapIndex, this is required to reduce the searching overhead within + * the function updateMapOutput(mapId, bmAddress). + */ + private[this] val mapIdToMapIndex = new OpenHashMap[Long, Int]() + /** * Register a map output. If there is already a registered location for the map output then it * will be replaced by the new location. @@ -157,6 +164,14 @@ private class ShuffleStatus( invalidateSerializedMapOutputStatusCache() } mapStatuses(mapIndex) = status +mapIdToMapIndex(status.mapId) = mapIndex + } + + /** + * Get the map output that corresponding to a given mapId. 
+ */ + def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock { +mapIdToMapIndex.get(mapId).map(mapStatuses(_)) } /** @@ -164,15 +179,16 @@ private class ShuffleStatus( */ def updateMapOutput(mapId: Long, bmAddress: BlockManagerId): Unit = withWriteLock { try { - val mapStatusOpt = mapStatuses.find(x => x != null && x.mapId == mapId) + val mapIndex = mapIdToMapIndex.get(mapId) + val mapStatusOpt = mapIndex.map(mapStatuses(_)).flatMap(Option(_)) mapStatusOpt match { case Some(mapStatus) => logInfo(s"Updating map output for ${mapId} to ${bmAddress}") mapStatus.updateLocation(bmAddress) invalidateSerializedMapOutputStatusCache() case None => - val index = mapStatusesDeleted.indexWhere(x => x != null && x.mapId == mapId) - if (index >= 0 && mapStatuses(index) == null) { + if (mapIndex.map(mapStatusesDeleted).exists(_.mapId == mapId)) { +val index = mapIndex.get val mapStatus = mapStatusesDeleted(index) mapStatus.updateLocation(bmAddress) mapStatuses(index) = mapStatus @@ -1137,9 +1153,7 @@ private[spark] class MapOutputTrackerMaster( */ def getMapOutputLocation(shuffleId: Int, mapId: Long): Option[BlockManagerId] = { shuffleStatuses.get(shuffleId).flatMap { shuffleStatus => - shuffleStatus.withMapStatuses { mapStatues => -mapStatues.filter(_ != null).find(_.mapId == mapId).map(_.location) - } + shuffleStatus.getMapStatus(mapId).map(_.location) } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
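To make the change concrete, here is a stripped-down, self-contained sketch of the pattern (a plain Scala `HashMap` stands in for Spark's internal `OpenHashMap`, and `Status`/`location` are simplified stand-ins for `MapStatus`/`BlockManagerId`):

```scala
import scala.collection.mutable

object MapIndexSketch extends App {
  final case class Status(mapId: Long, var location: String)

  val mapStatuses = new Array[Status](3)
  // The auxiliary index added by the PR: mapId -> position in mapStatuses.
  val mapIdToMapIndex = mutable.HashMap.empty[Long, Int]

  def addMapOutput(mapIndex: Int, status: Status): Unit = {
    mapStatuses(mapIndex) = status
    mapIdToMapIndex(status.mapId) = mapIndex // kept in sync on registration
  }

  // Before: mapStatuses.find(s => s != null && s.mapId == mapId) -- a linear
  // scan on every location update. After: one hash lookup plus an array read.
  def getMapStatus(mapId: Long): Option[Status] =
    mapIdToMapIndex.get(mapId).map(mapStatuses(_)).flatMap(Option(_))

  addMapOutput(0, Status(mapId = 42L, location = "bm-1"))
  assert(getMapStatus(42L).exists(_.location == "bm-1"))
  assert(getMapStatus(99L).isEmpty)
}
```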
[spark] branch master updated: [SPARK-43359][SQL] Delete from Hive table should throw "UNSUPPORTED_FEATURE.TABLE_OPERATION"
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 530dea61c88 [SPARK-43359][SQL] Delete from Hive table should throw "UNSUPPORTED_FEATURE.TABLE_OPERATION"

530dea61c88 is described below

commit 530dea61c88780c82b2e3e624fbe2a405e602731
Author: panbingkun
AuthorDate: Tue May 16 11:21:42 2023 -0700

    [SPARK-43359][SQL] Delete from Hive table should throw "UNSUPPORTED_FEATURE.TABLE_OPERATION"

    ### What changes were proposed in this pull request?

    This PR fixes the exception raised for 'DELETE FROM' on a Hive table.

    ### Why are the changes needed?

    Proper names of error classes should improve user experience with Spark SQL.

    **BEFORE**
    ```
    scala> sql("delete from t")
    org.apache.spark.SparkException: [INTERNAL_ERROR] Unexpected table relation: HiveTableRelation [`spark_catalog`.`default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [a#0], Partition Cols: []]
    ```

    **AFTER**
    ```
    scala> sql("delete from t")
    org.apache.spark.sql.AnalysisException: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table `spark_catalog`.`default`.`t` does not support DELETE. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
    ```

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass GA and add a new UT.

    Closes #41172 from panbingkun/SPARK-43359.

    Authored-by: panbingkun
    Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/sql/hive/HiveStrategies.scala |  7 ++-
 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala   | 16
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6c5646a2416..b2438d38520 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution._
@@ -178,6 +178,11 @@ object HiveAnalysis extends Rule[LogicalPlan] {
       if (overwrite) DDLUtils.verifyNotReadPath(child, outputPath)
       InsertIntoHiveDirCommand(isLocal, storage, child, overwrite, child.output.map(_.name))
+
+    case DeleteFromTable(SubqueryAlias(_, HiveTableRelation(table, _, _, _, _)), _) =>
+      throw QueryCompilationErrors.unsupportedTableOperationError(
+        table.identifier,
+        "DELETE")
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f17ec922b9b..6fd17ed5d9e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -3098,4 +3098,20 @@ class HiveDDLSuite
       "CREATE TABLE tab (c1 int) PARTITIONED BY (c1) STORED AS PARQUET",
       "Cannot use all columns for partition columns")
   }
+
+  test("SPARK-43359: Delete table not allowed") {
+    val tbl = "t1"
+    withTable(tbl) {
+      sql(s"CREATE TABLE $tbl(c1 INT)")
+      val e = intercept[AnalysisException] {
+        sql(s"DELETE FROM $tbl WHERE c1 = 1")
+      }
+      checkError(e,
+        errorClass = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+        parameters = Map(
+          "tableName" -> s"`spark_catalog`.`default`.`$tbl`",
+          "operation" -> "DELETE")
+      )
+    }
+  }
 }
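The core of the fix is a narrow pattern match added to the `HiveAnalysis` rule; a simplified, self-contained sketch of that shape (the plan node classes here are stand-ins, not the real catalyst types):

```scala
object DeleteRuleSketch extends App {
  // Simplified stand-ins for the catalyst plan nodes involved.
  sealed trait LogicalPlan
  final case class HiveTableRelation(tableName: String) extends LogicalPlan
  final case class SubqueryAlias(alias: String, child: LogicalPlan) extends LogicalPlan
  final case class DeleteFromTable(table: LogicalPlan, cond: Option[String]) extends LogicalPlan

  // The rule intercepts DELETE on a Hive relation and raises the user-facing
  // error class instead of letting it surface later as an INTERNAL_ERROR.
  def check(plan: LogicalPlan): Unit = plan match {
    case DeleteFromTable(SubqueryAlias(_, HiveTableRelation(name)), _) =>
      throw new UnsupportedOperationException(
        s"[UNSUPPORTED_FEATURE.TABLE_OPERATION] Table `$name` does not support DELETE.")
    case _ => // other plans pass through unchanged
  }

  try check(DeleteFromTable(SubqueryAlias("t", HiveTableRelation("default.t")), None))
  catch { case e: UnsupportedOperationException => println(e.getMessage) }
}
```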
[spark] branch master updated (6f377efd3f3 -> 60aa1127c51)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 6f377efd3f3 [SPARK-38469][CORE] Use error class in org.apache.spark.network
     add 60aa1127c51 [SPARK-43520][BUILD][TESTS] Upgrade `mysql-connector-java` to 8.0.33

No new revisions were added by this update.

Summary of changes:
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[spark] branch master updated: [SPARK-38469][CORE] Use error class in org.apache.spark.network
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6f377efd3f3 [SPARK-38469][CORE] Use error class in org.apache.spark.network

6f377efd3f3 is described below

commit 6f377efd3f3b8db1909349a7c134929a2ec0bf60
Author: Bo Zhang
AuthorDate: Tue May 16 19:13:20 2023 +0300

    [SPARK-38469][CORE] Use error class in org.apache.spark.network

    ### What changes were proposed in this pull request?

    This PR aims to change exceptions created in the package org.apache.spark.network to use error classes. It also adds an error class INTERNAL_ERROR_NETWORK and uses it for the internal errors in the package.

    ### Why are the changes needed?

    This moves exceptions created in the package org.apache.spark.network to error classes.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests.

    Closes #41140 from bozhang2820/spark-38469.

    Lead-authored-by: Bo Zhang
    Co-authored-by: Bo Zhang
    Signed-off-by: Max Gekk
---
 .../main/scala/org/apache/spark/SparkException.scala |  3 ++-
 core/src/main/resources/error/error-classes.json     |  6 ++
 .../spark/network/netty/NettyBlockRpcServer.scala    | 19 ---
 .../network/netty/NettyBlockTransferService.scala    |  2 +-
 4 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 4abf0fdf498..feb7bf5b66e 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -118,7 +118,8 @@ private[spark] case class SparkUserAppException(exitCode: Int)
  * Exception thrown when the relative executor to access is dead.
  */
 private[spark] case class ExecutorDeadException(message: String)
-  extends SparkException(message)
+  extends SparkException(errorClass = "INTERNAL_ERROR_NETWORK",
+    messageParameters = Map("message" -> message), cause = null)

 /**
  * Exception thrown when Spark returns different result after upgrading to a new version.
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index edc5a5a66e5..24f972a5006 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -830,6 +830,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_NETWORK" : {
+    "message" : [
+      ""
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERVAL_ARITHMETIC_OVERFLOW" : {
     "message" : [
       "."
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index f2a1fe49fcf..16ad848a326 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -22,6 +22,7 @@ import java.nio.ByteBuffer import scala.collection.JavaConverters._ import scala.reflect.ClassTag +import org.apache.spark.SparkException import org.apache.spark.internal.Logging import org.apache.spark.network.BlockDataManager import org.apache.spark.network.buffer.NioManagedBuffer @@ -93,8 +94,8 @@ class NettyBlockRpcServer( } else { val startAndEndId = fetchShuffleBlocks.reduceIds(index) if (startAndEndId.length != 2) { - throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " + -s"is enabled: $fetchShuffleBlocks") + throw SparkException.internalError("Invalid shuffle fetch request when batch mode " + +s"is enabled: $fetchShuffleBlocks", category = "NETWORK") } Array(blockManager.getLocalBlockData( ShuffleBlockBatchId( @@ -125,8 +126,10 @@ class NettyBlockRpcServer( if (blockStored) { responseContext.onSuccess(ByteBuffer.allocate(0)) } else { - val exception = new Exception(s"Upload block for $blockId failed. This mostly happens " + -s"when there is not sufficient space available to store the block.") + val exception = SparkException.internalError( +s"Upload block for $blockId failed. This mostly happens " + +"when there is not sufficient space available to store the block.", +category = "NETWORK") responseContext.onFailure(exception) } @@ -137,13 +140,15 @@ class NettyBlockRpcServer( val errorMsg = "Invalid GetLocalDirsForExecutors request: " + s"${if (isIncorrectAppId) s"incorrect application id:
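The diff pairs each `SparkException.internalError(..., category = "NETWORK")` call with the new `INTERNAL_ERROR_NETWORK` entry in error-classes.json. A rough sketch of how a category suffix could select the error class (my own guess at the shape, not Spark's actual implementation):

```scala
object InternalErrorSketch extends App {
  // Hypothetical: append the category to the base error class, matching the
  // INTERNAL_ERROR_NETWORK entry added to error-classes.json above.
  def internalError(message: String, category: String = ""): RuntimeException = {
    val errorClass =
      if (category.isEmpty) "INTERNAL_ERROR" else s"INTERNAL_ERROR_$category"
    new RuntimeException(s"[$errorClass] $message")
  }

  val e = internalError("Invalid shuffle fetch request.", category = "NETWORK")
  assert(e.getMessage.startsWith("[INTERNAL_ERROR_NETWORK]"))
}
```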
[spark] branch master updated: [SPARK-43512][SS][TESTS] Update StateStoreOperationsBenchmark to reflect updates to RocksDB usage as state store provider
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 28a2a2e08ee [SPARK-43512][SS][TESTS] Update StateStoreOperationsBenchmark to reflect updates to RocksDB usage as state store provider 28a2a2e08ee is described below commit 28a2a2e08ee90f6ca633fb95a7c44721e331d17b Author: Anish Shrigondekar AuthorDate: Tue May 16 08:55:58 2023 -0700 [SPARK-43512][SS][TESTS] Update StateStoreOperationsBenchmark to reflect updates to RocksDB usage as state store provider ### What changes were proposed in this pull request? Update StateStoreOperationsBenchmark to reflect updates to RocksDB usage as state store provider ### Why are the changes needed? Need the changes to unblock RocksDB JNI upgrade ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Ran it locally and using Github Actions. Run now takes ~40-50mins Closes #41175 from anishshri-db/task/SPARK-43512. Authored-by: Anish Shrigondekar Signed-off-by: Dongjoon Hyun --- ...StoreBasicOperationsBenchmark-jdk11-results.txt | 169 + ...StoreBasicOperationsBenchmark-jdk17-results.txt | 199 +++-- .../StateStoreBasicOperationsBenchmark-results.txt | 199 +++-- .../StateStoreBasicOperationsBenchmark.scala | 12 +- 4 files changed, 180 insertions(+), 399 deletions(-) diff --git a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt index f75157f59ba..5ba8d824b71 100644 --- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt +++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk11-results.txt @@ -2,182 +2,109 @@ put rows -OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1035-azure +OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz putting 1 rows (1 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative --- -In-memory9 11 1 1.2 869.5 1.0X -RocksDB (trackTotalNumberOfRows: true) 55 59 1 0.25544.5 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 17 1 0.71466.2 0.6X +In-memory9 10 1 1.1 894.3 1.0X +RocksDB (trackTotalNumberOfRows: true) 70 75 3 0.16964.6 0.1X +RocksDB (trackTotalNumberOfRows: false) 23 26 1 0.42342.7 0.4X -OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1035-azure -Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz -putting 1 rows (7500 rows to overwrite - rate 75): Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative -- -In-memory 9 10 1 1.2 851.0 1.0X -RocksDB (trackTotalNumberOfRows: true)52 55 1 0.25168.1 0.2X -RocksDB (trackTotalNumberOfRows: false) 15 17 1 0.71521.9 0.6X - -OpenJDK 64-Bit Server VM 11.0.18+10 on Linux 5.15.0-1035-azure +OpenJDK 64-Bit Server VM 11.0.19+7 on Linux 5.15.0-1037-azure Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz putting 1 rows (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative - -In-memory 9 10 1 1.2 864.1 1.0X -RocksDB (trackTotalNumberOfRows: true)46 49 1 0.24568.5 0.2X -RocksDB (trackTotalNumberOfRows: false) 16 17 1 0.6
[spark] branch master updated: [SPARK-42604][CONNECT][FOLLOWUP] Remove `typedlit/typedLit` `ProblemFilters.exclude` rule from mima check
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 606193301bf [SPARK-42604][CONNECT][FOLLOWUP] Remove `typedlit/typedLit` `ProblemFilters.exclude` rule from mima check

606193301bf is described below

commit 606193301bf361fc6a2b763fab074d766aeb52a4
Author: yangjie01
AuthorDate: Tue May 16 21:34:15 2023 +0800

    [SPARK-42604][CONNECT][FOLLOWUP] Remove `typedlit/typedLit` `ProblemFilters.exclude` rule from mima check

    ### What changes were proposed in this pull request?

    https://github.com/apache/spark/pull/40355 implemented `functions#typedlit` and `functions#typedLit`, so this PR removes the corresponding `ProblemFilters.exclude` rules from `CheckConnectJvmClientCompatibility`.

    ### Why are the changes needed?

    Remove the now-unnecessary `ProblemFilters.exclude` rules.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manual check: `dev/connect-jvm-client-mima-check` passed.

    Closes #41183 from LuciferYang/SPARK-42604-FOLLOWUP.

    Authored-by: yangjie01
    Signed-off-by: yangjie01
---
 .../spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 32a44c350d9..f9674ac38cd 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -177,8 +177,6 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.callUDF"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.unwrap_udt"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.udaf"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedlit"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.functions.typedLit"),

       // KeyValueGroupedDataset
       ProblemFilters.exclude[Problem](
[spark] branch master updated: [SPARK-39281][SQL] Speed up Timestamp type inference with legacy format in JSON/CSV data source
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3192bbd2958 [SPARK-39281][SQL] Speed up Timestamp type inference with legacy format in JSON/CSV data source

3192bbd2958 is described below

commit 3192bbd29585607d43d0819c6c2d3ac00180261a
Author: Jia Fan
AuthorDate: Tue May 16 15:59:01 2023 +0300

    [SPARK-39281][SQL] Speed up Timestamp type inference with legacy format in JSON/CSV data source

    ### What changes were proposed in this pull request?

    Follow-up to https://github.com/apache/spark/pull/36562: performance improvement for timestamp type inference with the legacy format. In the current implementation of the CSV/JSON data sources, schema inference with the legacy format relies on methods that throw exceptions if a field cannot be converted to some data type. Throwing and catching exceptions can be slow. We can improve this by adding methods that return optional results instead. The optimization of the default timestamp formatter was implemented in https://github.com/apache/spark/pull/36562; this PR adds the same optimization for the legacy formatters. The basic idea is to prevent the formatter from throwing, so that a failed parse is signaled by an empty result rather than a caught exception.

    ### Why are the changes needed?

    Performance improvement for timestamp type inference with the legacy format. With the JSON data source, the speedup is about 67%. The CSV data source also speeds up, though less noticeably.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Add new test.

    Closes #41091 from Hisoka-X/SPARK-39281_legacy_format.

    Lead-authored-by: Jia Fan
    Co-authored-by: Hisoka
    Signed-off-by: Max Gekk
---
 .../sql/catalyst/util/TimestampFormatter.scala     |  22
 .../catalyst/util/TimestampFormatterSuite.scala    |  19
 sql/core/benchmarks/CSVBenchmark-jdk11-results.txt |  82 +++---
 sql/core/benchmarks/CSVBenchmark-jdk17-results.txt |  82 +++---
 sql/core/benchmarks/CSVBenchmark-results.txt       |  82 +++---
 .../benchmarks/JsonBenchmark-jdk11-results.txt     |  98 -
 .../benchmarks/JsonBenchmark-jdk17-results.txt     | 122 ++---
 sql/core/benchmarks/JsonBenchmark-results.txt      | 122 ++---
 8 files changed, 335 insertions(+), 294 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 2a8283bde1d..aab90ec3844 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -407,6 +407,19 @@ class LegacyFastTimestampFormatter(
     if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
       throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
     }
+    extractMicros(cal)
+  }
+
+  override def parseOptional(s: String): Option[Long] = {
+    cal.clear() // Clear the calendar because it can be re-used many times
+    if (fastDateFormat.parse(s, new ParsePosition(0), cal)) {
+      Some(extractMicros(cal))
+    } else {
+      None
+    }
+  }
+
+  private def extractMicros(cal: MicrosCalendar): Long = {
     val micros = cal.getMicros()
     cal.set(Calendar.MILLISECOND, 0)
     val julianMicros = Math.addExact(millisToMicros(cal.getTimeInMillis), micros)
@@ -451,6 +464,15 @@ class LegacySimpleTimestampFormatter(
     fromJavaTimestamp(new Timestamp(sdf.parse(s).getTime))
   }

+  override def parseOptional(s: String): Option[Long] = {
+    val date = sdf.parse(s, new ParsePosition(0))
+    if (date == null) {
+      None
+    } else {
+      Some(fromJavaTimestamp(new Timestamp(date.getTime)))
+    }
+  }
+
   override def format(us: Long): String = {
     sdf.format(toJavaTimestamp(us))
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
index 10553d421ea..8f6099e96ef 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/TimestampFormatterSuite.scala
@@ -489,4 +489,23 @@ class TimestampFormatterSuite extends DatetimeFormatterSuite {
     assert(formatter.parseWithoutTimeZoneOptional("2012-00-65 23:59:59.9990", false)
       .isEmpty)
   }
+
+  test("SPARK-39281: support returning optional parse results in the legacy formatter") {
+    val fastFormatter = new LegacyFastTimestampFormatter(
+      "-MM-dd
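The `LegacySimpleTimestampFormatter.parseOptional` above relies on the two-argument `SimpleDateFormat.parse`, which returns `null` on failure instead of throwing. A standalone sketch of that exception-free probe (simplified: it stops at milliseconds, while Spark converts to microseconds):

```scala
import java.text.{ParsePosition, SimpleDateFormat}

object ParseOptionalSketch extends App {
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // parse(String, ParsePosition) returns null on failure, so schema
  // inference can probe many candidate rows without throwing.
  def parseOptional(s: String): Option[Long] =
    Option(sdf.parse(s, new ParsePosition(0))).map(_.getTime)

  assert(parseOptional("2023-05-16 15:59:01").isDefined)
  assert(parseOptional("not a timestamp").isEmpty) // no exception raised
}
```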
[spark] branch master updated: [SPARK-43518][SQL] Convert `_LEGACY_ERROR_TEMP_2029` to INTERNAL_ERROR
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new dbadb5f275c [SPARK-43518][SQL] Convert `_LEGACY_ERROR_TEMP_2029` to INTERNAL_ERROR dbadb5f275c is described below commit dbadb5f275cf0519b8b1ed78decfe4ce83934825 Author: panbingkun AuthorDate: Tue May 16 12:47:15 2023 +0300 [SPARK-43518][SQL] Convert `_LEGACY_ERROR_TEMP_2029` to INTERNAL_ERROR ### What changes were proposed in this pull request? The pr aims to convert _LEGACY_ERROR_TEMP_2029 to INTERNAL_ERROR. ### Why are the changes needed? 1. I found that it can only be triggered it with the parameter value: UP,DOWN,HALF_DOWN,UNNECESSARY, but from a user's perspective, it is impossible (the internal code limits its value to only: HALF_UP,HALF_EVEN,CEILING,FLOOR), so we should convert it to an internal error. 2. The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 1. Update existed UT. 2. Pass GA. Closes #41179 from panbingkun/SPARK-43518. Authored-by: panbingkun Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 - .../org/apache/spark/sql/errors/QueryExecutionErrors.scala| 6 ++ .../test/scala/org/apache/spark/sql/types/DecimalSuite.scala | 11 +++ 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index fa838a6da76..edc5a5a66e5 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -3873,11 +3873,6 @@ "This line should be unreachable." ] }, - "_LEGACY_ERROR_TEMP_2029" : { -"message" : [ - "Not supported rounding mode: ." -] - }, "_LEGACY_ERROR_TEMP_2030" : { "message" : [ "Can not handle nested schema yet... plan ." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1f3ee517dd2..52e8c7df91e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -514,10 +514,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("err" -> err))
   }

-  def unsupportedRoundingMode(roundMode: BigDecimal.RoundingMode.Value): SparkRuntimeException = {
-    new SparkRuntimeException(
-      errorClass = "_LEGACY_ERROR_TEMP_2029",
-      messageParameters = Map("roundMode" -> roundMode.toString()))
+  def unsupportedRoundingMode(roundMode: BigDecimal.RoundingMode.Value): SparkException = {
+    SparkException.internalError(s"Not supported rounding mode: ${roundMode.toString}.")
   }

   def resolveCannotHandleNestedSchema(plan: LogicalPlan): SparkRuntimeException = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 465c25118fa..ab3f831fbcb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -303,6 +303,17 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester with SQLHelper
     }
   }

+  test("Not supported rounding mode: HALF_DOWN") {
+    val d = Decimal(1L, 100, 80)
+    checkError(
+      exception = intercept[SparkException] {
+        d.toPrecision(5, 50, BigDecimal.RoundingMode.HALF_DOWN)
+      },
+      errorClass = "INTERNAL_ERROR",
+      parameters = Map("message" -> "Not supported rounding mode: HALF_DOWN.")
+    )
+  }
+
   test("SPARK-20341: support BigInt's value does not fit in long value range") {
     val bigInt = scala.math.BigInt("9223372036854775808")
     val decimal = Decimal.apply(bigInt)
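Behaviorally, the change only re-labels an unreachable failure: callers now see a `SparkException` carrying the `INTERNAL_ERROR` error class instead of a `SparkRuntimeException` with a legacy class. A small sketch of what the new path produces, assuming `SparkException.internalError` fills in the error class and `message` parameter exactly as the updated `checkError` call asserts:

```scala
import org.apache.spark.SparkException

object InternalErrorSketch extends App {
  // Build the error the same way the rewritten unsupportedRoundingMode does.
  val e = SparkException.internalError("Not supported rounding mode: HALF_DOWN.")

  // The error class is INTERNAL_ERROR, and the original text is carried in
  // the `message` parameter -- mirroring the DecimalSuite test above.
  assert(e.getErrorClass == "INTERNAL_ERROR")
  assert(e.getMessageParameters.get("message") == "Not supported rounding mode: HALF_DOWN.")
}
```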
[spark] branch master updated: [SPARK-43457][CONNECT][PYTHON] Augment user agent with OS, Python and Spark versions
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6172615f707 [SPARK-43457][CONNECT][PYTHON] Augment user agent with OS, Python and Spark versions
6172615f707 is described below

commit 6172615f70785b71224ecbc797de2f679ab0d593
Author: Niranjan Jayakar
AuthorDate: Tue May 16 17:36:02 2023 +0900

    [SPARK-43457][CONNECT][PYTHON] Augment user agent with OS, Python and Spark versions

    ### What changes were proposed in this pull request?
    Augment the user agent string sent to the service to include the operating system, Python version, and Spark version.

    ### Why are the changes needed?
    Including the OS, Python, and Spark versions in the user agent improves tracking of how Spark Connect is used across Python versions and platforms.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Unit tests attached.

    Closes #41138 from nija-at/user-agent-info.

    Lead-authored-by: Niranjan Jayakar
    Co-authored-by: Hyukjin Kwon
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/client.py                   | 11 ++-
 python/pyspark/sql/tests/connect/test_client.py        |  6 --
 python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/connect/client.py b/python/pyspark/sql/connect/client.py
index a2a2cc4cf5e..c1675eac9e1 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -25,6 +25,7 @@ check_dependencies(__name__)

 import logging
 import os
+import platform
 import random
 import time
 import urllib.parse
@@ -57,6 +58,7 @@ import grpc
 from google.protobuf import text_format
 from google.rpc import error_details_pb2

+from pyspark.version import __version__
 from pyspark.resource.information import ResourceInformation
 from pyspark.sql.connect.conversion import storage_level_to_proto, proto_to_storage_level
 import pyspark.sql.connect.proto as pb2
@@ -299,7 +301,14 @@ class ChannelBuilder:
             raise SparkConnectException(
                 f"'user_agent' parameter should not exceed 2048 characters, found {len} characters."
             )
-        return user_agent
+        return " ".join(
+            [
+                user_agent,
+                f"spark/{__version__}",
+                f"os/{platform.uname().system.lower()}",
+                f"python/{platform.python_version()}",
+            ]
+        )

     def get(self, key: str) -> Any:
         """
diff --git a/python/pyspark/sql/tests/connect/test_client.py b/python/pyspark/sql/tests/connect/test_client.py
index 191a5204bf3..722be1e2882 100644
--- a/python/pyspark/sql/tests/connect/test_client.py
+++ b/python/pyspark/sql/tests/connect/test_client.py
@@ -37,7 +37,7 @@ class SparkConnectClientTestCase(unittest.TestCase):
         client.execute_command(command)

         self.assertIsNotNone(mock.req, "ExecutePlan API was not called when expected")
-        self.assertEqual(mock.req.client_type, "bar")
+        self.assertRegex(mock.req.client_type, r"^bar spark/[^ ]+ os/[^ ]+ python/[^ ]+$")

     def test_user_agent_default(self):
         client = SparkConnectClient("sc://foo/")
@@ -48,7 +48,9 @@ class SparkConnectClientTestCase(unittest.TestCase):
         client.execute_command(command)

         self.assertIsNotNone(mock.req, "ExecutePlan API was not called when expected")
-        self.assertEqual(mock.req.client_type, "_SPARK_CONNECT_PYTHON")
+        self.assertRegex(
+            mock.req.client_type, r"^_SPARK_CONNECT_PYTHON spark/[^ ]+ os/[^ ]+ python/[^ ]+$"
+        )

     def test_properties(self):
         client = SparkConnectClient("sc://foo/;token=bar")
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index b0bc2cba78e..8a83d040207 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -3404,13 +3404,15 @@ class ChannelBuilderTests(unittest.TestCase):

         chan = ChannelBuilder("sc://host/;token=abcs")
         self.assertTrue(chan.secure, "specifying a token must set the channel to secure")
-        self.assertEqual(chan.userAgent, "_SPARK_CONNECT_PYTHON")
+        self.assertRegex(
+            chan.userAgent, r"^_SPARK_CONNECT_PYTHON spark/[^ ]+ os/[^ ]+ python/[^ ]+$"
+        )

         chan = ChannelBuilder("sc://host/;use_ssl=abcs")
         self.assertFalse(chan.secure, "Garbage in, false out")

     def test_user_agent(self):
         chan = ChannelBuilder("sc://host/;user_agent=Agent123%20%2F3.4")
-        self.assertEqual("Agent123 /3.4", chan.userAgent)
+
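The composition itself is just space-separated `product/version` tokens appended to the base agent, the conventional User-Agent shape. For consistency with the other sketches, here is the same assembly in Scala, with the Spark and Python versions hard-coded as placeholders; the real Python client derives them at runtime from `pyspark.version.__version__` and the `platform` module.

```scala
object UserAgentSketch extends App {
  // Base agent: the user-supplied user_agent parameter, or the default.
  val base = "_SPARK_CONNECT_PYTHON"

  // Placeholder values -- the real client reads these at runtime.
  val sparkVersion = "3.5.0.dev0"
  val pythonVersion = "3.11.4"
  // Rough analogue of platform.uname().system.lower(); JVM OS names can
  // contain spaces ("Mac OS X"), so normalize them into a single token.
  val osName = System.getProperty("os.name").toLowerCase.replace(' ', '_')

  val userAgent = Seq(base, s"spark/$sparkVersion", s"os/$osName", s"python/$pythonVersion")
    .mkString(" ")

  // Prints e.g. "_SPARK_CONNECT_PYTHON spark/3.5.0.dev0 os/linux python/3.11.4",
  // the shape the updated tests' regexes assert against.
  println(userAgent)
}
```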