[spark] branch master updated (5fc482eb591 -> d08ab7e24b3)

2022-11-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5fc482eb591 [SPARK-41315][CONNECT][PYTHON] Implement 
`DataFrame.replace` and `DataFrame.na.replace`
 add d08ab7e24b3 [SPARK-41332][CONNECT][PYTHON] Fix `nullOrdering` in 
`SortOrder`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py   | 26 ++
 python/pyspark/sql/connect/plan.py | 37 +++
 .../sql/tests/connect/test_connect_basic.py| 42 ++
 .../sql/tests/connect/test_connect_plan_only.py| 34 +-
 4 files changed, 106 insertions(+), 33 deletions(-)





[spark] branch master updated: [SPARK-41325][CONNECT] Fix missing avg() for GroupBy on DF

2022-11-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 32ff77cdb8e [SPARK-41325][CONNECT] Fix missing avg() for GroupBy on DF
32ff77cdb8e is described below

commit 32ff77cdb8ef4973494beb1a31ced05ea493dc6d
Author: Martin Grund 
AuthorDate: Wed Nov 30 19:06:12 2022 +0800

[SPARK-41325][CONNECT] Fix missing avg() for GroupBy on DF

### What changes were proposed in this pull request?
Previously, the `avg` function was missing from the `GroupedData` class. This 
patch adds the method and the necessary plan transformation using an 
unresolved function. In addition, it fixes a small issue where, when an alias 
is used for a grouping column, the planner would incorrectly try to wrap the 
existing alias expression in an unresolved alias, which would then fail.

```
df = (
self.connect.range(10)
.groupBy((col("id") % lit(2)).alias("moded"))
.avg("id")
.sort("moded")
)
```
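
For reference, a minimal sketch of what the repro above computes, assuming a 
plain local `SparkSession` rather than the Connect session used in the tests; 
the expected averages match the assertions in the new unit test below.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[1]").getOrCreate()

# ids 0-9 are split by id % 2, so each group holds five values.
df = (
    spark.range(10)
    .groupBy((col("id") % lit(2)).alias("moded"))
    .avg("id")
    .sort("moded")
)
df.show()
# +-----+-------+
# |moded|avg(id)|
# +-----+-------+
# |    0|    4.0|   <- average of 0, 2, 4, 6, 8
# |    1|    5.0|   <- average of 1, 3, 5, 7, 9
# +-----+-------+
```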

### Why are the changes needed?
Bug / Compatibility

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #38841 from grundprinzip/SPARK-41325.

Authored-by: Martin Grund 
Signed-off-by: Ruifeng Zheng 
---
 .../spark/sql/connect/planner/SparkConnectPlanner.scala |  3 ++-
 python/pyspark/sql/connect/dataframe.py |  4 
 python/pyspark/sql/tests/connect/test_connect_basic.py  | 13 +
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 7b9e13cadab..d1d4c3d4fa9 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -682,7 +682,8 @@ class SparkConnectPlanner(session: SparkSession) {
   rel.getGroupingExpressionsList.asScala
 .map(transformExpression)
 .map {
-  case x @ UnresolvedAttribute(_) => x
+  case ua @ UnresolvedAttribute(_) => ua
+  case a @ Alias(_, _) => a
   case x => UnresolvedAlias(x)
 }
 
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index c9960a71fb8..ebfb52cdd74 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -84,6 +84,10 @@ class GroupedData(object):
 expr = self._map_cols_to_expression("sum", col)
 return self.agg(expr)
 
+def avg(self, col: Union[Column, str]) -> "DataFrame":
+expr = self._map_cols_to_expression("avg", col)
+return self.agg(expr)
+
 def count(self) -> "DataFrame":
 return self.agg([scalar_function("count", lit(1))])
 
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index f518a09ad4a..22d57994794 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -837,6 +837,19 @@ class SparkConnectTests(SparkConnectSQLTestCase):
 ndf = self.connect.read.table("parquet_test")
 self.assertEqual(set(df.collect()), set(ndf.collect()))
 
+def test_agg_with_avg(self):
+# SPARK-41325: groupby.avg()
+df = (
+self.connect.range(10)
+.groupBy((col("id") % lit(2)).alias("moded"))
+.avg("id")
+.sort("moded")
+)
+res = df.collect()
+self.assertEqual(2, len(res))
+self.assertEqual(4.0, res[0][1])
+self.assertEqual(5.0, res[1][1])
+
 
 class ChannelBuilderTests(ReusedPySparkTestCase):
 def test_invalid_connection_strings(self):





[spark] branch master updated: [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify startsWith and endsWith

2022-11-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 32b863866a5 [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify 
startsWith and endsWith
32b863866a5 is described below

commit 32b863866a5e4ff88ba3b111cdaefaf9f984039f
Author: Rui Wang 
AuthorDate: Wed Nov 30 19:15:29 2022 +0800

[SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify startsWith and endsWith

### What changes were proposed in this pull request?

1. `startsWith` and `endsWith` can be implemented by simply invoking `_bin_op` 
and attaching the additional documentation (see the sketch after this list).
2. Remove non-working examples from the documentation.
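
For illustration, a small self-contained sketch of that pattern: a 
binary-operator factory that builds the method once and attaches the 
docstring, using hypothetical names rather than the actual 
`pyspark.sql.connect` internals.

```python
from typing import Callable


class Column:
    def __init__(self, name: str) -> None:
        self.name = name


def _bin_op(op_name: str, doc: str = "binary function") -> Callable[["Column", object], str]:
    # Build the method once; a real implementation would return an
    # unresolved function expression instead of a string.
    def wrapped(self: "Column", other: object) -> str:
        return f"{op_name}({self.name}, {other!r})"

    wrapped.__doc__ = doc
    return wrapped


_startswith_doc = "String starts with. Returns a boolean Column based on a string match."
_endswith_doc = "String ends with. Returns a boolean Column based on a string match."

# One line per operator replaces a full hand-written method body.
Column.startswith = _bin_op("startsWith", _startswith_doc)
Column.endswith = _bin_op("endsWith", _endswith_doc)

print(Column("name").startswith("Al"))  # startsWith(name, 'Al')
print(Column.endswith.__doc__)          # the attached docstring
```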

### Why are the changes needed?

Codebase simplification.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

Existing UT.

Closes #38849 from amaliujia/simplify_strings.

Authored-by: Rui Wang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/connect/column.py | 53 +++-
 1 file changed, 16 insertions(+), 37 deletions(-)

diff --git a/python/pyspark/sql/connect/column.py 
b/python/pyspark/sql/connect/column.py
index f9241b0bd58..e02508d6114 100644
--- a/python/pyspark/sql/connect/column.py
+++ b/python/pyspark/sql/connect/column.py
@@ -520,45 +520,24 @@ class Column(object):
 """
 return _bin_op("contains")(self, other)
 
-def startswith(self, other: Union[PrimitiveType, "Column"]) -> "Column":
-"""
-String starts with. Returns a boolean :class:`Column` based on a 
string match.
-
-Parameters
---
-other : :class:`Column` or str
-string at start of line (do not use a regex `^`)
-
-Examples
-
->>> df = spark.createDataFrame(
-...  [(2, "Alice"), (5, "Bob")], ["age", "name"])
->>> df.filter(df.name.startswith('Al')).collect()
-[Row(age=2, name='Alice')]
->>> df.filter(df.name.startswith('^Al')).collect()
-[]
-"""
-return _bin_op("startsWith")(self, other)
+_startswith_doc = """
+String starts with. Returns a boolean :class:`Column` based on a string 
match.
 
-def endswith(self, other: Union[PrimitiveType, "Column"]) -> "Column":
-"""
-String ends with. Returns a boolean :class:`Column` based on a string 
match.
-
-Parameters
---
-other : :class:`Column` or str
-string at end of line (do not use a regex `$`)
+Parameters
+--
+other : :class:`Column` or str
+string at start of line (do not use a regex `^`)
+"""
+_endswith_doc = """
+String ends with. Returns a boolean :class:`Column` based on a string 
match.
 
-Examples
-
->>> df = spark.createDataFrame(
-...  [(2, "Alice"), (5, "Bob")], ["age", "name"])
->>> df.filter(df.name.endswith('ice')).collect()
-[Row(age=2, name='Alice')]
->>> df.filter(df.name.endswith('ice$')).collect()
-[]
-"""
-return _bin_op("endsWith")(self, other)
+Parameters
+--
+other : :class:`Column` or str
+string at end of line (do not use a regex `$`)
+"""
+startswith = _bin_op("startsWith", _startswith_doc)
+endswith = _bin_op("endsWith", _endswith_doc)
 
 def like(self: "Column", other: str) -> "Column":
 """





[spark] branch master updated (32b863866a5 -> b64a95f72fe)

2022-11-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 32b863866a5 [SPARK-41328][CONNECT][PYTHON][FOLLOW-UP] Simplify 
startsWith and endsWith
 add b64a95f72fe [SPARK-41326][CONNECT][FOLLOW-UP] Add e2e tests for 
distinct and dropDuplicates

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++
 1 file changed, 10 insertions(+)





[spark] branch master updated (b64a95f72fe -> 5b13a51dc0a)

2022-11-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b64a95f72fe [SPARK-41326][CONNECT][FOLLOW-UP] Add e2e tests for 
distinct and dropDuplicates
 add 5b13a51dc0a [SPARK-41335][CONNECT][PYTHON] Support IsNull and 
IsNotNull in Column

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/column.py   | 17 +
 python/pyspark/sql/tests/connect/test_connect_basic.py | 10 ++
 2 files changed, 27 insertions(+)





[spark] branch master updated (5b13a51dc0a -> 70502d7a043)

2022-11-30 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5b13a51dc0a [SPARK-41335][CONNECT][PYTHON] Support IsNull and 
IsNotNull in Column
 add 70502d7a043 
[SPARK-41276][SQL][ML][MLLIB][PROTOBUF][PYTHON][R][SS][AVRO] Optimize 
constructor use of `StructType`

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/avro/SchemaConverters.scala |  4 ++--
 .../spark/sql/protobuf/utils/SchemaConverters.scala  |  2 +-
 .../main/scala/org/apache/spark/ml/fpm/FPGrowth.scala|  4 ++--
 .../main/scala/org/apache/spark/ml/fpm/PrefixSpan.scala  |  2 +-
 .../scala/org/apache/spark/ml/image/ImageSchema.scala| 16 
 .../scala/org/apache/spark/ml/linalg/MatrixUDT.scala |  2 +-
 .../scala/org/apache/spark/ml/linalg/VectorUDT.scala |  2 +-
 .../apache/spark/ml/source/libsvm/LibSVMRelation.scala   |  2 +-
 .../scala/org/apache/spark/mllib/linalg/Matrices.scala   |  2 +-
 .../scala/org/apache/spark/mllib/linalg/Vectors.scala|  2 +-
 .../spark/ml/source/libsvm/LibSVMRelationSuite.scala |  6 +++---
 .../spark/sql/catalyst/expressions/jsonExpressions.scala |  2 +-
 .../apache/spark/sql/catalyst/json/JsonInferSchema.scala |  2 +-
 .../sql/catalyst/optimizer/OptimizeCsvJsonExprs.scala|  6 +++---
 .../apache/spark/sql/catalyst/parser/AstBuilder.scala|  8 
 .../catalyst/plans/logical/basicLogicalOperators.scala   |  2 +-
 .../scala/org/apache/spark/sql/types/StructType.scala|  6 +++---
 .../scala/org/apache/spark/sql/util/ArrowUtils.scala |  4 ++--
 .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala |  2 +-
 .../apache/spark/sql/execution/command/SetCommand.scala  | 16 
 .../apache/spark/sql/execution/command/functions.scala   |  2 +-
 .../org/apache/spark/sql/execution/command/tables.scala  |  2 +-
 .../datasources/binaryfile/BinaryFileFormat.scala| 10 +-
 .../spark/sql/execution/datasources/orc/OrcUtils.scala   |  2 +-
 .../sql/execution/datasources/v2/text/TextTable.scala|  2 +-
 .../sql/execution/python/AggregateInPandasExec.scala |  2 +-
 .../spark/sql/execution/python/EvalPythonExec.scala  |  2 +-
 .../spark/sql/execution/python/MapInBatchExec.scala  |  2 +-
 .../streaming/sources/RatePerMicroBatchProvider.scala|  2 +-
 .../execution/streaming/sources/RateStreamProvider.scala |  2 +-
 .../streaming/sources/TextSocketSourceProvider.scala |  6 +++---
 .../scala/org/apache/spark/sql/hive/HiveInspectors.scala |  2 +-
 .../main/scala/org/apache/spark/sql/hive/HiveUtils.scala |  2 +-
 .../apache/spark/sql/hive/client/HiveClientImpl.scala|  2 +-
 .../main/scala/org/apache/spark/sql/hive/hiveUDFs.scala  |  2 +-
 35 files changed, 67 insertions(+), 67 deletions(-)





[GitHub] [spark-website] dongjoon-hyun commented on pull request #428: Add 3.2.3 announcement news, release note and download link

2022-11-30 Thread GitBox


dongjoon-hyun commented on PR #428:
URL: https://github.com/apache/spark-website/pull/428#issuecomment-1332622082

   Thank you so much, @gengliangwang !





[spark] branch master updated: [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info

2022-11-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 388824c4488 [SPARK-41327][CORE] Fix 
`SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info
388824c4488 is described below

commit 388824c448804161b076507f0f39ef0596e0a0bf
Author: Lingyun Yuan 
AuthorDate: Wed Nov 30 11:47:15 2022 -0800

[SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch 
On/OffHeapStorageMemory info

### What changes were proposed in this pull request?

This PR aims to fix `SparkStatusTracker.getExecutorInfos` so that it returns 
the correct `on/offHeapStorageMemory` values.

### Why are the changes needed?

`SparkExecutorInfoImpl` used the following parameter order.

https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45

SPARK-20659 introduced a bug with the wrong parameter order in Apache Spark 
2.4.0; a minimal sketch of this class of bug follows the link below.
- 
https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121
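
For illustration, a minimal Python sketch (hypothetical names, not Spark's 
actual `SparkExecutorInfoImpl` constructor) of how a positional-argument swap 
like this runs without complaint, and how keyword arguments guard against it.

```python
from dataclasses import dataclass


@dataclass
class ExecutorMemoryInfo:
    used_on_heap: int
    used_off_heap: int
    total_on_heap: int
    total_off_heap: int


used_on, used_off, total_on, total_off = 10, 20, 100, 200

# Buggy: on-heap and off-heap values swapped, yet nothing fails at runtime
# because all four parameters share the same type.
buggy = ExecutorMemoryInfo(used_off, used_on, total_off, total_on)
assert buggy.used_on_heap == 20  # silently wrong

# Keyword arguments make the intent explicit and survive reordering.
fixed = ExecutorMemoryInfo(
    used_on_heap=used_on,
    used_off_heap=used_off,
    total_on_heap=total_on,
    total_off_heap=total_off,
)
assert fixed.used_on_heap == 10
```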

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Manually review.

Closes #38843 from ylybest/master.

Lead-authored-by: Lingyun Yuan 
Co-authored-by: ylybest <119458293+ylyb...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 37e673cd8c7..22dc1d056ec 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -114,10 +114,10 @@ class SparkStatusTracker private[spark] (sc: 
SparkContext, store: AppStatusStore
 port,
 cachedMem,
 exec.activeTasks,
-exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
 exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L),
-exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L),
-exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
+exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
+exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L),
+exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L))
 }.toArray
   }
 }





[spark] branch branch-3.3 updated: [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info

2022-11-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new a94dd1820d6 [SPARK-41327][CORE] Fix 
`SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info
a94dd1820d6 is described below

commit a94dd1820d65e6280941047dbb4abb15bf429bc3
Author: Lingyun Yuan 
AuthorDate: Wed Nov 30 11:47:15 2022 -0800

[SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch 
On/OffHeapStorageMemory info

### What changes were proposed in this pull request?

This PR aims to fix `SparkStatusTracker.getExecutorInfos` so that it returns 
the correct `on/offHeapStorageMemory` values.

### Why are the changes needed?

`SparkExecutorInfoImpl` used the following parameter order.

https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45

SPARK-20659 introduced a bug with the wrong parameter order in Apache Spark 
2.4.0.
- 
https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Manually review.

Closes #38843 from ylybest/master.

Lead-authored-by: Lingyun Yuan 
Co-authored-by: ylybest <119458293+ylyb...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 388824c448804161b076507f0f39ef0596e0a0bf)
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 37e673cd8c7..22dc1d056ec 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -114,10 +114,10 @@ class SparkStatusTracker private[spark] (sc: 
SparkContext, store: AppStatusStore
 port,
 cachedMem,
 exec.activeTasks,
-exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
 exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L),
-exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L),
-exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
+exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
+exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L),
+exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L))
 }.toArray
   }
 }





[spark] branch branch-3.2 updated: [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info

2022-11-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 44b6db8d7a5 [SPARK-41327][CORE] Fix 
`SparkStatusTracker.getExecutorInfos` by switch On/OffHeapStorageMemory info
44b6db8d7a5 is described below

commit 44b6db8d7a5828763774b21270cfc9de426b3f9a
Author: Lingyun Yuan 
AuthorDate: Wed Nov 30 11:47:15 2022 -0800

[SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switch 
On/OffHeapStorageMemory info

### What changes were proposed in this pull request?

This PR aims to fix `SparkStatusTracker.getExecutorInfos` so that it returns 
the correct `on/offHeapStorageMemory` values.

### Why are the changes needed?

`SparkExecutorInfoImpl` used the following parameter order.

https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45

SPARK-20659 introduced a bug with the wrong parameter order in Apache Spark 
2.4.0.
- 
https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Manually review.

Closes #38843 from ylybest/master.

Lead-authored-by: Lingyun Yuan 
Co-authored-by: ylybest <119458293+ylyb...@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 388824c448804161b076507f0f39ef0596e0a0bf)
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala 
b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 37e673cd8c7..22dc1d056ec 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -114,10 +114,10 @@ class SparkStatusTracker private[spark] (sc: 
SparkContext, store: AppStatusStore
 port,
 cachedMem,
 exec.activeTasks,
-exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
 exec.memoryMetrics.map(_.usedOnHeapStorageMemory).getOrElse(0L),
-exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L),
-exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L))
+exec.memoryMetrics.map(_.usedOffHeapStorageMemory).getOrElse(0L),
+exec.memoryMetrics.map(_.totalOnHeapStorageMemory).getOrElse(0L),
+exec.memoryMetrics.map(_.totalOffHeapStorageMemory).getOrElse(0L))
 }.toArray
   }
 }





[spark] branch master updated: [SPARK-41321][CONNECT] Support target field for UnresolvedStar

2022-11-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7b200898967 [SPARK-41321][CONNECT] Support target field for 
UnresolvedStar
7b200898967 is described below

commit 7b20089896716a5fa7cad595bd560640d1b5afcf
Author: dengziming 
AuthorDate: Thu Dec 1 10:40:00 2022 +0800

[SPARK-41321][CONNECT] Support target field for UnresolvedStar

### What changes were proposed in this pull request?
1. Support a target field for UnresolvedStar (see the sketch after this list).
2. Allow UnresolvedStar to be used together with other expressions.
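
For illustration, a minimal example of what a star target means, using a 
plain local `SparkSession` (regular PySpark rather than the Connect client).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# `s` is inferred as a struct column with fields `_1` and `_2`.
df = spark.createDataFrame([(1, (10, "a"))], ["id", "s"])

# Plain star: expand every column of the relation.
df.select("*").columns          # ['id', 's']

# Star with a target: expand only the fields of the struct column `s`;
# the path ["s"] is what the new repeated `target` field carries.
df.select("s.*").columns        # ['_1', '_2']

# The star can also be mixed with other expressions in the same select.
df.select("id", "s.*").columns  # ['id', '_1', '_2']
```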

### Why are the changes needed?
This is a necessary feature for UnresolvedStar.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added 2 new unit tests.

Closes #38838 from dengziming/SPARK-41321.

Authored-by: dengziming 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/expressions.proto  |   4 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |  17 ++--
 .../connect/planner/SparkConnectPlannerSuite.scala | 105 -
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  55 ++-
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  13 +++
 5 files changed, 158 insertions(+), 36 deletions(-)

diff --git 
a/connector/connect/src/main/protobuf/spark/connect/expressions.proto 
b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
index 2a1159c1d04..b90f7619b8f 100644
--- a/connector/connect/src/main/protobuf/spark/connect/expressions.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
@@ -18,7 +18,6 @@
 syntax = 'proto3';
 
 import "spark/connect/types.proto";
-import "google/protobuf/any.proto";
 
 package spark.connect;
 
@@ -142,6 +141,9 @@ message Expression {
 
   // UnresolvedStar is used to expand all the fields of a relation or struct.
   message UnresolvedStar {
+// (Optional) The target of the expansion, either be a table name or 
struct name, this
+// is a list of identifiers that is the path of the expansion.
+repeated string target = 1;
   }
 
   message Alias {
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index d1d4c3d4fa9..5ebe7c7cce3 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -392,13 +392,8 @@ class SparkConnectPlanner(session: SparkSession) {
 } else {
   logical.OneRowRelation()
 }
-// TODO: support the target field for *.
 val projection =
-  if (rel.getExpressionsCount == 1 && 
rel.getExpressions(0).hasUnresolvedStar) {
-Seq(UnresolvedStar(Option.empty))
-  } else {
-
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
-  }
+  
rel.getExpressionsList.asScala.map(transformExpression).map(UnresolvedAlias(_))
 logical.Project(projectList = projection.toSeq, child = baseRel)
   }
 
@@ -416,6 +411,8 @@ class SparkConnectPlanner(session: SparkSession) {
   case proto.Expression.ExprTypeCase.ALIAS => transformAlias(exp.getAlias)
   case proto.Expression.ExprTypeCase.EXPRESSION_STRING =>
 transformExpressionString(exp.getExpressionString)
+  case proto.Expression.ExprTypeCase.UNRESOLVED_STAR =>
+transformUnresolvedStar(exp.getUnresolvedStar)
   case _ =>
 throw InvalidPlanInput(
   s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not 
supported")
@@ -573,6 +570,14 @@ class SparkConnectPlanner(session: SparkSession) {
 session.sessionState.sqlParser.parseExpression(expr.getExpression)
   }
 
+  private def transformUnresolvedStar(regex: proto.Expression.UnresolvedStar): 
Expression = {
+if (regex.getTargetList.isEmpty) {
+  UnresolvedStar(Option.empty)
+} else {
+  UnresolvedStar(Some(regex.getTargetList.asScala.toSeq))
+}
+  }
+
   private def transformSetOperation(u: proto.SetOperation): LogicalPlan = {
 assert(u.hasLeftInput && u.hasRightInput, "Union must have 2 inputs")
 
diff --git 
a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 
b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 8fbf2be3730..81e5ee3d0ce 100644
--- 
a/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ 
b/connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -23,7 +23,7 @@ import com.google.protobuf.ByteString
 
 impor

[spark] branch master updated: [SPARK-41226][SQL] Refactor Spark types by introducing physical types

2022-11-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d598594cd6 [SPARK-41226][SQL] Refactor Spark types by introducing 
physical types
3d598594cd6 is described below

commit 3d598594cd66eee481bf15da7542059082e148ef
Author: Desmond Cheong 
AuthorDate: Thu Dec 1 11:24:33 2022 +0800

[SPARK-41226][SQL] Refactor Spark types by introducing physical types

### What changes were proposed in this pull request?

Refactor Spark types by introducing physical types. Multiple logical types 
map to the same physical type; for example, `DateType` and 
`YearMonthIntervalType` are both implemented using `IntegerType`. Because of 
this, case-matching logic on Spark types can be simplified by matching on 
their physical types rather than listing all possible logical types.
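
As a rough illustration only (hypothetical names in Python; the actual 
refactor is in Scala), the idea is that dispatch can branch on the shared 
physical type instead of enumerating every logical type.

```python
# Several logical types share one physical representation.
PHYSICAL_TYPE = {
    "DateType": "PhysicalIntegerType",               # days since the epoch
    "YearMonthIntervalType": "PhysicalIntegerType",  # months
    "IntegerType": "PhysicalIntegerType",
    "StringType": "PhysicalStringType",
    "CharType": "PhysicalStringType",
}


def read_value(raw, logical_type: str):
    # One branch per physical type replaces one branch per logical type.
    physical = PHYSICAL_TYPE[logical_type]
    if physical == "PhysicalIntegerType":
        return int(raw)
    if physical == "PhysicalStringType":
        return str(raw)
    raise ValueError(f"unsupported type: {logical_type}")


print(read_value("7", "DateType"))               # 7
print(read_value("7", "YearMonthIntervalType"))  # 7
assert read_value(7, "CharType") == "7"
```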

### Why are the changes needed?

These changes simplify the Spark type system.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Since this code is a refactor of existing code, we rely on existing tests.

Closes #38750 from desmondcheongzx/refactor-using-physical-types.

Authored-by: Desmond Cheong 
Signed-off-by: Wenchen Fan 
---
 .../expressions/SpecializedGettersReader.java  |  51 +++---
 .../spark/sql/vectorized/ColumnarBatchRow.java |  36 ++---
 .../apache/spark/sql/vectorized/ColumnarRow.java   |  36 ++---
 .../apache/spark/sql/catalyst/InternalRow.scala|  36 +++--
 .../spark/sql/catalyst/encoders/RowEncoder.scala   |  20 ++-
 .../expressions/InterpretedUnsafeProjection.scala  | 172 ++---
 .../expressions/codegen/CodeGenerator.scala|  55 ---
 .../spark/sql/catalyst/expressions/literals.scala  |  65 
 .../sql/catalyst/types/PhysicalDataType.scala  |  66 
 .../org/apache/spark/sql/types/ArrayType.scala |   4 +
 .../org/apache/spark/sql/types/BinaryType.scala|   3 +
 .../org/apache/spark/sql/types/BooleanType.scala   |   3 +
 .../org/apache/spark/sql/types/ByteType.scala  |   3 +
 .../spark/sql/types/CalendarIntervalType.scala |   3 +
 .../org/apache/spark/sql/types/CharType.scala  |   2 +
 .../org/apache/spark/sql/types/DataType.scala  |   3 +
 .../org/apache/spark/sql/types/DateType.scala  |   3 +
 .../spark/sql/types/DayTimeIntervalType.scala  |   3 +
 .../org/apache/spark/sql/types/DecimalType.scala   |   3 +
 .../org/apache/spark/sql/types/DoubleType.scala|   3 +
 .../org/apache/spark/sql/types/FloatType.scala |   3 +
 .../org/apache/spark/sql/types/IntegerType.scala   |   3 +
 .../org/apache/spark/sql/types/LongType.scala  |   3 +
 .../scala/org/apache/spark/sql/types/MapType.scala |   4 +
 .../org/apache/spark/sql/types/NullType.scala  |   3 +
 .../org/apache/spark/sql/types/ShortType.scala |   3 +
 .../org/apache/spark/sql/types/StringType.scala|   3 +
 .../org/apache/spark/sql/types/StructType.scala|   3 +
 .../apache/spark/sql/types/TimestampNTZType.scala  |   3 +
 .../org/apache/spark/sql/types/TimestampType.scala |   3 +
 .../org/apache/spark/sql/types/VarcharType.scala   |   2 +
 .../spark/sql/types/YearMonthIntervalType.scala|   3 +
 32 files changed, 367 insertions(+), 239 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
index 90857c667ab..c5a7d34281f 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/SpecializedGettersReader.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
+import org.apache.spark.sql.catalyst.types.*;
 import org.apache.spark.sql.types.*;
 
 public final class SpecializedGettersReader {
@@ -28,70 +29,56 @@ public final class SpecializedGettersReader {
   DataType dataType,
   boolean handleNull,
   boolean handleUserDefinedType) {
-if (handleNull && (obj.isNullAt(ordinal) || dataType instanceof NullType)) 
{
+PhysicalDataType physicalDataType = dataType.physicalDataType();
+if (handleNull && (obj.isNullAt(ordinal) || physicalDataType instanceof 
PhysicalNullType)) {
   return null;
 }
-if (dataType instanceof BooleanType) {
+if (physicalDataType instanceof PhysicalBooleanType) {
   return obj.getBoolean(ordinal);
 }
-if (dataType instanceof ByteType) {
+if (physicalDataType instanceof PhysicalByteType) {
   return obj.getByte(ordinal);
 }
-if (dataType instanceof ShortType) {
+if (physicalDataType instanceof PhysicalShortType) {
   return obj.getShort(ord

[spark] branch master updated: [SPARK-41227][CONNECT][PYTHON] Implement DataFrame cross join

2022-11-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 93b9deedbd8 [SPARK-41227][CONNECT][PYTHON] Implement DataFrame cross 
join
93b9deedbd8 is described below

commit 93b9deedbd81dec79badec0e761f12c779b019f6
Author: Xinrong Meng 
AuthorDate: Thu Dec 1 13:05:51 2022 +0800

[SPARK-41227][CONNECT][PYTHON] Implement DataFrame cross join

### What changes were proposed in this pull request?
Implement DataFrame cross join for Spark Connect.

That consists of
- `DataFrame.crossJoin`
- `DataFrame.join(.., how="cross")`.

### Why are the changes needed?
Part of [SPARK-39375](https://issues.apache.org/jira/browse/SPARK-39375).

### Does this PR introduce _any_ user-facing change?
Yes. `DataFrame.crossJoin` and `DataFrame.join(.., how="cross")` are 
supported as shown below.

```py
>>> from pyspark.sql.connect.client import RemoteSparkSession
>>> cspark = RemoteSparkSession()
>>> df = cspark.range(1, 3)

>>> df.crossJoin(df).show()
+---+---+
| id| id|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
+---+---+

>>> df.join(other=df, how="cross").show()
+---+---+
| id| id|
+---+---+
|  1|  1|
|  1|  2|
|  2|  1|
|  2|  2|
+---+---+
```

### How was this patch tested?
Unit tests.

Closes #38778 from xinrong-meng/connect_crossjoin.

Authored-by: Xinrong Meng 
Signed-off-by: Ruifeng Zheng 
---
 .../main/protobuf/spark/connect/relations.proto|   1 +
 .../sql/connect/planner/SparkConnectPlanner.scala  |   3 +-
 python/pyspark/sql/connect/dataframe.py|  25 -
 python/pyspark/sql/connect/plan.py |   4 +-
 python/pyspark/sql/connect/proto/relations_pb2.py  | 118 ++---
 python/pyspark/sql/connect/proto/relations_pb2.pyi |   2 +
 .../sql/tests/connect/test_connect_basic.py|  17 +++
 .../sql/tests/connect/test_connect_plan_only.py|   8 ++
 .../sql/tests/connect/test_connect_select_ops.py   |   1 +
 9 files changed, 117 insertions(+), 62 deletions(-)

diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/src/main/protobuf/spark/connect/relations.proto
index 8b87845245f..f4df95fdd73 100644
--- a/connector/connect/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto
@@ -166,6 +166,7 @@ message Join {
 JOIN_TYPE_RIGHT_OUTER = 4;
 JOIN_TYPE_LEFT_ANTI = 5;
 JOIN_TYPE_LEFT_SEMI = 6;
+JOIN_TYPE_CROSS = 7;
   }
 }
 
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 5ebe7c7cce3..6b11cbea7a5 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.catalyst.plans.{logical, FullOuter, Inner, 
JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
+import org.apache.spark.sql.catalyst.plans.{logical, Cross, FullOuter, Inner, 
JoinType, LeftAnti, LeftOuter, LeftSemi, RightOuter, UsingJoin}
 import org.apache.spark.sql.catalyst.plans.logical.{Deduplicate, Except, 
Intersect, LocalRelation, LogicalPlan, Sample, SubqueryAlias, Union}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -641,6 +641,7 @@ class SparkConnectPlanner(session: SparkSession) {
   case proto.Join.JoinType.JOIN_TYPE_LEFT_OUTER => LeftOuter
   case proto.Join.JoinType.JOIN_TYPE_RIGHT_OUTER => RightOuter
   case proto.Join.JoinType.JOIN_TYPE_LEFT_SEMI => LeftSemi
+  case proto.Join.JoinType.JOIN_TYPE_CROSS => Cross
   case _ => throw InvalidPlanInput(s"Join type ${t} is not supported")
 }
   }
diff --git a/python/pyspark/sql/connect/dataframe.py 
b/python/pyspark/sql/connect/dataframe.py
index ebfb52cdd74..749aab7c859 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -230,7 +230,30 @@ class DataFrame(object):
 return pdd.iloc[0, 0]
 
 def crossJoin(self, other: "DataFrame") -> "DataFrame":
-...
+"""
+Returns the cartesian product with another :class:`DataFrame`.
+
+.. versionadded:: 3.4.0
+
+Parame

[spark] branch master updated: [SPARK-41343][CONNECT] Move FunctionName parsing to server side

2022-11-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ce41ca0848e [SPARK-41343][CONNECT] Move FunctionName parsing to server 
side
ce41ca0848e is described below

commit ce41ca0848e740026048aa08cb1062cc4d5082d1
Author: Rui Wang 
AuthorDate: Thu Dec 1 13:27:03 2022 +0800

[SPARK-41343][CONNECT] Move FunctionName parsing to server side

### What changes were proposed in this pull request?

This PR proposes to change the name of `UnresolvedFunction` from a sequence 
of name parts to a single name string, which helps move function name parsing 
to the server side.

For built-in functions, there is no need to even call the SQL parser to parse 
the name (built-in functions should not belong to any catalog or database).
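
As a rough sketch of the server-side decision this enables (a hypothetical 
helper, not the actual planner code):

```python
def resolve_function_name(function_name: str, is_user_defined_function: bool) -> list:
    # Built-in functions belong to no catalog or database, so their name is
    # used directly as a single part.
    if not is_user_defined_function:
        return [function_name]
    # A user-defined function name may be qualified, e.g. "my_db.my_udf";
    # a real server would run it through the SQL parser here.
    return function_name.split(".")


print(resolve_function_name("min", is_user_defined_function=False))
# ['min']
print(resolve_function_name("my_db.my_udf", is_user_defined_function=True))
# ['my_db', 'my_udf']
```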

### Why are the changes needed?

This will help reduce redundant function-name parsing logic on the client 
side.

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

UT

Closes #38854 from amaliujia/function_name_parse.

Authored-by: Rui Wang 
Signed-off-by: Wenchen Fan 
---
 .../main/protobuf/spark/connect/expressions.proto  | 10 ++--
 .../org/apache/spark/sql/connect/dsl/package.scala | 11 +
 .../sql/connect/planner/SparkConnectPlanner.scala  | 22 +
 .../connect/planner/SparkConnectPlannerSuite.scala |  4 ++--
 .../connect/planner/SparkConnectProtoSuite.scala   |  4 
 .../connect/planner/SparkConnectServiceSuite.scala |  2 +-
 python/pyspark/sql/connect/column.py   |  2 +-
 .../pyspark/sql/connect/proto/expressions_pb2.py   | 20 
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  | 28 +++---
 .../connect/test_connect_column_expressions.py |  4 ++--
 .../sql/tests/connect/test_connect_plan_only.py|  2 +-
 11 files changed, 63 insertions(+), 46 deletions(-)

diff --git 
a/connector/connect/src/main/protobuf/spark/connect/expressions.proto 
b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
index b90f7619b8f..1b93c342381 100644
--- a/connector/connect/src/main/protobuf/spark/connect/expressions.proto
+++ b/connector/connect/src/main/protobuf/spark/connect/expressions.proto
@@ -126,11 +126,17 @@ message Expression {
   // An unresolved function is not explicitly bound to one explicit function, 
but the function
   // is resolved during analysis following Sparks name resolution rules.
   message UnresolvedFunction {
-// (Required) Names parts for the unresolved function.
-repeated string parts = 1;
+// (Required) name (or unparsed name for user defined function) for the 
unresolved function.
+string function_name = 1;
 
 // (Optional) Function arguments. Empty arguments are allowed.
 repeated Expression arguments = 2;
+
+// (Required) Indicate if this is a user defined function.
+//
+// When it is not a user defined function, Connect will use the function 
name directly.
+// When it is a user defined function, Connect will parse the function 
name first.
+bool is_user_defined_function = 3;
   }
 
   // Expression as string.
diff --git 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 654a4d5ce20..1342842cbc9 100644
--- 
a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -83,7 +83,7 @@ package object dsl {
   .setUnresolvedFunction(
 Expression.UnresolvedFunction
   .newBuilder()
-  .addParts("<")
+  .setFunctionName("<")
   .addArguments(expr)
   .addArguments(other))
   .build()
@@ -93,14 +93,14 @@ package object dsl {
   Expression
 .newBuilder()
 .setUnresolvedFunction(
-  
Expression.UnresolvedFunction.newBuilder().addParts("min").addArguments(e))
+  
Expression.UnresolvedFunction.newBuilder().setFunctionName("min").addArguments(e))
 .build()
 
 def proto_explode(e: Expression): Expression =
   Expression
 .newBuilder()
 .setUnresolvedFunction(
-  
Expression.UnresolvedFunction.newBuilder().addParts("explode").addArguments(e))
+  
Expression.UnresolvedFunction.newBuilder().setFunctionName("explode").addArguments(e))
 .build()
 
 /**
@@ -117,7 +117,8 @@ package object dsl {
 .setUnresolvedFunction(
   Expression.UnresolvedFunction
 .newBuilder()
-.addAllParts(nameParts.asJava)
+.setFunctionName(nameParts.mkString("."))
+.setIsUserDefinedF

[spark] branch master updated (ce41ca0848e -> c5f189c5365)

2022-11-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ce41ca0848e [SPARK-41343][CONNECT] Move FunctionName parsing to server 
side
 add c5f189c5365 [SPARK-41237][SQL] Reuse the error class 
`UNSUPPORTED_DATATYPE` for `_LEGACY_ERROR_TEMP_0030`

No new revisions were added by this update.

Summary of changes:
 R/pkg/tests/fulltests/test_sparkSQL.R|  6 +++---
 R/pkg/tests/fulltests/test_streaming.R   |  2 +-
 R/pkg/tests/fulltests/test_utils.R   |  2 +-
 core/src/main/resources/error/error-classes.json |  5 -
 .../org/apache/spark/sql/errors/QueryParsingErrors.scala |  4 ++--
 .../apache/spark/sql/catalyst/parser/DDLParserSuite.scala|  4 ++--
 .../spark/sql/catalyst/parser/DataTypeParserSuite.scala  | 12 ++--
 .../apache/spark/sql/catalyst/parser/ErrorParserSuite.scala  |  4 ++--
 .../test/resources/sql-tests/results/csv-functions.sql.out   |  1 -
 .../test/resources/sql-tests/results/postgreSQL/with.sql.out | 10 ++
 .../sql/execution/datasources/jdbc/JdbcUtilsSuite.scala  |  4 ++--
 .../datasources/v2/jdbc/JDBCTableCatalogSuite.scala  |  4 ++--
 .../scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala |  4 ++--
 13 files changed, 33 insertions(+), 29 deletions(-)





[spark] branch master updated: [SPARK-41228][SQL] Rename & Improve error message for `COLUMN_NOT_IN_GROUP_BY_CLAUSE`

2022-11-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5badb2446fa [SPARK-41228][SQL] Rename & Improve error message for 
`COLUMN_NOT_IN_GROUP_BY_CLAUSE`
5badb2446fa is described below

commit 5badb2446fa2b51e8ea239ced6c9b44178b2f1fa
Author: itholic 
AuthorDate: Thu Dec 1 09:18:17 2022 +0300

[SPARK-41228][SQL] Rename & Improve error message for 
`COLUMN_NOT_IN_GROUP_BY_CLAUSE`

### What changes were proposed in this pull request?

This PR proposes to rename `COLUMN_NOT_IN_GROUP_BY_CLAUSE` to 
`MISSING_AGGREGATION`.

Also, improve its error message.
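
For illustration, a minimal query that hits this class of error, assuming a 
plain local `SparkSession`; the exact error class and message text depend on 
the Spark version.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import first
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(1, 10), (1, 20), (2, 30)], ["id", "v"])
df.createOrReplaceTempView("t")

# `v` is neither grouped nor aggregated, so analysis fails with the
# missing-aggregation error this PR renames.
try:
    spark.sql("SELECT id, v FROM t GROUP BY id").collect()
except AnalysisException as e:
    print(type(e).__name__)  # AnalysisException

# Either aggregate the column, or use first()/first_value() if any value
# from the group will do.
df.groupBy("id").agg(first("v").alias("v")).show()
```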

### Why are the changes needed?

The current error class name and its error message do not describe the error 
cause and its resolution clearly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

```
./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"
```

Closes #38769 from itholic/SPARK-41128.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json  | 13 +++--
 .../sql/tests/pandas/test_pandas_udf_grouped_agg.py   |  2 +-
 .../apache/spark/sql/errors/QueryCompilationErrors.scala  |  7 +--
 .../spark/sql/catalyst/analysis/AnalysisErrorSuite.scala  |  7 +--
 .../src/test/resources/sql-tests/results/extract.sql.out  |  2 ++
 .../resources/sql-tests/results/group-by-filter.sql.out   | 10 ++
 .../src/test/resources/sql-tests/results/group-by.sql.out | 15 +--
 .../test/resources/sql-tests/results/grouping_set.sql.out |  5 +++--
 .../sql-tests/results/postgreSQL/create_view.sql.out  |  5 +++--
 .../sql-tests/results/udaf/udaf-group-by-ordinal.sql.out  | 15 +--
 .../sql-tests/results/udaf/udaf-group-by.sql.out  | 15 +--
 .../resources/sql-tests/results/udf/udf-group-by.sql.out  | 15 +--
 .../org/apache/spark/sql/execution/SQLViewSuite.scala |  5 +++--
 13 files changed, 71 insertions(+), 45 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index a79c02e1f1d..65b6dc68d12 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -109,12 +109,6 @@
   "The column  already exists. Consider to choose another name 
or rename the existing column."
 ]
   },
-  "COLUMN_NOT_IN_GROUP_BY_CLAUSE" : {
-"message" : [
-  "The expression  is neither present in the group by, nor is 
it an aggregate function. Add to group by or wrap in `first()` (or 
`first_value()`) if you don't care which value you get."
-],
-"sqlState" : "42000"
-  },
   "CONCURRENT_QUERY" : {
 "message" : [
   "Another instance of this query was just started by a concurrent 
session."
@@ -830,6 +824,13 @@
   "Malformed Protobuf messages are detected in message deserialization. 
Parse Mode: . To process malformed protobuf message as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
 ]
   },
+  "MISSING_AGGREGATION" : {
+"message" : [
+  "The non-aggregating expression  is based on columns which 
are not participating in the GROUP BY clause.",
+  "Add the columns or the expression to the GROUP BY, aggregate the 
expression, or use  if you do not care which of the values 
within a group is returned."
+],
+"sqlState" : "42000"
+  },
   "MISSING_STATIC_PARTITION_COLUMN" : {
 "message" : [
   "Unknown static partition column: "
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
index 6f475624b74..aa844fc5fd5 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_grouped_agg.py
@@ -475,7 +475,7 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
 mean_udf = self.pandas_agg_mean_udf
 
 with QuietTest(self.sc):
-with self.assertRaisesRegex(AnalysisException, "nor.*aggregate 
function"):
+with self.assertRaisesRegex(AnalysisException, 
"[MISSING_AGGREGATION]"):
 df.groupby(df.id).agg(plus_one(df.v)).collect()
 
 with QuietTest(self.sc):
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index e5b1c3c100d..fc9a08104b4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -28,6 +28,7 @@ import 
org.apache.spark.sql.cat

[spark] branch master updated: [SPARK-41338][SQL] Resolve outer references and normal columns in the same analyzer batch

2022-11-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8d60736abd5 [SPARK-41338][SQL] Resolve outer references and normal 
columns in the same analyzer batch
8d60736abd5 is described below

commit 8d60736abd56c5642ae4d3616593c94090cdf9ac
Author: Wenchen Fan 
AuthorDate: Thu Dec 1 14:20:23 2022 +0800

[SPARK-41338][SQL] Resolve outer references and normal columns in the same 
analyzer batch

### What changes were proposed in this pull request?

Today, the way we resolve outer references is very inefficient. It invokes 
the entire analyzer to resolve the subquery plan, then transforms the plan to 
resolve `UnresolvedAttribute` to outer references. If the plan is still 
unresolved, the process is repeated until the plan is resolved or no longer 
changes. Ideally, we should only invoke the analyzer once to resolve subquery 
plans.

This PR adds a new rule to resolve outer references and puts it in the main 
analyzer batch. Then we can safely invoke the analyzer only once.
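
For context, a small example of what an outer reference is (a column of the 
outer query referenced inside a correlated subquery), assuming a plain local 
`SparkSession`.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
spark.range(5).createOrReplaceTempView("t1")
spark.range(3).createOrReplaceTempView("t2")

# `t1.id` inside the subquery is not a column of `t2`, so the analyzer must
# resolve it against the outer plan; that is exactly what the new
# ResolveOuterReferences rule does inside the main analyzer batch.
spark.sql("""
    SELECT id
    FROM t1
    WHERE EXISTS (SELECT 1 FROM t2 WHERE t2.id = t1.id)
""").show()
# Rows with id 0, 1 and 2 are returned (the ids present in both views).
```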

### Why are the changes needed?

Simplify the subquery resolution code and make it more efficient

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #38851 from cloud-fan/outer.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 106 +++--
 1 file changed, 57 insertions(+), 49 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1daa8ea36bf..7f66ddaa894 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -298,6 +298,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   ResolveOrdinalInOrderByAndGroupBy ::
   ResolveAggAliasInGroupBy ::
   ResolveMissingReferences ::
+  ResolveOuterReferences ::
   ExtractGenerator ::
   ResolveGenerate ::
   ResolveFunctions ::
@@ -2109,6 +2110,51 @@ class Analyzer(override val catalogManager: 
CatalogManager)
 }
   }
 
+  /**
+   * Resolves `UnresolvedAttribute` to `OuterReference` if we are resolving 
subquery plans (when
+   * `AnalysisContext.get.outerPlan` is set).
+   */
+  object ResolveOuterReferences extends Rule[LogicalPlan] {
+override def apply(plan: LogicalPlan): LogicalPlan = {
+  // Only apply this rule if we are resolving subquery plans.
+  if (AnalysisContext.get.outerPlan.isEmpty) return plan
+
+  // We must run these 3 rules first, as they also resolve 
`UnresolvedAttribute` and have
+  // higher priority than outer reference resolution.
+  val prepared = 
ResolveAggregateFunctions(ResolveMissingReferences(ResolveReferences(plan)))
+  
prepared.resolveOperatorsDownWithPruning(_.containsPattern(UNRESOLVED_ATTRIBUTE))
 {
+// Handle `Generate` specially here, because 
`Generate.generatorOutput` starts with
+// `UnresolvedAttribute` but we should never resolve it to outer 
references. It's a bit
+// hacky that `Generate` uses `UnresolvedAttribute` to store the 
generator column names,
+// we should clean it up later.
+case g: Generate if g.childrenResolved && !g.resolved =>
+  val newGenerator = g.generator.transformWithPruning(
+_.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference)
+  val resolved = g.copy(generator = 
newGenerator.asInstanceOf[Generator])
+  resolved.copyTagsFrom(g)
+  resolved
+case q: LogicalPlan if q.childrenResolved && !q.resolved =>
+  q.transformExpressionsWithPruning(
+_.containsPattern(UNRESOLVED_ATTRIBUTE))(resolveOuterReference)
+  }
+}
+
+private val resolveOuterReference: PartialFunction[Expression, Expression] 
= {
+  case u @ UnresolvedAttribute(nameParts) => withPosition(u) {
+try {
+  AnalysisContext.get.outerPlan.get.resolveChildren(nameParts, 
resolver) match {
+case Some(resolved) => wrapOuterReference(resolved)
+case None => u
+  }
+} catch {
+  case ae: AnalysisException =>
+logDebug(ae.getMessage)
+u
+}
+  }
+}
+  }
+
   /**
* Checks whether a function identifier referenced by an 
[[UnresolvedFunction]] is defined in the
* function registry. Note that this rule doesn't try to resolve the 
[[UnresolvedFunction]]. It
@@ -2482,65 +2528,27 @@ class Analyzer(override val catalogManager: 
CatalogManager)
*/
   object Resolve

[spark] branch master updated: [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing

2022-11-30 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d67e22826cd [SPARK-41339][SQL] Close and recreate RocksDB write batch 
instead of just clearing
d67e22826cd is described below

commit d67e22826cda41d732e010d73687e74fab60f4b6
Author: Adam Binford 
AuthorDate: Thu Dec 1 15:50:13 2022 +0900

[SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just 
clearing

### What changes were proposed in this pull request?

Instead of just calling `writeBatch.clear`, close the write batch and 
recreate it.

### Why are the changes needed?

A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its 
underlying data in a `std::string`. Why? I'm not sure. But after a partition is 
finished, `writeBatch.clear()` is called (somewhat indirectly through a call to 
`store.abort`), presumably clearing the data in the `WriteBatch`. This calls 
`std::string::clear` followed by `std::string::resize` underneath the hood. 
However, neither of these two things actually reclaims native memory. All the 
memory allocated for ex [...]
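
As a rough sketch of the pattern in Python (a hypothetical `NativeBuffer` 
stand-in, not the RocksDB JNI API): when `clear()` keeps the underlying 
allocation alive, the holder drops and recreates the object instead.

```python
class NativeBuffer:
    """Stand-in for a handle whose clear() keeps its allocation around."""

    def __init__(self) -> None:
        self.capacity = 0

    def write(self, n: int) -> None:
        self.capacity = max(self.capacity, n)  # grows, like the batch's backing string

    def clear(self) -> None:
        pass  # contents discarded, but `capacity` (native memory) is retained

    def close(self) -> None:
        self.capacity = 0  # the allocation is actually released


class BatchHolder:
    def __init__(self) -> None:
        self.batch = NativeBuffer()

    def reset_batch(self) -> None:
        # Mirrors resetWriteBatch(): clear, close, then allocate a fresh
        # batch so the old allocation can really be reclaimed.
        self.batch.clear()
        self.batch.close()
        self.batch = NativeBuffer()
```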

### Does this PR introduce _any_ user-facing change?

Fix for excess native memory usage.

### How was this patch tested?

Existing UTs, not sure how to test for memory usage.

Closes #38853 from Kimahriman/rocksdb-write-batch-close.

Lead-authored-by: Adam Binford 
Co-authored-by: centos 
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/execution/streaming/state/RocksDB.scala   | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 3e1bcbbbf0d..5acd20f49dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -63,7 +63,7 @@ class RocksDB(
   private val readOptions = new ReadOptions()  // used for gets
   private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
   private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
-  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+  private var writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
 
   private val bloomFilter = new BloomFilter()
   private val tableFormatConfig = new BlockBasedTableConfig()
@@ -135,7 +135,7 @@ class RocksDB(
   }
   // reset resources to prevent side-effects from previous loaded version
   closePrefixScanIterators()
-  writeBatch.clear()
+  resetWriteBatch()
   logInfo(s"Loaded $version")
 } catch {
   case t: Throwable =>
@@ -328,7 +328,7 @@ class RocksDB(
*/
   def rollback(): Unit = {
 closePrefixScanIterators()
-writeBatch.clear()
+resetWriteBatch()
 numKeysOnWritingVersion = numKeysOnLoadedVersion
 release()
 logInfo(s"Rolled back to $loadedVersion")
@@ -455,6 +455,13 @@ class RocksDB(
 prefixScanReuseIter.clear()
   }
 
+  /** Create a new WriteBatch, clear doesn't deallocate the native memory */
+  private def resetWriteBatch(): Unit = {
+writeBatch.clear()
+writeBatch.close()
+writeBatch = new WriteBatchWithIndex(true)
+  }
+
   private def getDBProperty(property: String): Long = {
 db.getProperty(property).toLong
   }





[spark] branch branch-3.3 updated: [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing

2022-11-30 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new bdafe574865 [SPARK-41339][SQL] Close and recreate RocksDB write batch 
instead of just clearing
bdafe574865 is described below

commit bdafe574865696e79eaa959dabef913cba5857ce
Author: Adam Binford 
AuthorDate: Thu Dec 1 15:50:13 2022 +0900

[SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just 
clearing

### What changes were proposed in this pull request?

Instead of just calling `writeBatch.clear`, close the write batch and 
recreate it.

### Why are the changes needed?

A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its 
underlying data in a `std::string`. Why? I'm not sure. But after a partition is 
finished, `writeBatch.clear()` is called (somewhat indirectly through a call to 
`store.abort`), presumably clearing the data in the `WriteBatch`. This calls 
`std::string::clear` followed by `std::string::resize` underneath the hood. 
However, neither of these two things actually reclaims native memory. All the 
memory allocated for ex [...]

### Does this PR introduce _any_ user-facing change?

Fix for excess native memory usage.

### How was this patch tested?

Existing UTs, not sure how to test for memory usage.

Closes #38853 from Kimahriman/rocksdb-write-batch-close.

Lead-authored-by: Adam Binford 
Co-authored-by: centos 
Signed-off-by: Jungtaek Lim 
(cherry picked from commit d67e22826cda41d732e010d73687e74fab60f4b6)
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/execution/streaming/state/RocksDB.scala   | 13 ++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index a5bd489e04f..66e14f6bff1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -63,7 +63,7 @@ class RocksDB(
   private val readOptions = new ReadOptions()  // used for gets
   private val writeOptions = new WriteOptions().setSync(true)  // wait for 
batched write to complete
   private val flushOptions = new FlushOptions().setWaitForFlush(true)  // wait 
for flush to complete
-  private val writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
+  private var writeBatch = new WriteBatchWithIndex(true)  // overwrite 
multiple updates to a key
 
   private val bloomFilter = new BloomFilter()
   private val tableFormatConfig = new BlockBasedTableConfig()
@@ -134,7 +134,7 @@ class RocksDB(
   }
   // reset resources to prevent side-effects from previous loaded version
   closePrefixScanIterators()
-  writeBatch.clear()
+  resetWriteBatch()
   logInfo(s"Loaded $version")
 } catch {
   case t: Throwable =>
@@ -327,7 +327,7 @@ class RocksDB(
*/
   def rollback(): Unit = {
 closePrefixScanIterators()
-writeBatch.clear()
+resetWriteBatch()
 numKeysOnWritingVersion = numKeysOnLoadedVersion
 release()
 logInfo(s"Rolled back to $loadedVersion")
@@ -454,6 +454,13 @@ class RocksDB(
 prefixScanReuseIter.clear()
   }
 
+  /** Create a new WriteBatch, clear doesn't deallocate the native memory */
+  private def resetWriteBatch(): Unit = {
+writeBatch.clear()
+writeBatch.close()
+writeBatch = new WriteBatchWithIndex(true)
+  }
+
   private def getDBProperty(property: String): Long = {
 db.getProperty(property).toLong
   }

