[spark] branch master updated (5fc482eb591 -> d08ab7e24b3)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 5fc482eb591 [SPARK-41315][CONNECT][PYTHON] Implement `DataFrame.replace` and `DataFrame.na.replace` add d08ab7e24b3 [SPARK-41332][CONNECT][PYTHON] Fix `nullOrdering` in `SortOrder` No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/column.py | 26 ++ python/pyspark/sql/connect/plan.py | 37 +++ .../sql/tests/connect/test_connect_basic.py| 42 ++ .../sql/tests/connect/test_connect_plan_only.py| 34 +- 4 files changed, 106 insertions(+), 33 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41325][CONNECT] Fix missing avg() for GroupBy on DF
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 32ff77cdb8e [SPARK-41325][CONNECT] Fix missing avg() for GroupBy on DF 32ff77cdb8e is described below commit 32ff77cdb8ef4973494beb1a31ced05ea493dc6d Author: Martin Grund AuthorDate: Wed Nov 30 19:06:12 2022 +0800 [SPARK-41325][CONNECT] Fix missing avg() for GroupBy on DF ### What changes were proposed in this pull request? Previously, the `avg` function was missing from the `GroupedData` class. This patch adds the method and the necessary plan transformation using an unresolved function. In addition, it surfaced a small issue: when an alias is used for a grouping column, the planner would incorrectly try to wrap the existing alias expression in an unresolved alias, which would then fail. ``` df = ( self.connect.range(10) .groupBy((col("id") % lit(2)).alias("moded")) .avg("id") .sort("moded") ) ``` ### Why are the changes needed? Bug / Compatibility ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38841 from grundprinzip/SPARK-41325. 
Authored-by: Martin Grund Signed-off-by: Ruifeng Zheng --- .../spark/sql/connect/planner/SparkConnectPlanner.scala | 3 ++- python/pyspark/sql/connect/dataframe.py | 4 python/pyspark/sql/tests/connect/test_connect_basic.py | 13 + 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 7b9e13cadab..d1d4c3d4fa9 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -682,7 +682,8 @@ class SparkConnectPlanner(session: SparkSession) { rel.getGroupingExpressionsList.asScala .map(transformExpression) .map { - case x @ UnresolvedAttribute(_) => x + case ua @ UnresolvedAttribute(_) => ua + case a @ Alias(_, _) => a case x => UnresolvedAlias(x) } diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index c9960a71fb8..ebfb52cdd74 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -84,6 +84,10 @@ class GroupedData(object): expr = self._map_cols_to_expression("sum", col) return self.agg(expr) +def avg(self, col: Union[Column, str]) -> "DataFrame": +expr = self._map_cols_to_expression("avg", col) +return self.agg(expr) + def count(self) -> "DataFrame": return self.agg([scalar_function("count", lit(1))]) diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index f518a09ad4a..22d57994794 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -837,6 +837,19 @@ class SparkConnectTests(SparkConnectSQLTestCase): ndf = self.connect.read.table("parquet_test") 
self.assertEqual(set(df.collect()), set(ndf.collect())) +def test_agg_with_avg(self): +# SPARK-41325: groupby.avg() +df = ( +self.connect.range(10) +.groupBy((col("id") % lit(2)).alias("moded")) +.avg("id") +.sort("moded") +) +res = df.collect() +self.assertEqual(2, len(res)) +self.assertEqual(4.0, res[0][1]) +self.assertEqual(5.0, res[1][1]) + class ChannelBuilderTests(ReusedPySparkTestCase): def test_invalid_connection_strings(self): - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
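The planner change above keeps explicit `Alias` grouping expressions intact instead of re-wrapping them. A rough pure-Python sketch of that normalization rule (the class and function names here are illustrative stand-ins, not Spark's actual Catalyst API):

```python
from dataclasses import dataclass

# Illustrative stand-ins for Catalyst expression nodes.
@dataclass
class UnresolvedAttribute:
    name: str

@dataclass
class Alias:
    child: object
    name: str

@dataclass
class UnresolvedAlias:
    child: object

def normalize_grouping(expr):
    # Leave attributes and explicit aliases alone; wrap anything else.
    # Before the fix, the Alias case was missing, so an aliased grouping
    # column was re-wrapped in UnresolvedAlias and analysis failed.
    if isinstance(expr, (UnresolvedAttribute, Alias)):
        return expr
    return UnresolvedAlias(expr)

normalized = [
    normalize_grouping(UnresolvedAttribute("id")),
    normalize_grouping(Alias(child="id % 2", name="moded")),
    normalize_grouping("id % 2"),  # bare expression gets wrapped
]
```

The essential point is the second case: an expression the user already named must pass through untouched.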
[spark] branch master updated: [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify startsWith and endsWith
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 32b863866a5 [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify startsWith and endsWith 32b863866a5 is described below commit 32b863866a5e4ff88ba3b111cdaefaf9f984039f Author: Rui Wang AuthorDate: Wed Nov 30 19:15:29 2022 +0800 [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify startsWith and endsWith ### What changes were proposed in this pull request? 1. `startsWith` and `endsWith` can be implemented by simply invoking `_bin_op` with additional documentation. 2. Remove non-working examples from the documentation. ### Why are the changes needed? Codebase simplification. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? Existing UT. Closes #38849 from amaliujia/simplify_strings. Authored-by: Rui Wang Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/connect/column.py | 53 +++- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/python/pyspark/sql/connect/column.py b/python/pyspark/sql/connect/column.py index f9241b0bd58..e02508d6114 100644 --- a/python/pyspark/sql/connect/column.py +++ b/python/pyspark/sql/connect/column.py @@ -520,45 +520,24 @@ class Column(object): """ return _bin_op("contains")(self, other) -def startswith(self, other: Union[PrimitiveType, "Column"]) -> "Column": -""" -String starts with. Returns a boolean :class:`Column` based on a string match. - -Parameters --- -other : :class:`Column` or str -string at start of line (do not use a regex `^`) - -Examples - ->>> df = spark.createDataFrame( -... 
[(2, "Alice"), (5, "Bob")], ["age", "name"]) ->>> df.filter(df.name.startswith('Al')).collect() -[Row(age=2, name='Alice')] ->>> df.filter(df.name.startswith('^Al')).collect() -[] -""" -return _bin_op("startsWith")(self, other) +_startswith_doc = """ +String starts with. Returns a boolean :class:`Column` based on a string match. -def endswith(self, other: Union[PrimitiveType, "Column"]) -> "Column": -""" -String ends with. Returns a boolean :class:`Column` based on a string match. - -Parameters --- -other : :class:`Column` or str -string at end of line (do not use a regex `$`) +Parameters +-- +other : :class:`Column` or str +string at start of line (do not use a regex `^`) +""" +_endswith_doc = """ +String ends with. Returns a boolean :class:`Column` based on a string match. -Examples - ->>> df = spark.createDataFrame( -... [(2, "Alice"), (5, "Bob")], ["age", "name"]) ->>> df.filter(df.name.endswith('ice')).collect() -[Row(age=2, name='Alice')] ->>> df.filter(df.name.endswith('ice$')).collect() -[] -""" -return _bin_op("endsWith")(self, other) +Parameters +-- +other : :class:`Column` or str +string at end of line (do not use a regex `$`) +""" +startswith = _bin_op("startsWith", _startswith_doc) +endswith = _bin_op("endsWith", _endswith_doc) def like(self: "Column", other: str) -> "Column": """ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
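The pattern in this diff — replacing two hand-written methods with a factory call plus a shared docstring — can be sketched in plain Python. This is a toy analogue, not Spark's `_bin_op` (which emits an unresolved Connect function expression rather than dispatching to `str` methods):

```python
def _bin_op(name: str, doc: str = ""):
    # Return a method implementing the named binary string operation and
    # attach the given documentation to it. The real Spark Connect _bin_op
    # builds an expression tree; this toy version calls str methods directly.
    ops = {"startsWith": str.startswith, "endsWith": str.endswith}
    def method(self, other):
        return ops[name](self.value, other)
    method.__doc__ = doc
    return method

class Column:
    def __init__(self, value: str):
        self.value = value

    # Class-level assignment makes the factory result an ordinary method,
    # with the docstring visible to help() and Sphinx alike.
    startswith = _bin_op("startsWith", "String starts with.")
    endswith = _bin_op("endsWith", "String ends with.")

c = Column("Alice")
```

The design win is that the operator name, the wire-level behavior, and the documentation are declared in one line each, instead of two near-identical method bodies.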
[spark] branch master updated (32b863866a5 -> b64a95f72fe)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 32b863866a5 [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify startsWith and endsWith add b64a95f72fe [SPARK-41326][CONNECT][FOLLOW-UP] Add e2e tests for distinct and dropDuplicates No new revisions were added by this update. Summary of changes: python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++ 1 file changed, 10 insertions(+)
[spark] branch master updated (b64a95f72fe -> 5b13a51dc0a)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b64a95f72fe [SPARK-41326][CONNECT][FOLLOW-UP] Add e2e tests for distinct and dropDuplicates add 5b13a51dc0a [SPARK-41335][CONNECT][PYTHON] Support IsNull and IsNotNull in Column No new revisions were added by this update. Summary of changes: python/pyspark/sql/connect/column.py | 17 + python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++ 2 files changed, 27 insertions(+)
[spark] branch master updated (5b13a51dc0a -> 70502d7a043)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 5b13a51dc0a [SPARK-41335][CONNECT][PYTHON] Support IsNull and IsNotNull in Column add 70502d7a043 [SPARK-41276][SQL][ML][MLLIB][PROTOBUF][PYTHON][R][SS][AVRO] Optimize constructor use of `StructType` No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/avro/SchemaConverters.scala | 4 ++-- .../spark/sql/protobuf/utils/SchemaConverters.scala | 2 +- .../main/scala/org/apache/spark/ml/fpm/FPGrowth.scala| 4 ++-- .../main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala | 2 +- .../scala/org/apache/spark/ml/image/ImageSchema.scala| 16 .../scala/org/apache/spark/ml/linalg/MatrixUDT.scala | 2 +- .../scala/org/apache/spark/ml/linalg/VectorUDT.scala | 2 +- .../apache/spark/ml/source/libsvm/LibSVMRelation.scala | 2 +- .../scala/org/apache/spark/mllib/linalg/Matrices.scala | 2 +- .../scala/org/apache/spark/mllib/linalg/Vectors.scala| 2 +- .../spark/ml/source/libsvm/LibSVMRelationSuite.scala | 6 +++--- .../spark/sql/catalyst/expressions/jsonExpressions.scala | 2 +- .../apache/spark/sql/catalyst/json/JsonInferSchema.scala | 2 +- .../sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala| 6 +++--- .../apache/spark/sql/catalyst/parser/AstBuilder.scala| 8 .../catalyst/plans/logical/basicLogicalOperators.scala | 2 +- .../scala/org/apache/spark/sql/types/StructType.scala| 6 +++--- .../scala/org/apache/spark/sql/util/ArrowUtils.scala | 4 ++-- .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../apache/spark/sql/execution/command/SetCommand.scala | 16 .../apache/spark/sql/execution/command/functions.scala | 2 +- .../org/apache/spark/sql/execution/command/tables.scala | 2 +- .../datasources/binaryfile/BinaryFileFormat.scala| 10 +- .../spark/sql/execution/datasources/orc/OrcUtils.scala | 2 +- .../sql/execution/datasources/v2/text/TextTable.scala| 2 +- 
.../sql/execution/python/AggregateInPandasExec.scala | 2 +- .../spark/sql/execution/python/EvalPythonExec.scala | 2 +- .../spark/sql/execution/python/MapInBatchExec.scala | 2 +- .../streaming/sources/RatePerMicroBatchProvider.scala| 2 +- .../execution/streaming/sources/RateStreamProvider.scala | 2 +- .../streaming/sources/TextSocketSourceProvider.scala | 6 +++--- .../scala/org/apache/spark/sql/hive/HiveInspectors.scala | 2 +- .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala | 2 +- .../apache/spark/sql/hive/client/HiveClientImpl.scala| 2 +- .../main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 2 +- 35 files changed, 67 insertions(+), 67 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] [spark-website] dongjoon-hyun commented on pull request #428: Add 3.2.3 announcement news, release note and download link
dongjoon-hyun commented on PR #428: URL: https://github.com/apache/spark-website/pull/428#issuecomment-1332622082 Thank you so much, @gengliangwang ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[spark] branch master updated: [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 388824c4488 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info 388824c4488 is described below commit 388824c448804161b076507f0f39ef0596e0a0bf Author: Lingyun Yuan AuthorDate: Wed Nov 30 11:47:15 2022 -0800 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info ### What changes were proposed in this pull request? This PR aims to fix `SparkStatusTracker.getExecutorInfos` to return a correct `on/offHeapStorageMemory`. ### Why are the changes needed? `SparkExecutorInfoImpl` used the following parameter order. https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45 SPARK-20659 introduced a bug with wrong parameter order at Apache Spark 2.4.0. - https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Manually review. Closes #38843 from ylybest/master. 
Lead-authored-by: Lingyun Yuan Co-authored-by: ylybest <119458293+ylyb...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 37e673cd8c7..22dc1d056ec 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -114,10 +114,10 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore port, cachedMem, exec.activeTasks, -exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L), exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L), -exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L), -exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L)) +exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L), +exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L), +exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L)) }.toArray } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
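The root cause here is a classic hazard: six positional arguments of the same type, two pairs of which were transposed, so the code compiled and ran while silently reporting off-heap numbers as on-heap and vice versa. A small Python sketch of the defensive pattern that prevents this class of bug — keyword-only parameters (the field names below are illustrative, not Spark's `SparkExecutorInfoImpl`):

```python
class ExecutorInfo:
    # The bare `*` makes every parameter keyword-only, so call sites must
    # name each field. On-heap and off-heap values then cannot be silently
    # transposed the way same-typed positional arguments were in SPARK-41327.
    def __init__(self, *, used_on_heap, used_off_heap,
                 total_on_heap, total_off_heap):
        self.used_on_heap = used_on_heap
        self.used_off_heap = used_off_heap
        self.total_on_heap = total_on_heap
        self.total_off_heap = total_off_heap

info = ExecutorInfo(
    used_on_heap=10,
    used_off_heap=2,
    total_on_heap=100,
    total_off_heap=20,
)
```

A positional call such as `ExecutorInfo(10, 2, 100, 20)` raises a `TypeError` immediately, turning a silent data bug into a loud one at the call site.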
[spark] branch branch-3.3 updated: [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new a94dd1820d6 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info a94dd1820d6 is described below commit a94dd1820d65e6280941047dbb4abb15bf429bc3 Author: Lingyun Yuan AuthorDate: Wed Nov 30 11:47:15 2022 -0800 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info ### What changes were proposed in this pull request? This PR aims to fix `SparkStatusTracker.getExecutorInfos` to return a correct `on/offHeapStorageMemory`. ### Why are the changes needed? `SparkExecutorInfoImpl` used the following parameter order. https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45 SPARK-20659 introduced a bug with wrong parameter order at Apache Spark 2.4.0. - https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Manually review. Closes #38843 from ylybest/master. 
Lead-authored-by: Lingyun Yuan Co-authored-by: ylybest <119458293+ylyb...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun (cherry picked from commit 388824c448804161b076507f0f39ef0596e0a0bf) Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 37e673cd8c7..22dc1d056ec 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -114,10 +114,10 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore port, cachedMem, exec.activeTasks, -exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L), exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L), -exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L), -exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L)) +exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L), +exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L), +exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L)) }.toArray } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 44b6db8d7a5 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info 44b6db8d7a5 is described below commit 44b6db8d7a5828763774b21270cfc9de426b3f9a Author: Lingyun Yuan AuthorDate: Wed Nov 30 11:47:15 2022 -0800 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info ### What changes were proposed in this pull request? This PR aims to fix `SparkStatusTracker.getExecutorInfos` to return a correct `on/offHeapStorageMemory`. ### Why are the changes needed? `SparkExecutorInfoImpl` used the following parameter order. https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45 SPARK-20659 introduced a bug with wrong parameter order at Apache Spark 2.4.0. - https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Manually review. Closes #38843 from ylybest/master. 
Lead-authored-by: Lingyun Yuan Co-authored-by: ylybest <119458293+ylyb...@users.noreply.github.com> Signed-off-by: Dongjoon Hyun (cherry picked from commit 388824c448804161b076507f0f39ef0596e0a0bf) Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala index 37e673cd8c7..22dc1d056ec 100644 --- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala +++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala @@ -114,10 +114,10 @@ class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore port, cachedMem, exec.activeTasks, -exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L), exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L), -exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L), -exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L)) +exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L), +exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L), +exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L)) }.toArray } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41321][CONNECT] Support target field for UnresolvedStar
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7b200898967 [SPARK-41321][CONNECT] Support target field for UnresolvedStar 7b200898967 is described below commit 7b20089896716a5fa7cad595bd560640d1b5afcf Author: dengziming AuthorDate: Thu Dec 1 10:40:00 2022 +0800 [SPARK-41321][CONNECT] Support target field for UnresolvedStar ### What changes were proposed in this pull request? 1. Support target field UnresolvedStar 2. UnresolvedStar can be used simultaneously with other expression. ### Why are the changes needed? This is a necessary feature for UnresolvedStar ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added 2 new unit tests. Closes #38838 from dengziming/SPARK-41321. Authored-by: dengziming Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/expressions.proto | 4 +- .../sql/connect/planner/SparkConnectPlanner.scala | 17 ++-- .../connect/planner/SparkConnectPlannerSuite.scala | 105 - .../pyspark/sql/connect/proto/expressions_pb2.py | 55 ++- .../pyspark/sql/connect/proto/expressions_pb2.pyi | 13 +++ 5 files changed, 158 insertions(+), 36 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/expressions.proto b/connector/connect/src/main/protobuf/spark/connect/expressions.proto index 2a1159c1d04..b90f7619b8f 100644 --- a/connector/connect/src/main/protobuf/spark/connect/expressions.proto +++ b/connector/connect/src/main/protobuf/spark/connect/expressions.proto @@ -18,7 +18,6 @@ syntax = 'proto3'; import "spark/connect/types.proto"; -import "google/protobuf/any.proto"; package spark.connect; @@ -142,6 +141,9 @@ message Expression { // UnresolvedStar is used to expand all the fields of a relation or struct. 
message UnresolvedStar { +// (Optional) The target of the expansion, either be a table name or struct name, this +// is a list of identifiers that is the path of the expansion. +repeated string target = 1; } message Alias { diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index d1d4c3d4fa9..5ebe7c7cce3 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -392,13 +392,8 @@ class SparkConnectPlanner(session: SparkSession) { } else { logical.OneRowRelation() } -// TODO: support the target field for *. val projection = - if (rel.getExpressionsCount == 1 && rel.getExpressions(0).hasUnresolvedStar) { -Seq(UnresolvedStar(Option.empty)) - } else { - rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_)) - } + rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_)) logical.Project(projectList = projection.toSeq, child = baseRel) } @@ -416,6 +411,8 @@ class SparkConnectPlanner(session: SparkSession) { case proto.Expression.ExprTypeCase.ALIAS => transformAlias(exp.getAlias) case proto.Expression.ExprTypeCase.EXPRESSION_STRING => transformExpressionString(exp.getExpressionString) + case proto.Expression.ExprTypeCase.UNRESOLVED_STAR => +transformUnresolvedStar(exp.getUnresolvedStar) case _ => throw InvalidPlanInput( s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not supported") @@ -573,6 +570,14 @@ class SparkConnectPlanner(session: SparkSession) { session.sessionState.sqlParser.parseExpression(expr.getExpression) } + private def transformUnresolvedStar(regex: proto.Expression.UnresolvedStar): Expression = { +if (regex.getTargetList.isEmpty) { + UnresolvedStar(Option.empty) +} else { + 
UnresolvedStar(Some(regex.getTargetList.asScala.toSeq)) +} + } + private def transformSetOperation(u: proto.SetOperation): LogicalPlan = { assert(u.hasLeftInput && u.hasRightInput, "Union must have 2 inputs") diff --git a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 8fbf2be3730..81e5ee3d0ce 100644 --- a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -23,7 +23,7 @@ import com.google.protobuf.ByteString impor
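The new `transformUnresolvedStar` maps the proto's repeated `target` field onto Catalyst's `Option[Seq[String]]`: an empty list means an unscoped `*`, while a non-empty list is the identifier path that scopes the expansion. A minimal Python analogue of that mapping (a sketch mirroring the Scala logic, not Spark code):

```python
from typing import List, Optional

def transform_unresolved_star(target: List[str]) -> Optional[List[str]]:
    # Empty target list: expand every column (plain SELECT *).
    # Non-empty list: the identifier path scoping the expansion,
    # e.g. ["struct_col"] for struct_col.* or ["db", "table"] for db.table.*.
    return None if not target else list(target)
```

This keeps "no target" (`None`) distinct from "empty path", matching the `Option.empty` vs. `Some(...)` split in the planner.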
[spark] branch master updated: [SPARK-41226][SQL] Refactor Spark types by introducing physical types
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3d598594cd6 [SPARK-41226][SQL] Refactor Spark types by introducing physical types 3d598594cd6 is described below commit 3d598594cd66eee481bf15da7542059082e148ef Author: Desmond Cheong AuthorDate: Thu Dec 1 11:24:33 2022 +0800 [SPARK-41226][SQL] Refactor Spark types by introducing physical types ### What changes were proposed in this pull request? Refactor Spark types by introducing physical types. Multiple logical types match to the same physical type, for example `DateType` and `YearMonthIntervalType` are both implemented using `IntegerType`. Since this is the case, we can simplify case matching logic on Spark types by matching their physical types rather than listing all possible logical types. ### Why are the changes needed? These changes simplify the Spark type system. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Since this code is a refactor of existing code, we rely on existing tests. Closes #38750 from desmondcheongzx/refactor-using-physical-types. 
Authored-by: Desmond Cheong Signed-off-by: Wenchen Fan --- .../expressions/SpecializedGettersReader.java | 51 +++--- .../spark/sql/vectorized/ColumnarBatchRow.java | 36 ++--- .../apache/spark/sql/vectorized/ColumnarRow.java | 36 ++--- .../apache/spark/sql/catalyst/InternalRow.scala| 36 +++-- .../spark/sql/catalyst/encoders/RowEncoder.scala | 20 ++- .../expressions/InterpretedUnsafeProjection.scala | 172 ++--- .../expressions/codegen/CodeGenerator.scala| 55 --- .../spark/sql/catalyst/expressions/literals.scala | 65 .../sql/catalyst/types/PhysicalDataType.scala | 66 .../org/apache/spark/sql/types/ArrayType.scala | 4 + .../org/apache/spark/sql/types/BinaryType.scala| 3 + .../org/apache/spark/sql/types/BooleanType.scala | 3 + .../org/apache/spark/sql/types/ByteType.scala | 3 + .../spark/sql/types/CalendarIntervalType.scala | 3 + .../org/apache/spark/sql/types/CharType.scala | 2 + .../org/apache/spark/sql/types/DataType.scala | 3 + .../org/apache/spark/sql/types/DateType.scala | 3 + .../spark/sql/types/DayTimeIntervalType.scala | 3 + .../org/apache/spark/sql/types/DecimalType.scala | 3 + .../org/apache/spark/sql/types/DoubleType.scala| 3 + .../org/apache/spark/sql/types/FloatType.scala | 3 + .../org/apache/spark/sql/types/IntegerType.scala | 3 + .../org/apache/spark/sql/types/LongType.scala | 3 + .../scala/org/apache/spark/sql/types/MapType.scala | 4 + .../org/apache/spark/sql/types/NullType.scala | 3 + .../org/apache/spark/sql/types/ShortType.scala | 3 + .../org/apache/spark/sql/types/StringType.scala| 3 + .../org/apache/spark/sql/types/StructType.scala| 3 + .../apache/spark/sql/types/TimestampNTZType.scala | 3 + .../org/apache/spark/sql/types/TimestampType.scala | 3 + .../org/apache/spark/sql/types/VarcharType.scala | 2 + .../spark/sql/types/YearMonthIntervalType.scala| 3 + 32 files changed, 367 insertions(+), 239 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java index 90857c667ab..c5a7d34281f 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions; +import org.apache.spark.sql.catalyst.types.*; import org.apache.spark.sql.types.*; public final class SpecializedGettersReader { @@ -28,70 +29,56 @@ public final class SpecializedGettersReader { DataType dataType, boolean handleNull, boolean handleUserDefinedType) { -if (handleNull && (obj.isNullAt(ordinal) || dataType instanceof NullType)) { +PhysicalDataType physicalDataType = dataType.physicalDataType(); +if (handleNull && (obj.isNullAt(ordinal) || physicalDataType instanceof PhysicalNullType)) { return null; } -if (dataType instanceof BooleanType) { +if (physicalDataType instanceof PhysicalBooleanType) { return obj.getBoolean(ordinal); } -if (dataType instanceof ByteType) { +if (physicalDataType instanceof PhysicalByteType) { return obj.getByte(ordinal); } -if (dataType instanceof ShortType) { +if (physicalDataType instanceof PhysicalShortType) { return obj.getShort(ord
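The core idea of the refactor is that several logical types share one physical representation (e.g. `DateType` and `YearMonthIntervalType` are both backed by 32-bit integers), so low-level readers can branch on the physical type alone instead of enumerating every logical type. A toy Python sketch of that dispatch (the type names are illustrative strings, not Spark's actual classes, and only two physical types are shown):

```python
# Several logical types map to one physical representation, so the reader
# below needs one branch per physical type, not per logical type.
PHYSICAL_TYPE = {
    "DateType": "PhysicalIntegerType",
    "YearMonthIntervalType": "PhysicalIntegerType",
    "IntegerType": "PhysicalIntegerType",
    "TimestampType": "PhysicalLongType",
    "TimestampNTZType": "PhysicalLongType",
    "LongType": "PhysicalLongType",
}

class Row:
    # Minimal stand-in for a columnar row with typed getters.
    def __init__(self, ints, longs):
        self._ints, self._longs = ints, longs
    def get_int(self, i):
        return self._ints[i]
    def get_long(self, i):
        return self._longs[i]

def read_value(row, ordinal, logical_type):
    # Dispatch on the physical type; adding a new integer-backed logical
    # type only requires a new PHYSICAL_TYPE entry, not a new branch here.
    physical = PHYSICAL_TYPE[logical_type]
    if physical == "PhysicalIntegerType":
        return row.get_int(ordinal)
    if physical == "PhysicalLongType":
        return row.get_long(ordinal)
    raise TypeError(f"unsupported logical type: {logical_type}")

r = Row(ints=[7], longs=[1_700_000_000_000])
```

This mirrors the shape of the `SpecializedGettersReader` change above, where `dataType instanceof DateType || dataType instanceof YearMonthIntervalType || ...` chains collapse into a single `physicalDataType instanceof PhysicalIntegerType` check.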
[spark] branch master updated: [SPARK-41227][CONNECT][PYTHON] Implement DataFrame cross join
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 93b9deedbd8 [SPARK-41227][CONNECT][PYTHON] Implement DataFrame cross join 93b9deedbd8 is described below commit 93b9deedbd81dec79badec0e761f12c779b019f6 Author: Xinrong Meng AuthorDate: Thu Dec 1 13:05:51 2022 +0800 [SPARK-41227][CONNECT][PYTHON] Implement DataFrame cross join ### What changes were proposed in this pull request? Implement DataFrame cross join for Spark Connect. That consists of - `DataFrame.crossJoin` - `DataFrame.join(.., how="cross")`. ### Why are the changes needed? Part of [SPARK-39375](https://issues.apache.org/jira/browse/SPARK-39375). ### Does this PR introduce _any_ user-facing change? Yes. `DataFrame.crossJoin` and `DataFrame.join(.., how="cross")` are supported as shown below. ```py >>> from pyspark.sql.connect.client import RemoteSparkSession >>> cspark = RemoteSparkSession() >>> df = cspark.range(1, 3) >>> df.crossJoin(df).show() +---+---+ | id| id| +---+---+ | 1| 1| | 1| 2| | 2| 1| | 2| 2| +---+---+ >>> df.join(other=df, how="cross").show() +---+---+ | id| id| +---+---+ | 1| 1| | 1| 2| | 2| 1| | 2| 2| +---+---+ ``` ### How was this patch tested? Unit tests. Closes #38778 from xinrong-meng/connect_crossjoin. 
Authored-by: Xinrong Meng Signed-off-by: Ruifeng Zheng --- .../main/protobuf/spark/connect/relations.proto| 1 + .../sql/connect/planner/SparkConnectPlanner.scala | 3 +- python/pyspark/sql/connect/dataframe.py| 25 - python/pyspark/sql/connect/plan.py | 4 +- python/pyspark/sql/connect/proto/relations_pb2.py | 118 ++--- python/pyspark/sql/connect/proto/relations_pb2.pyi | 2 + .../sql/tests/connect/test_connect_basic.py| 17 +++ .../sql/tests/connect/test_connect_plan_only.py| 8 ++ .../sql/tests/connect/test_connect_select_ops.py | 1 + 9 files changed, 117 insertions(+), 62 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index 8b87845245f..f4df95fdd73 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -166,6 +166,7 @@ message Join { JOIN_TYPE_RIGHT_OUTER = 4; JOIN_TYPE_LEFT_ANTI = 5; JOIN_TYPE_LEFT_SEMI = 6; +JOIN_TYPE_CROSS = 7; } } diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 5ebe7c7cce3..6b11cbea7a5 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.CombineUnions import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} -import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin} +import org.apache.spark.sql.catalyst.plans.{logical, Cross, FullOuter, Inner, JoinType, LeftAnti, 
LeftOuter, LeftSemi, RightOuter, UsingJoin} import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, Intersect, LocalRelation, LogicalPlan, Sample, SubqueryAlias, Union} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.errors.QueryCompilationErrors @@ -641,6 +641,7 @@ class SparkConnectPlanner(session: SparkSession) { case proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER => LeftOuter case proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER => RightOuter case proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI => LeftSemi + case proto.Join.JoinType.JOIN_TYPE_CROSS => Cross case _ => throw InvalidPlanInput(s"Join type ${t} is not supported") } } diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index ebfb52cdd74..749aab7c859 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -230,7 +230,30 @@ class DataFrame(object): return pdd.iloc[0, 0] def crossJoin(self, other: "DataFrame") -> "DataFrame": -... +""" +Returns the cartesian product with another :class:`DataFrame`. + +.. versionadded:: 3.4.0 + +Parame
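The `crossJoin` docstring in the hunk above is cut off; for reference, a tiny plain-Python model of the Cartesian-product semantics it implements (illustrative only, not the Connect planner):

```python
from itertools import product

def cross_join(left, right):
    """Cartesian product of two row lists, each row a tuple."""
    return [l + r for l, r in product(left, right)]

# Mirrors the df.crossJoin(df) example in the commit message: range(1, 3)
# joined with itself yields the four row combinations.
assert cross_join([(1,), (2,)], [(1,), (2,)]) == [(1, 1), (1, 2), (2, 1), (2, 2)]
```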
[spark] branch master updated: [SPARK-41343][CONNECT] Move FunctionName parsing to server side
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ce41ca0848e [SPARK-41343][CONNECT] Move FunctionName parsing to server side ce41ca0848e is described below commit ce41ca0848e740026048aa08cb1062cc4d5082d1 Author: Rui Wang AuthorDate: Thu Dec 1 13:27:03 2022 +0800 [SPARK-41343][CONNECT] Move FunctionName parsing to server side ### What changes were proposed in this pull request? This PR proposes to change the name of `UnresolvedFunction` from a sequence of name parts to a single name string, which helps move function name parsing to the server side. For built-in functions, there is no need to even call the SQL parser to parse the name (built-in functions should not belong to any catalog or database). ### Why are the changes needed? This will help reduce redundant implementations on client sides to parse function names. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #38854 from amaliujia/function_name_parse. 

Authored-by: Rui Wang Signed-off-by: Wenchen Fan --- .../main/protobuf/spark/connect/expressions.proto | 10 ++-- .../org/apache/spark/sql/connect/dsl/package.scala | 11 + .../sql/connect/planner/SparkConnectPlanner.scala | 22 + .../connect/planner/SparkConnectPlannerSuite.scala | 4 ++-- .../connect/planner/SparkConnectProtoSuite.scala | 4 .../connect/planner/SparkConnectServiceSuite.scala | 2 +- python/pyspark/sql/connect/column.py | 2 +- .../pyspark/sql/connect/proto/expressions_pb2.py | 20 .../pyspark/sql/connect/proto/expressions_pb2.pyi | 28 +++--- .../connect/test_connect_column_expressions.py | 4 ++-- .../sql/tests/connect/test_connect_plan_only.py| 2 +- 11 files changed, 63 insertions(+), 46 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/expressions.proto b/connector/connect/src/main/protobuf/spark/connect/expressions.proto index b90f7619b8f..1b93c342381 100644 --- a/connector/connect/src/main/protobuf/spark/connect/expressions.proto +++ b/connector/connect/src/main/protobuf/spark/connect/expressions.proto @@ -126,11 +126,17 @@ message Expression { // An unresolved function is not explicitly bound to one explicit function, but the function // is resolved during analysis following Sparks name resolution rules. message UnresolvedFunction { -// (Required) Names parts for the unresolved function. -repeated string parts = 1; +// (Required) name (or unparsed name for user defined function) for the unresolved function. +string function_name = 1; // (Optional) Function arguments. Empty arguments are allowed. repeated Expression arguments = 2; + +// (Required) Indicate if this is a user defined function. +// +// When it is not a user defined function, Connect will use the function name directly. +// When it is a user defined function, Connect will parse the function name first. +bool is_user_defined_function = 3; } // Expression as string. 
diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 654a4d5ce20..1342842cbc9 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -83,7 +83,7 @@ package object dsl { .setUnresolvedFunction( Expression.UnresolvedFunction .newBuilder() - .addParts("<") + .setFunctionName("<") .addArguments(expr) .addArguments(other)) .build() @@ -93,14 +93,14 @@ package object dsl { Expression .newBuilder() .setUnresolvedFunction( - Expression.UnresolvedFunction.newBuilder().addParts("min").addArguments(e)) + Expression.UnresolvedFunction.newBuilder().setFunctionName("min").addArguments(e)) .build() def proto_explode(e: Expression): Expression = Expression .newBuilder() .setUnresolvedFunction( - Expression.UnresolvedFunction.newBuilder().addParts("explode").addArguments(e)) + Expression.UnresolvedFunction.newBuilder().setFunctionName("explode").addArguments(e)) .build() /** @@ -117,7 +117,8 @@ package object dsl { .setUnresolvedFunction( Expression.UnresolvedFunction .newBuilder() -.addAllParts(nameParts.asJava) +.setFunctionName(nameParts.mkString(".")) +.setIsUserDefinedF
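The protocol change above replaces `repeated string parts` with a single `function_name` plus an `is_user_defined_function` flag. A rough Python mirror of the server-side rule the commit describes (hypothetical classes, not the generated `pb2` code):

```python
from dataclasses import dataclass, field

@dataclass
class UnresolvedFunction:  # made-up stand-in for the proto message above
    function_name: str
    is_user_defined_function: bool = False
    arguments: list = field(default_factory=list)

def resolve_name(fn: UnresolvedFunction) -> list:
    # Built-in names are used directly (they belong to no catalog or database);
    # only user-defined names get parsed into parts (stand-in for the SQL parser).
    if fn.is_user_defined_function:
        return fn.function_name.split(".")
    return [fn.function_name]

assert resolve_name(UnresolvedFunction("min")) == ["min"]
assert resolve_name(UnresolvedFunction("db.my_udf", True)) == ["db", "my_udf"]
```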
[spark] branch master updated (ce41ca0848e -> c5f189c5365)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from ce41ca0848e [SPARK-41343][CONNECT] Move FunctionName parsing to server side add c5f189c5365 [SPARK-41237][SQL] Reuse the error class `UNSUPPORTED_DATATYPE` for `_LEGACY_ERROR_TEMP_0030` No new revisions were added by this update. Summary of changes: R/pkg/tests/fulltests/test_sparkSQL.R| 6 +++--- R/pkg/tests/fulltests/test_streaming.R | 2 +- R/pkg/tests/fulltests/test_utils.R | 2 +- core/src/main/resources/error/error-classes.json | 5 - .../org/apache/spark/sql/errors/QueryParsingErrors.scala | 4 ++-- .../apache/spark/sql/catalyst/parser/DDLParserSuite.scala| 4 ++-- .../spark/sql/catalyst/parser/DataTypeParserSuite.scala | 12 ++-- .../apache/spark/sql/catalyst/parser/ErrorParserSuite.scala | 4 ++-- .../test/resources/sql-tests/results/csv-functions.sql.out | 1 - .../test/resources/sql-tests/results/postgreSQL/with.sql.out | 10 ++ .../sql/execution/datasources/jdbc/JdbcUtilsSuite.scala | 4 ++-- .../datasources/v2/jdbc/JDBCTableCatalogSuite.scala | 4 ++-- .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 4 ++-- 13 files changed, 33 insertions(+), 29 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41228][SQL] Rename & Improve error message for `COLUMN_NOT_IN_GROUP_BY_CLAUSE`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5badb2446fa [SPARK-41228][SQL] Rename & Improve error message for `COLUMN_NOT_IN_GROUP_BY_CLAUSE` 5badb2446fa is described below commit 5badb2446fa2b51e8ea239ced6c9b44178b2f1fa Author: itholic AuthorDate: Thu Dec 1 09:18:17 2022 +0300 [SPARK-41228][SQL] Rename & Improve error message for `COLUMN_NOT_IN_GROUP_BY_CLAUSE` ### What changes were proposed in this pull request? This PR proposes to rename `COLUMN_NOT_IN_GROUP_BY_CLAUSE` to `MISSING_AGGREGATION`. It also improves the error message. ### Why are the changes needed? The current error class name and its error message don't describe the error cause and its resolution clearly. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` ./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*" ``` Closes #38769 from itholic/SPARK-41128. 
Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 13 +++-- .../sql/tests/pandas/test_pandas_udf_grouped_agg.py | 2 +- .../apache/spark/sql/errors/QueryCompilationErrors.scala | 7 +-- .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala | 7 +-- .../src/test/resources/sql-tests/results/extract.sql.out | 2 ++ .../resources/sql-tests/results/group-by-filter.sql.out | 10 ++ .../src/test/resources/sql-tests/results/group-by.sql.out | 15 +-- .../test/resources/sql-tests/results/grouping_set.sql.out | 5 +++-- .../sql-tests/results/postgreSQL/create_view.sql.out | 5 +++-- .../sql-tests/results/udaf/udaf-group-by-ordinal.sql.out | 15 +-- .../sql-tests/results/udaf/udaf-group-by.sql.out | 15 +-- .../resources/sql-tests/results/udf/udf-group-by.sql.out | 15 +-- .../org/apache/spark/sql/execution/SQLViewSuite.scala | 5 +++-- 13 files changed, 71 insertions(+), 45 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index a79c02e1f1d..65b6dc68d12 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -109,12 +109,6 @@ "The column already exists. Consider to choose another name or rename the existing column." ] }, - "COLUMN_NOT_IN_GROUP_BY_CLAUSE" : { -"message" : [ - "The expression is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in `first()` (or `first_value()`) if you don't care which value you get." -], -"sqlState" : "42000" - }, "CONCURRENT_QUERY" : { "message" : [ "Another instance of this query was just started by a concurrent session." @@ -830,6 +824,13 @@ "Malformed Protobuf messages are detected in message deserialization. Parse Mode: . To process malformed protobuf message as null result, try setting the option 'mode' as 'PERMISSIVE'." 
] }, + "MISSING_AGGREGATION" : { +"message" : [ + "The non-aggregating expression is based on columns which are not participating in the GROUP BY clause.", + "Add the columns or the expression to the GROUP BY, aggregate the expression, or use if you do not care which of the values within a group is returned." +], +"sqlState" : "42000" + }, "MISSING_STATIC_PARTITION_COLUMN" : { "message" : [ "Unknown static partition column: " diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py index 6f475624b74..aa844fc5fd5 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py @@ -475,7 +475,7 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase): mean_udf = self.pandas_agg_mean_udf with QuietTest(self.sc): -with self.assertRaisesRegex(AnalysisException, "nor.*aggregate function"): +with self.assertRaisesRegex(AnalysisException, "[MISSING_AGGREGATION]"): df.groupby(df.id).agg(plus_one(df.v)).collect() with QuietTest(self.sc): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index e5b1c3c100d..fc9a08104b4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.cat
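The new `MISSING_AGGREGATION` message fires when a selected expression is neither grouped nor aggregated. A toy sketch of that check (hypothetical helper, not Spark's analyzer):

```python
def check_aggregation(select_cols, group_by_cols, aggregated_cols):
    """Raise if a selected column is neither in GROUP BY nor aggregated."""
    for col in select_cols:
        if col not in group_by_cols and col not in aggregated_cols:
            raise ValueError(
                f'[MISSING_AGGREGATION] The non-aggregating expression "{col}" '
                "is based on columns which are not participating in the "
                "GROUP BY clause."
            )

# SELECT id, avg(v) ... GROUP BY id -- passes the check
check_aggregation(["id", "avg_v"], group_by_cols=["id"], aggregated_cols=["avg_v"])
```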
[spark] branch master updated: [SPARK-41338][SQL] Resolve outer references and normal columns in the same analyzer batch
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8d60736abd5 [SPARK-41338][SQL] Resolve outer references and normal columns in the same analyzer batch 8d60736abd5 is described below commit 8d60736abd56c5642ae4d3616593c94090cdf9ac Author: Wenchen Fan AuthorDate: Thu Dec 1 14:20:23 2022 +0800 [SPARK-41338][SQL] Resolve outer references and normal columns in the same analyzer batch ### What changes were proposed in this pull request? Today, the way we resolve outer references is very inefficient. It invokes the entire analyzer to resolve the subquery plan, then transforms the plan to resolve `UnresolvedAttribute` to outer references. If the plan is still unresolved, repeat the process until the plan is resolved or the plan doesn't change any more. Ideally, we should only invoke the analyzer once to resolve subquery plans. This PR adds a new rule to resolve outer references, and put it in the main analyzer batch. Then we can safely invoke the analyzer only once. ### Why are the changes needed? Simplify the subquery resolution code and make it more efficient ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #38851 from cloud-fan/outer. 
Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/Analyzer.scala | 106 +++-- 1 file changed, 57 insertions(+), 49 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1daa8ea36bf..7f66ddaa894 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager) ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: + ResolveOuterReferences :: ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: @@ -2109,6 +2110,51 @@ class Analyzer(override val catalogManager: CatalogManager) } } + /** + * Resolves `UnresolvedAttribute` to `OuterReference` if we are resolving subquery plans (when + * `AnalysisContext.get.outerPlan` is set). + */ + object ResolveOuterReferences extends Rule[LogicalPlan] { +override def apply(plan: LogicalPlan): LogicalPlan = { + // Only apply this rule if we are resolving subquery plans. + if (AnalysisContext.get.outerPlan.isEmpty) return plan + + // We must run these 3 rules first, as they also resolve `UnresolvedAttribute` and have + // higher priority than outer reference resolution. + val prepared = ResolveAggregateFunctions(ResolveMissingReferences(ResolveReferences(plan))) + prepared.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE)) { +// Handle `Generate` specially here, because `Generate.generatorOutput` starts with +// `UnresolvedAttribute` but we should never resolve it to outer references. It's a bit +// hacky that `Generate` uses `UnresolvedAttribute` to store the generator column names, +// we should clean it up later. 
+case g: Generate if g.childrenResolved && !g.resolved => + val newGenerator = g.generator.transformWithPruning( +_.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference) + val resolved = g.copy(generator = newGenerator.asInstanceOf[Generator]) + resolved.copyTagsFrom(g) + resolved +case q: LogicalPlan if q.childrenResolved && !q.resolved => + q.transformExpressionsWithPruning( +_.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference) + } +} + +private val resolveOuterReference: PartialFunction[Expression, Expression] = { + case u @ UnresolvedAttribute(nameParts) => withPosition(u) { +try { + AnalysisContext.get.outerPlan.get.resolveChildren(nameParts, resolver) match { +case Some(resolved) => wrapOuterReference(resolved) +case None => u + } +} catch { + case ae: AnalysisException => +logDebug(ae.getMessage) +u +} + } +} + } + /** * Checks whether a function identifier referenced by an [[UnresolvedFunction]] is defined in the * function registry. Note that this rule doesn't try to resolve the [[UnresolvedFunction]]. It @@ -2482,65 +2528,27 @@ class Analyzer(override val catalogManager: CatalogManager) */ object Resolve
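The rule above tries normal (inner) resolution first and only then looks a name up in the outer plan, wrapping a hit as an `OuterReference`. A toy model of that ordering (plain Python, hypothetical names):

```python
def resolve_attribute(name, inner_scope, outer_scope):
    # Inner resolution has higher priority, as the rule's comment notes.
    if name in inner_scope:
        return ("attr", name)
    # Otherwise try the outer plan, like resolveChildren + wrapOuterReference.
    if name in outer_scope:
        return ("outer_ref", name)
    return ("unresolved", name)  # left unresolved, like returning `u`

assert resolve_attribute("a", {"a"}, {"b"}) == ("attr", "a")
assert resolve_attribute("b", {"a"}, {"b"}) == ("outer_ref", "b")
assert resolve_attribute("c", {"a"}, {"b"}) == ("unresolved", "c")
```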
[spark] branch master updated: [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d67e22826cd [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing d67e22826cd is described below commit d67e22826cda41d732e010d73687e74fab60f4b6 Author: Adam Binford AuthorDate: Thu Dec 1 15:50:13 2022 +0900 [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing ### What changes were proposed in this pull request? Instead of just calling `writeBatch.clear`, close the write batch and recreate it. ### Why are the changes needed? A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its underlying data in a `std::string`. Why? I'm not sure. But after a partition is finished, `writeBatch.clear()` is called (somewhat indirectly through a call to `store.abort`), presumably clearing the data in the `WriteBatch`. This calls `std::string::clear` followed by `std::string::resize` underneath the hood. However, neither of these two things actually reclaims native memory. All the memory allocated for ex [...] ### Does this PR introduce _any_ user-facing change? Fix for excess native memory usage. ### How was this patch tested? Existing UTs, not sure how to test for memory usage. Closes #38853 from Kimahriman/rocksdb-write-batch-close. 
Lead-authored-by: Adam Binford
Co-authored-by: centos
Signed-off-by: Jungtaek Lim
---
 .../spark/sql/execution/streaming/state/RocksDB.scala | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 3e1bcbbbf0d..5acd20f49dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -63,7 +63,7 @@ class RocksDB(
   private val readOptions = new ReadOptions() // used for gets
   private val writeOptions = new WriteOptions().setSync(true) // wait for batched write to complete
   private val flushOptions = new FlushOptions().setWaitForFlush(true) // wait for flush to complete
-  private val writeBatch = new WriteBatchWithIndex(true) // overwrite multiple updates to a key
+  private var writeBatch = new WriteBatchWithIndex(true) // overwrite multiple updates to a key
   private val bloomFilter = new BloomFilter()
   private val tableFormatConfig = new BlockBasedTableConfig()
@@ -135,7 +135,7 @@ class RocksDB(
       }
       // reset resources to prevent side-effects from previous loaded version
       closePrefixScanIterators()
-      writeBatch.clear()
+      resetWriteBatch()
       logInfo(s"Loaded $version")
     } catch {
       case t: Throwable =>
@@ -328,7 +328,7 @@ class RocksDB(
    */
   def rollback(): Unit = {
     closePrefixScanIterators()
-    writeBatch.clear()
+    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     release()
     logInfo(s"Rolled back to $loadedVersion")
@@ -455,6 +455,13 @@ class RocksDB(
     prefixScanReuseIter.clear()
   }
 
+  /** Create a new WriteBatch, clear doesn't deallocate the native memory */
+  private def resetWriteBatch(): Unit = {
+    writeBatch.clear()
+    writeBatch.close()
+    writeBatch = new WriteBatchWithIndex(true)
+  }
+
   private def getDBProperty(property: String): Long = {
     db.getProperty(property).toLong
   }
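Why `clear()` alone was not enough: the batch's backing allocation survives a clear, so the patch closes the batch and builds a fresh one. A plain-Python illustration of that close-and-recreate pattern (`NativeBatch` is a made-up stand-in for RocksDB's `WriteBatchWithIndex`, not a real binding):

```python
class NativeBatch:
    """Toy model of a native-backed write batch."""
    def __init__(self):
        self.native_bytes = 0
        self.entries = []
    def put(self, k, v):
        self.entries.append((k, v))
        self.native_bytes += len(k) + len(v)  # grows the backing buffer
    def clear(self):
        self.entries = []                     # capacity is NOT released
    def close(self):
        self.native_bytes = 0                 # frees the native allocation

def reset_write_batch(batch):
    # Mirrors resetWriteBatch() above: clear, close, then recreate.
    batch.clear()
    batch.close()
    return NativeBatch()

b = NativeBatch()
b.put("key", "value")
b.clear()
assert b.native_bytes > 0   # memory still held after clear alone
b = reset_write_batch(b)
assert b.native_bytes == 0  # fully released by close-and-recreate
```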
[spark] branch branch-3.3 updated: [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new bdafe574865 [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing bdafe574865 is described below commit bdafe574865696e79eaa959dabef913cba5857ce Author: Adam Binford AuthorDate: Thu Dec 1 15:50:13 2022 +0900 [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing ### What changes were proposed in this pull request? Instead of just calling `writeBatch.clear`, close the write batch and recreate it. ### Why are the changes needed? A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its underlying data in a `std::string`. Why? I'm not sure. But after a partition is finished, `writeBatch.clear()` is called (somewhat indirectly through a call to `store.abort`), presumably clearing the data in the `WriteBatch`. This calls `std::string::clear` followed by `std::string::resize` underneath the hood. However, neither of these two things actually reclaims native memory. All the memory allocated for ex [...] ### Does this PR introduce _any_ user-facing change? Fix for excess native memory usage. ### How was this patch tested? Existing UTs, not sure how to test for memory usage. Closes #38853 from Kimahriman/rocksdb-write-batch-close. 
Lead-authored-by: Adam Binford
Co-authored-by: centos
Signed-off-by: Jungtaek Lim
(cherry picked from commit d67e22826cda41d732e010d73687e74fab60f4b6)
Signed-off-by: Jungtaek Lim
---
 .../spark/sql/execution/streaming/state/RocksDB.scala | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a5bd489e04f..66e14f6bff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -63,7 +63,7 @@ class RocksDB(
   private val readOptions = new ReadOptions() // used for gets
   private val writeOptions = new WriteOptions().setSync(true) // wait for batched write to complete
   private val flushOptions = new FlushOptions().setWaitForFlush(true) // wait for flush to complete
-  private val writeBatch = new WriteBatchWithIndex(true) // overwrite multiple updates to a key
+  private var writeBatch = new WriteBatchWithIndex(true) // overwrite multiple updates to a key
   private val bloomFilter = new BloomFilter()
   private val tableFormatConfig = new BlockBasedTableConfig()
@@ -134,7 +134,7 @@ class RocksDB(
       }
       // reset resources to prevent side-effects from previous loaded version
       closePrefixScanIterators()
-      writeBatch.clear()
+      resetWriteBatch()
       logInfo(s"Loaded $version")
     } catch {
       case t: Throwable =>
@@ -327,7 +327,7 @@ class RocksDB(
    */
   def rollback(): Unit = {
     closePrefixScanIterators()
-    writeBatch.clear()
+    resetWriteBatch()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     release()
     logInfo(s"Rolled back to $loadedVersion")
@@ -454,6 +454,13 @@ class RocksDB(
     prefixScanReuseIter.clear()
   }
 
+  /** Create a new WriteBatch, clear doesn't deallocate the native memory */
+  private def resetWriteBatch(): Unit = {
+    writeBatch.clear()
+    writeBatch.close()
+    writeBatch = new WriteBatchWithIndex(true)
+  }
+
   private def getDBProperty(property: String): Long = {
     db.getProperty(property).toLong
   }