[spark] branch master updated: [SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect

2023-03-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cb7d0828062 [SPARK-41391][SQL] The output column name of 
groupBy.agg(count_distinct) is incorrect
cb7d0828062 is described below

commit cb7d08280623794b238297d2f3de8abdc8b72bdb
Author: Ritika Maheshwari 
AuthorDate: Fri Mar 31 12:13:23 2023 +0800

[SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is 
incorrect

### What changes were proposed in this pull request?

Correct the output column name of groupBy.agg(count_distinct) so that "*" is expanded into the actual column names and the output column name includes the DISTINCT keyword.

### Why are the changes needed?

The output column name for groupBy.agg(count_distinct) is incorrect, whereas equivalent Spark SQL queries return the correct name. For groupBy.agg queries on a DataFrame, "*" is not expanded in the output column name and the DISTINCT keyword is missing.

```
// initializing data
scala> val df = spark.range(1, 10).withColumn("value", lit(1))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]
scala> df.createOrReplaceTempView("table")

// DataFrame aggregate queries with incorrect output column
scala> df.groupBy("id").agg(count_distinct($"*"))
res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): bigint]
scala> df.groupBy("id").agg(count_distinct($"value"))
res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]

// Spark SQL aggregate queries with correct output column
scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, value): bigint]
scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]
```
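
The naming difference shown in the transcript can be illustrated with a small toy model. This is only a sketch in plain Python, not Spark's implementation: the classes `Star`, `Col` and `CountDistinct` and the functions `eager_alias` and `deferred_alias` are hypothetical names invented for illustration. It assumes the incorrect name arises because the column is named before "*" is expanded, and that deferring the naming until after expansion yields the SQL-style name.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class Star:
    """Hypothetical stand-in for an unexpanded `*`."""


@dataclass
class Col:
    """Hypothetical stand-in for a named column reference."""
    name: str


@dataclass
class CountDistinct:
    """Hypothetical stand-in for an unresolved count_distinct(...) call."""
    args: List[Union[Star, Col]]


def eager_alias(expr: CountDistinct) -> str:
    # Toy version of the old behaviour: the name is generated before
    # analysis, so `*` is still unexpanded and DISTINCT is not rendered.
    rendered = ", ".join(
        "unresolvedstar()" if isinstance(a, Star) else a.name for a in expr.args
    )
    return f"count({rendered})"


def deferred_alias(expr: CountDistinct, schema: List[str]) -> str:
    # Toy version of the fixed behaviour: the name is generated only
    # after `*` has been expanded against the input schema.
    names: List[str] = []
    for a in expr.args:
        names.extend(schema if isinstance(a, Star) else [a.name])
    return f"count(DISTINCT {', '.join(names)})"
```

With a schema of `["id", "value"]`, `eager_alias(CountDistinct([Star()]))` gives `count(unresolvedstar())`, while `deferred_alias(CountDistinct([Star()]), ["id", "value"])` gives `count(DISTINCT id, value)`, mirroring the two outputs in the transcript.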

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test to DataFrameSuite.

Closes #40116 from ritikam2/master.

Authored-by: Ritika Maheshwari 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/RelationalGroupedDataset.scala  |  1 +
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 12 
 2 files changed, 13 insertions(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 1c2e309bdaf..31c303921f3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -89,6 +89,7 @@ class RelationalGroupedDataset protected[sql](
 case expr: NamedExpression => expr
 case a: AggregateExpression if 
a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
   UnresolvedAlias(a, Some(Column.generateAlias))
+case u: UnresolvedFunction => UnresolvedAlias(expr, None)
 case expr: Expression => Alias(expr, toPrettySQL(expr))()
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a15c049715b..d4c4c7c9b16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1137,6 +1137,18 @@ class DataFrameSuite extends QueryTest
 checkAnswer(approxSummaryDF, approxSummaryResult)
   }
 
+  test("SPARK-41391: Correct the output column name of 
groupBy.agg(count_distinct)") {
+withTempView("person") {
+  person.createOrReplaceTempView("person")
+  val df1 = person.groupBy("id").agg(count_distinct(col("name")))
+  val df2 = spark.sql("SELECT id, COUNT(DISTINCT name) FROM person GROUP 
BY id")
+  assert(df1.columns === df2.columns)
+  val df3 = person.groupBy("id").agg(count_distinct(col("*")))
+  val df4 = spark.sql("SELECT id, COUNT(DISTINCT *) FROM person GROUP BY 
id")
+  assert(df3.columns === df4.columns)
+}
+  }
+
   test("summary advanced") {
 val stats = Array("count", "50.01%", "max", "mean", "min", "25%")
 val orderMatters = person2.summary(stats: _*)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled

2023-03-30 Thread gurwls223

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 68fa8cafc3c [SPARK-42969][CONNECT][TESTS] Fix the comparison the 
result with Arrow optimization enabled/disabled
68fa8cafc3c is described below

commit 68fa8cafc3cfcdc043920ca8544c24ac88f0a63c
Author: Takuya UESHIN 
AuthorDate: Fri Mar 31 08:45:44 2023 +0900

[SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow 
optimization enabled/disabled

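### What changes were proposed in this pull request?

Fixes the comparison of results with Arrow optimization enabled/disabled.

### Why are the changes needed?

In `test_arrow`, there are many comparisons between DataFrames with Arrow optimization enabled and disabled.

These should instead compare against expected values so that the tests can be reused for Spark Connect parity tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated the tests.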

Closes #40612 from ueshin/issues/SPARK-42969/test_arrow.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 35503a535771d257b517e7ddf2adfaefefd97dad)
Signed-off-by: Hyukjin Kwon 
---
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  47 +++--
 python/pyspark/sql/tests/test_arrow.py | 202 +
 2 files changed, 163 insertions(+), 86 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py 
b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index f8180d661db..8953b2f8d98 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -37,19 +37,20 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase):
 def test_createDataFrame_with_incorrect_schema(self):
 self.check_createDataFrame_with_incorrect_schema()
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_createDataFrame_with_map_type(self):
-super().test_createDataFrame_with_map_type()
+self.check_createDataFrame_with_map_type(True)
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42983): len() of unsized object
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_createDataFrame_with_ndarray(self):
-super().test_createDataFrame_with_ndarray()
+self.check_createDataFrame_with_ndarray(True)
 
+# TODO(SPARK-42984): ValueError not raised
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_createDataFrame_with_single_data_type(self):
-super().test_createDataFrame_with_single_data_type()
+self.check_createDataFrame_with_single_data_type()
 
 @unittest.skip("Spark Connect does not support RDD but the tests depend on 
them.")
 def test_no_partition_frame(self):
@@ -70,9 +71,20 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase):
 def test_toPandas_batch_order(self):
 super().test_toPandas_batch_order()
 
-@unittest.skip("Spark Connect does not support Spark Context but the test 
depends on that.")
 def test_toPandas_empty_df_arrow_enabled(self):
-super().test_toPandas_empty_df_arrow_enabled()
+self.check_toPandas_empty_df_arrow_enabled(True)
+
+def test_create_data_frame_to_pandas_timestamp_ntz(self):
+self.check_create_data_frame_to_pandas_timestamp_ntz(True)
+
+def test_create_data_frame_to_pandas_day_time_internal(self):
+self.check_create_data_frame_to_pandas_day_time_internal(True)
+
+def test_toPandas_respect_session_timezone(self):
+self.check_toPandas_respect_session_timezone(True)
+
+def test_toPandas_with_array_type(self):
+self.check_toPandas_with_array_type(True)
 
 @unittest.skip("Spark Connect does not support fallback.")
 def test_toPandas_fallback_disabled(self):
@@ -82,20 +94,29 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase):
 def test_toPandas_fallback_enabled(self):
 super().test_toPandas_fallback_enabled()
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_toPandas_with_map_type(self):
-super().test_toPandas_with_map_type()
+self.check_toPandas_with_map_type(True)
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_toPandas_with_map_type_nulls(self):
-super().test_toPandas

[spark] branch branch-3.4 updated: [SPARK-42970][CONNECT][PYTHON][TESTS][3.4] Reuse pyspark.sql.tests.test_arrow test cases

2023-03-30 Thread gurwls223

gurwls223 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 98f00ea2bcf [SPARK-42970][CONNECT][PYTHON][TESTS][3.4] Reuse 
pyspark.sql.tests.test_arrow test cases
98f00ea2bcf is described below

commit 98f00ea2bcfacb55615b13405573bb79768eb7cc
Author: Takuya UESHIN 
AuthorDate: Fri Mar 31 08:46:36 2023 +0900

[SPARK-42970][CONNECT][PYTHON][TESTS][3.4] Reuse 
pyspark.sql.tests.test_arrow test cases

### What changes were proposed in this pull request?

Reuses `pyspark.sql.tests.test_arrow` test cases.

### Why are the changes needed?

`test_arrow` is also helpful because it contains many tests for 
`createDataFrame` with pandas or `toPandas`.
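
The reuse pattern visible in the diff below (a shared mixin combined with a second base test case, with individual tests skipped where a feature is unsupported) can be sketched with the standard `unittest` module alone. The class names `SharedChecksMixin`, `LocalBackendTests` and `RemoteBackendTests` and the `supports_feature` attribute are hypothetical and exist only for this illustration; the real classes are `ArrowTestsMixin` and `ReusedConnectTestCase`.

```python
import unittest


class SharedChecksMixin:
    """Test methods shared by several concrete suites (hypothetical)."""

    def test_roundtrip(self):
        self.assertEqual(list(range(3)), [0, 1, 2])

    def test_needs_unsupported_feature(self):
        # Imagine this relies on something the second backend lacks.
        self.assertTrue(self.supports_feature)


class LocalBackendTests(SharedChecksMixin, unittest.TestCase):
    supports_feature = True


class RemoteBackendTests(SharedChecksMixin, unittest.TestCase):
    supports_feature = False

    # Override only to attach a skip reason; the body still delegates
    # to the shared test so it can be enabled later by removing the decorator.
    @unittest.skip("Not supported by this backend.")
    def test_needs_unsupported_feature(self):
        super().test_needs_unsupported_feature()
```

Because the mixin itself does not derive from `unittest.TestCase`, it is not collected as a suite on its own; only the concrete subclasses run.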

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added the tests.

Closes #40595 from ueshin/issues/SPARK-42970/3.4/test_arrow.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 dev/sparktestsupport/modules.py|   1 +
 .../pyspark/sql/tests/connect/test_parity_arrow.py | 110 +
 python/pyspark/sql/tests/test_arrow.py |  37 ---
 3 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 29bc39e14bf..fd18ddd6d13 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -524,6 +524,7 @@ pyspark_connect = Module(
 "pyspark.sql.tests.connect.test_connect_basic",
 "pyspark.sql.tests.connect.test_connect_function",
 "pyspark.sql.tests.connect.test_connect_column",
+"pyspark.sql.tests.connect.test_parity_arrow",
 "pyspark.sql.tests.connect.test_parity_datasources",
 "pyspark.sql.tests.connect.test_parity_errors",
 "pyspark.sql.tests.connect.test_parity_catalog",
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py 
b/python/pyspark/sql/tests/connect/test_parity_arrow.py
new file mode 100644
index 000..f8180d661db
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.sql.tests.test_arrow import ArrowTestsMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
+@unittest.skip("Spark Connect does not support Spark Context but the test 
depends on that.")
+def test_createDataFrame_empty_partition(self):
+super().test_createDataFrame_empty_partition()
+
+@unittest.skip("Spark Connect does not support fallback.")
+def test_createDataFrame_fallback_disabled(self):
+super().test_createDataFrame_fallback_disabled()
+
+@unittest.skip("Spark Connect does not support fallback.")
+def test_createDataFrame_fallback_enabled(self):
+super().test_createDataFrame_fallback_enabled()
+
+def test_createDataFrame_with_incorrect_schema(self):
+self.check_createDataFrame_with_incorrect_schema()
+
+# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+@unittest.skip("Fails in Spark Connect, should enable.")
+def test_createDataFrame_with_map_type(self):
+super().test_createDataFrame_with_map_type()
+
+# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+@unittest.skip("Fails in Spark Connect, should enable.")
+def test_createDataFrame_with_ndarray(self):
+super().test_createDataFrame_with_ndarray()
+
+@unittest.skip("Fails in Spark Connect, should enable.")
+def test_createDataFrame_with_single_data_type(self):
+super().test_createDataFrame_with_single_data_type()
+
+@unittest.skip("Spark Connect does not support RDD but the tests depend on 
them.")
+def test_no_partition_frame(self):
+super().test_no_partition_frame()
+
+@unittest.skip("Spark Connect does not support RDD but th

[spark] branch master updated: [SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled

2023-03-30 Thread gurwls223

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 35503a53577 [SPARK-42969][CONNECT][TESTS] Fix the comparison the 
result with Arrow optimization enabled/disabled
35503a53577 is described below

commit 35503a535771d257b517e7ddf2adfaefefd97dad
Author: Takuya UESHIN 
AuthorDate: Fri Mar 31 08:45:44 2023 +0900

[SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow 
optimization enabled/disabled

### What changes were proposed in this pull request?

Fixes the comparison of results with Arrow optimization enabled/disabled.

### Why are the changes needed?

In `test_arrow`, there are many comparisons between DataFrames with Arrow optimization enabled and disabled.

These should instead compare against expected values so that the tests can be reused for Spark Connect parity tests.
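
A minimal sketch of the idea, using only the standard `unittest` module: each check takes a flag and compares against fixed expected values, so a suite that supports only one setting can still call it. The function `to_rows` and the classes `ConversionChecksMixin`, `ClassicTests` and `ParityTests` are hypothetical names for illustration and are not part of PySpark.

```python
import unittest


def to_rows(data, arrow_enabled):
    # Hypothetical stand-in for a conversion such as toPandas(); a real
    # implementation would take a different code path per flag value.
    return [tuple(row) for row in data]


class ConversionChecksMixin:
    data = [(1, "a"), (2, "b")]
    expected = [(1, "a"), (2, "b")]

    def check_conversion(self, arrow_enabled):
        # Compare against fixed expected values rather than against the
        # result of the other code path.
        self.assertEqual(to_rows(self.data, arrow_enabled), self.expected)


class ClassicTests(ConversionChecksMixin, unittest.TestCase):
    def test_conversion(self):
        # The classic suite can exercise both settings.
        for arrow_enabled in (True, False):
            with self.subTest(arrow_enabled=arrow_enabled):
                self.check_conversion(arrow_enabled)


class ParityTests(ConversionChecksMixin, unittest.TestCase):
    def test_conversion(self):
        # A parity suite runs only the setting it supports.
        self.check_conversion(True)
```

This mirrors the shape of calls such as `self.check_toPandas_with_map_type(True)` in the diff, where the parity suite passes the flag explicitly instead of delegating to the parent test.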

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated the tests.

Closes #40612 from ueshin/issues/SPARK-42969/test_arrow.

Authored-by: Takuya UESHIN 
Signed-off-by: Hyukjin Kwon 
---
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  47 +++--
 python/pyspark/sql/tests/test_arrow.py | 202 +
 2 files changed, 163 insertions(+), 86 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py 
b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index f8180d661db..8953b2f8d98 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -37,19 +37,20 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase):
 def test_createDataFrame_with_incorrect_schema(self):
 self.check_createDataFrame_with_incorrect_schema()
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_createDataFrame_with_map_type(self):
-super().test_createDataFrame_with_map_type()
+self.check_createDataFrame_with_map_type(True)
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42983): len() of unsized object
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_createDataFrame_with_ndarray(self):
-super().test_createDataFrame_with_ndarray()
+self.check_createDataFrame_with_ndarray(True)
 
+# TODO(SPARK-42984): ValueError not raised
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_createDataFrame_with_single_data_type(self):
-super().test_createDataFrame_with_single_data_type()
+self.check_createDataFrame_with_single_data_type()
 
 @unittest.skip("Spark Connect does not support RDD but the tests depend on 
them.")
 def test_no_partition_frame(self):
@@ -70,9 +71,20 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase):
 def test_toPandas_batch_order(self):
 super().test_toPandas_batch_order()
 
-@unittest.skip("Spark Connect does not support Spark Context but the test 
depends on that.")
 def test_toPandas_empty_df_arrow_enabled(self):
-super().test_toPandas_empty_df_arrow_enabled()
+self.check_toPandas_empty_df_arrow_enabled(True)
+
+def test_create_data_frame_to_pandas_timestamp_ntz(self):
+self.check_create_data_frame_to_pandas_timestamp_ntz(True)
+
+def test_create_data_frame_to_pandas_day_time_internal(self):
+self.check_create_data_frame_to_pandas_day_time_internal(True)
+
+def test_toPandas_respect_session_timezone(self):
+self.check_toPandas_respect_session_timezone(True)
+
+def test_toPandas_with_array_type(self):
+self.check_toPandas_with_array_type(True)
 
 @unittest.skip("Spark Connect does not support fallback.")
 def test_toPandas_fallback_disabled(self):
@@ -82,20 +94,29 @@ class ArrowParityTests(ArrowTestsMixin, 
ReusedConnectTestCase):
 def test_toPandas_fallback_enabled(self):
 super().test_toPandas_fallback_enabled()
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_toPandas_with_map_type(self):
-super().test_toPandas_with_map_type()
+self.check_toPandas_with_map_type(True)
 
-# TODO(SPARK-42969): Fix the comparison the result with Arrow optimization 
enabled/disabled.
+# TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
 @unittest.skip("Fails in Spark Connect, should

[spark] branch branch-3.2 updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

2023-03-30 Thread gengliang

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 8488a255912 [SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled
8488a255912 is described below

commit 8488a255912b821c1439603c2e72d7b252ef0a57
Author: Xingbo Jiang 
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled

### What changes were proposed in this pull request?
This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information in `Task`.

### Why are the changes needed?
-1 is not a legal stageAttemptId value, so it can lead to unexpected problems if a subscriber tries to parse the stage information from the SparkListenerTaskStart event.
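
The difference between the two approaches can be sketched in a few lines of plain Python. This is a toy model, not the scheduler's code: the `Task` dataclass and the functions `attempt_id_via_lookup` and `attempt_id_from_task` are hypothetical, and the registry is modelled as a simple dict from stage ID to attempt number.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Task:
    stage_id: int
    stage_attempt_id: int


def attempt_id_via_lookup(task: Task, stage_id_to_attempt: Dict[int, int]) -> int:
    # Toy version of the old approach: look the stage up in a registry
    # that no longer contains it once the stage has been cancelled.
    return stage_id_to_attempt.get(task.stage_id, -1)


def attempt_id_from_task(task: Task) -> int:
    # Toy version of the fixed approach: the task already carries the value,
    # so no registry lookup (and no -1 fallback) is needed.
    return task.stage_attempt_id
```

If the entry for a stage is removed from the dict before the lookup runs, `attempt_id_via_lookup` falls back to -1, whereas `attempt_id_from_task` still returns the attempt number stored on the task.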

### Does this PR introduce _any_ user-facing change?
No, it's a bugfix.

### How was this patch tested?
Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.

Authored-by: Xingbo Jiang 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4)
Signed-off-by: Gengliang Wang 
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a82d261d545..b950c07f3d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1130,11 +1130,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): 
Unit = {
-// Note that there is a chance that this task is launched after the stage 
is cancelled.
-// In that case, we wouldn't have the stage anymore in stageIdToStage.
-val stageAttemptId =
-  
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
+listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, 
taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = 
{





[spark] branch master updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

2023-03-30 Thread gengliang

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1a6b1770c85 [SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled
1a6b1770c85 is described below

commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4
Author: Xingbo Jiang 
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled

### What changes were proposed in this pull request?
This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information in `Task`.

### Why are the changes needed?
-1 is not a legal stageAttemptId value, so it can lead to unexpected problems if a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?
No, it's a bugfix.

### How was this patch tested?
Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.

Authored-by: Xingbo Jiang 
Signed-off-by: Gengliang Wang 
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 065bf88ab16..c78a26d91eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1183,11 +1183,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): 
Unit = {
-// Note that there is a chance that this task is launched after the stage 
is cancelled.
-// In that case, we wouldn't have the stage anymore in stageIdToStage.
-val stageAttemptId =
-  
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
+listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, 
taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], 
taskIndex: Int): Unit = {





[spark] branch branch-3.3 updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

2023-03-30 Thread gengliang

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 6f266ee1f76 [SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled
6f266ee1f76 is described below

commit 6f266ee1f76323e147317a8ab98600dd3868667a
Author: Xingbo Jiang 
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled

### What changes were proposed in this pull request?
This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information in `Task`.

### Why are the changes needed?
-1 is not a legal stageAttemptId value, so it can lead to unexpected problems if a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?
No, it's a bugfix.

### How was this patch tested?
Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.

Authored-by: Xingbo Jiang 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4)
Signed-off-by: Gengliang Wang 
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0de60224179..bd2823bcac1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1152,11 +1152,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): 
Unit = {
-// Note that there is a chance that this task is launched after the stage 
is cancelled.
-// In that case, we wouldn't have the stage anymore in stageIdToStage.
-val stageAttemptId =
-  
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
+listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, 
taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = 
{





[spark] branch branch-3.4 updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

2023-03-30 Thread gengliang

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new e20b55bf4de [SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled
e20b55bf4de is described below

commit e20b55bf4dec353b97f9eae38988fa915213a214
Author: Xingbo Jiang 
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix 
SparkListenerTaskStart.stageAttemptId when a task is started after the stage is 
cancelled

### What changes were proposed in this pull request?
This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information in `Task`.

### Why are the changes needed?
-1 is not a legal stageAttemptId value, so it can lead to unexpected problems if a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?
No, it's a bugfix.

### How was this patch tested?
Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.

Authored-by: Xingbo Jiang 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4)
Signed-off-by: Gengliang Wang 
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1ccaf05509..2a966fab6f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1171,11 +1171,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): 
Unit = {
-// Note that there is a chance that this task is launched after the stage 
is cancelled.
-// In that case, we wouldn't have the stage anymore in stageIdToStage.
-val stageAttemptId =
-  
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
+listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, 
taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], 
taskIndex: Int): Unit = {





[spark] branch master updated: [SPARK-42979][SQL] Define literal constructors as keywords

2023-03-30 Thread maxgekk

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5a17537aa4a [SPARK-42979][SQL] Define literal constructors as keywords
5a17537aa4a is described below

commit 5a17537aa4a777429431542cfa6184591476e54a
Author: Max Gekk 
AuthorDate: Thu Mar 30 17:43:54 2023 +0300

[SPARK-42979][SQL] Define literal constructors as keywords

### What changes were proposed in this pull request?
In the PR, I propose to define literal constructors `DATE`, `TIMESTAMP`, 
`TIMESTAMP_NTZ`, `TIMESTAMP_LTZ`, `INTERVAL`, and `X` as Spark SQL keywords.

### Why are the changes needed?
Non-keyword literal constructors cause some inconvenience when analysing or transforming the lexer tree, for example when forming stable column aliases; see https://github.com/apache/spark/pull/40126.
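
As a rough illustration of why dedicated keyword tokens are convenient, the toy lexer below gives each literal constructor its own token kind, so a typed literal can be recognised from token kinds alone instead of by comparing identifier text. This is a plain-Python sketch and not Spark's ANTLR grammar: the names `LITERAL_CONSTRUCTORS`, `tokenize` and `starts_typed_literal` are hypothetical, and only the six constructor names are taken from the description above.

```python
import re

# Keyword set for this toy lexer; the six names come from the PR description.
LITERAL_CONSTRUCTORS = {
    "DATE", "TIMESTAMP", "TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "INTERVAL", "X",
}

# A token is either a single-quoted string or a word, after optional whitespace.
TOKEN = re.compile(r"\s*(?:(?P<string>'[^']*')|(?P<word>[A-Za-z_][A-Za-z_0-9]*))")


def tokenize(text):
    """Split text into (kind, lexeme) pairs; keywords get their own kind."""
    text = text.strip()
    tokens = []
    pos = 0
    while pos < len(text):
        m = TOKEN.match(text, pos)
        if m is None:
            raise ValueError(f"unexpected input at position {pos}")
        if m.group("string") is not None:
            tokens.append(("STRING", m.group("string")))
        else:
            word = m.group("word")
            upper = word.upper()
            kind = upper if upper in LITERAL_CONSTRUCTORS else "IDENTIFIER"
            tokens.append((kind, word))
        pos = m.end()
    return tokens


def starts_typed_literal(tokens):
    # With keyword tokens, this check needs only token kinds.
    return (
        len(tokens) >= 2
        and tokens[0][0] in LITERAL_CONSTRUCTORS
        and tokens[1][0] == "STRING"
    )
```

For example, `tokenize("DATE '2023-03-30'")` yields a `DATE` token followed by a `STRING` token, while a word such as `dates` remains an ordinary `IDENTIFIER`.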

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *SQLKeywordSuite"
$ build/sbt "test:testOnly *.ResolveAliasesSuite"
```

Closes #40593 from MaxGekk/typed-literal-keywords.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 docs/sql-ref-ansi-compliance.md|  1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4  |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 | 12 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 23 +++---
 .../catalyst/analysis/ResolveAliasesSuite.scala|  4 ++--
 5 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 36d1f8f73eb..d4bb0e93bee 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -672,6 +672,7 @@ Below is a list of all the keywords in Spark SQL.
 |WINDOW|non-reserved|non-reserved|reserved|
 |WITH|reserved|non-reserved|reserved|
 |WITHIN|reserved|non-reserved|reserved|
+|X|non-reserved|non-reserved|non-reserved|
 |YEAR|non-reserved|non-reserved|non-reserved|
 |YEARS|non-reserved|non-reserved|non-reserved|
 |ZONE|non-reserved|non-reserved|non-reserved|
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 4d446b494f7..c9930fa0986 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -208,6 +208,7 @@ GRANT: 'GRANT';
 GROUP: 'GROUP';
 GROUPING: 'GROUPING';
 HAVING: 'HAVING';
+BINARY_HEX: 'X';
 HOUR: 'HOUR';
 HOURS: 'HOURS';
 IF: 'IF';
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index ab54aef35df..a112b6e31fe 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -928,11 +928,19 @@ primaryExpression
 (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)? ( OVER 
windowSpec)? #percentile
 ;
 
+literalType
+: DATE
+| TIMESTAMP | TIMESTAMP_LTZ | TIMESTAMP_NTZ
+| INTERVAL
+| BINARY_HEX
+| unsupportedType=identifier
+;
+
 constant
 : NULL 
#nullLiteral
 | COLON identifier 
#parameterLiteral
 | interval 
#intervalLiteral
-| identifier stringLit 
#typeConstructor
+| literalType stringLit
 #typeConstructor
 | number   
#numericLiteral
 | booleanValue 
#booleanLiteral
 | stringLit+   
#stringLiteral
@@ -1227,6 +1235,7 @@ ansiNonReserved
 | BETWEEN
 | BIGINT
 | BINARY
+| BINARY_HEX
 | BOOLEAN
 | BUCKET
 | BUCKETS
@@ -1514,6 +1523,7 @@ nonReserved
 | BETWEEN
 | BIGINT
 | BINARY
+| BINARY_HEX
 | BOOLEAN
 | BOTH
 | BUCKET
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
inde

[spark] branch master updated: [SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks

2023-03-30 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 122a88c4941 [SPARK-42968][SS] Add option to skip commit coordinator as 
part of StreamingWrite API for DSv2 sources/sinks
122a88c4941 is described below

commit 122a88c4941d8ce1b8344c425fed455b79298afa
Author: Anish Shrigondekar 
AuthorDate: Thu Mar 30 21:48:47 2023 +0900

[SPARK-42968][SS] Add option to skip commit coordinator as part of 
StreamingWrite API for DSv2 sources/sinks

### What changes were proposed in this pull request?
Add option to skip commit coordinator as part of StreamingWrite API for 
DSv2 sources/sinks. This option was already present as part of the BatchWrite 
API.

### Why are the changes needed?
Sinks such as the following are at-least-once, so they do not need to go 
through the commit coordinator on the driver to ensure that at most one task 
for each partition commits. This is even less useful for streaming use-cases 
where batches could be replayed from the checkpoint dir.

- memory sink
- console sink
- no-op sink
- Kafka v2 sink
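
A minimal sketch of the opt-out pattern, assuming the new method defaults to `true` as the existing BatchWrite option does. The trait and classes below (`SketchStreamingWrite`, `SketchNoopWrite`, `SketchDefaultWrite`, `CommitPathSketch`) are simplified, hypothetical stand-ins and not Spark's actual interfaces.

```scala
// Hypothetical, simplified stand-in for a streaming write interface.
trait SketchStreamingWrite {
  // Assumed default: go through the driver-side commit coordinator.
  def useCommitCoordinator: Boolean = true
  def commit(epochId: Long, messages: Seq[String]): Unit
}

// An at-least-once sink opts out, as the sinks listed above do.
class SketchNoopWrite extends SketchStreamingWrite {
  override def useCommitCoordinator: Boolean = false
  override def commit(epochId: Long, messages: Seq[String]): Unit = ()
}

// A sink that keeps the assumed default.
class SketchDefaultWrite extends SketchStreamingWrite {
  override def commit(epochId: Long, messages: Seq[String]): Unit = ()
}

object CommitPathSketch {
  // Describes which commit path a task would take for the given write.
  def commitPath(write: SketchStreamingWrite): String =
    if (write.useCommitCoordinator) "coordinated" else "direct"
}
```

Here `CommitPathSketch.commitPath(new SketchNoopWrite)` returns `"direct"`, and the same call with `new SketchDefaultWrite` returns `"coordinated"`.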

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit test for the change
```
[info] ReportSinkMetricsSuite:
22:23:01.276 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
22:23:03.139 WARN 
org.apache.spark.sql.execution.streaming.ResolveWriteToStream: 
spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets 
and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=true (2 seconds, 
709 milliseconds)
22:23:04.522 WARN 
org.apache.spark.sql.execution.streaming.ResolveWriteToStream: 
spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets 
and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=false (373 
milliseconds)
22:23:04.941 WARN org.apache.spark.sql.streaming.ReportSinkMetricsSuite:

= POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.streaming.ReportSinkMetricsSuite, threads: 
ForkJoinPool.commonPool-worker-19 (daemon=true), rpc-boss-3-1 (daemon=true), 
shuffle-boss-6-1 (daemon=true) =
[info] Run completed in 4 seconds, 934 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #40600 from anishshri-db/task/SPARK-42968.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/kafka010/KafkaStreamingWrite.scala   |  2 +
 .../connector/write/streaming/StreamingWrite.java  | 10 +++
 .../datasources/noop/NoopDataSource.scala  |  1 +
 .../streaming/sources/ConsoleStreamingWrite.scala  |  2 +
 .../streaming/sources/MicroBatchWrite.scala|  4 +
 .../sql/execution/streaming/sources/memory.scala   |  2 +
 .../sql/streaming/ReportSinkMetricsSuite.scala | 85 --
 7 files changed, 67 insertions(+), 39 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index bcf9e3416f8..db719966267 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -45,6 +45,8 @@ private[kafka010] class KafkaStreamingWrite(
   info: PhysicalWriteInfo): KafkaStreamWriterFactory =
 KafkaStreamWriterFactory(topic, producerParams, schema)
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
 }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
index 20694f0b051..ab98bc01b3a 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
@@ -58,6 +58,16 @@ public interface StreamingWrite {
*/
   StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo 
info);
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that at 
most one task for
+   * each partition commits.
+   *
+   * @return true if commit coordinator should

[spark] branch branch-3.4 updated: Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

2023-03-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new e586527f79c Revert "[SPARK-41765][SQL] Pull out v1 write metrics to 
WriteFiles"
e586527f79c is described below

commit e586527f79c93519467d202a4e258864fda6e8f8
Author: Wenchen Fan 
AuthorDate: Thu Mar 30 19:43:36 2023 +0800

Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

This reverts commit a111a02de1a814c5f335e0bcac4cffb0515557dc.

### What changes were proposed in this pull request?

SQLMetrics is not only used in the UI, but is also a programming API as 
users can write a listener, get the physical plan, and read the SQLMetrics 
values directly.

We could ask users to update their code and read SQLMetrics from the new 
`WriteFiles` node instead. But this is troublesome, and users who need both 
write metrics and commit metrics would then have to look at two physical plan 
nodes. Given that https://github.com/apache/spark/pull/39428 is mostly for 
cleanup and does not have many benefits, reverting is a better idea.

### Why are the changes needed?

Avoid breaking changes.

### Does this PR introduce _any_ user-facing change?

Yes, they can programmatically get the write command metrics as before.

### How was this patch tested?

N/A

Closes #40604 from cloud-fan/revert.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit f4af6a05c0879887e7db2377a174e7b7d7bab693)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/command/DataWritingCommand.scala | 47 ---
 .../datasources/BasicWriteStatsTracker.scala   | 17 +-
 .../execution/datasources/FileFormatWriter.scala   | 69 +-
 .../sql/execution/datasources/WriteFiles.scala |  3 -
 .../BasicWriteJobStatsTrackerMetricSuite.scala | 19 +++---
 .../sql/execution/metric/SQLMetricsSuite.scala | 28 -
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 21 +--
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  7 ---
 .../spark/sql/hive/execution/SQLMetricsSuite.scala | 42 +
 9 files changed, 94 insertions(+), 159 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 58c3fca4ad7..338ce8cac42 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -21,13 +21,14 @@ import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
@@ -51,19 +52,11 @@ trait DataWritingCommand extends UnaryCommand {
   def outputColumns: Seq[Attribute] =
 DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)
 
-  lazy val metrics: Map[String, SQLMetric] = {
-// If planned write is enable, we have pulled out write files metrics from 
`V1WriteCommand`
-// to `WriteFiles`. `DataWritingCommand` should only holds the task commit 
metric and driver
-// commit metric.
-if (conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
-  BasicWriteJobStatsTracker.writeCommitMetrics
-} else {
-  BasicWriteJobStatsTracker.metrics
-}
-  }
+  lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
   def basicWriteJobStatsTracker(hadoopConf: Configuration): 
BasicWriteJobStatsTracker = {
-DataWritingCommand.basicWriteJobStatsTracker(metrics, hadoopConf)
+val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
 
   def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
@@ -86,6 +79,27 @@ object DataWritingCommand {
 }
   }
 
+  /**
+   * When execute CTAS operators, Spark will use 
[[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]] command to write data, they both inherit 
metrics from
+   * [[DataWritingCom

[spark] branch master updated: Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

2023-03-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f4af6a05c08 Revert "[SPARK-41765][SQL] Pull out v1 write metrics to 
WriteFiles"
f4af6a05c08 is described below

commit f4af6a05c0879887e7db2377a174e7b7d7bab693
Author: Wenchen Fan 
AuthorDate: Thu Mar 30 19:43:36 2023 +0800

Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

This reverts commit a111a02de1a814c5f335e0bcac4cffb0515557dc.

### What changes were proposed in this pull request?

SQLMetrics is not only used in the UI, but is also a programming API as 
users can write a listener, get the physical plan, and read the SQLMetrics 
values directly.

We could ask users to update their code and read SQLMetrics from the new 
`WriteFiles` node instead. But this is troublesome, and users who need both 
write metrics and commit metrics would then have to look at two physical plan 
nodes. Given that https://github.com/apache/spark/pull/39428 is mostly for 
cleanup and does not have many benefits, reverting is a better idea.

### Why are the changes needed?

Avoid breaking changes.

### Does this PR introduce _any_ user-facing change?

Yes, they can programmatically get the write command metrics as before.

### How was this patch tested?

N/A

Closes #40604 from cloud-fan/revert.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/command/DataWritingCommand.scala | 47 ---
 .../datasources/BasicWriteStatsTracker.scala   | 17 +-
 .../execution/datasources/FileFormatWriter.scala   | 69 +-
 .../sql/execution/datasources/WriteFiles.scala |  3 -
 .../BasicWriteJobStatsTrackerMetricSuite.scala | 19 +++---
 .../sql/execution/metric/SQLMetricsSuite.scala | 28 -
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 21 +--
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  7 ---
 .../spark/sql/hive/execution/SQLMetricsSuite.scala | 42 +
 9 files changed, 94 insertions(+), 159 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 58c3fca4ad7..338ce8cac42 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -21,13 +21,14 @@ import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
@@ -51,19 +52,11 @@ trait DataWritingCommand extends UnaryCommand {
   def outputColumns: Seq[Attribute] =
 DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)
 
-  lazy val metrics: Map[String, SQLMetric] = {
-// If planned write is enable, we have pulled out write files metrics from 
`V1WriteCommand`
-// to `WriteFiles`. `DataWritingCommand` should only holds the task commit 
metric and driver
-// commit metric.
-if (conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
-  BasicWriteJobStatsTracker.writeCommitMetrics
-} else {
-  BasicWriteJobStatsTracker.metrics
-}
-  }
+  lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
   def basicWriteJobStatsTracker(hadoopConf: Configuration): 
BasicWriteJobStatsTracker = {
-DataWritingCommand.basicWriteJobStatsTracker(metrics, hadoopConf)
+val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
 
   def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
@@ -86,6 +79,27 @@ object DataWritingCommand {
 }
   }
 
+  /**
+   * When execute CTAS operators, Spark will use 
[[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]] command to write data, they both inherit 
metrics from
+   * [[DataWritingCommand]], but after running 
[[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]], we only update

[spark] branch master updated (880312b5ade -> b85fe2bff30)

2023-03-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 880312b5ade [SPARK-42907][TESTS][FOLLOWUP] Avro functions doctest 
cleanup
 add b85fe2bff30 [SPARK-42683] Automatically rename conflicting metadata 
columns

No new revisions were added by this update.

Summary of changes:
 .../CheckConnectJvmClientCompatibility.scala   |   1 +
 .../connector/catalog/SupportsMetadataColumns.java |  24 +++--
 .../catalyst/expressions/namedExpressions.scala|  33 +--
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  48 --
 .../apache/spark/sql/catalyst/util/package.scala   |   5 +-
 .../datasources/v2/DataSourceV2Implicits.scala |   2 +-
 .../datasources/v2/DataSourceV2Relation.scala  |   3 +-
 .../sql/connector/catalog/InMemoryBaseTable.scala  |  29 --
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  12 +++
 .../execution/datasources/FileSourceStrategy.scala |   4 +-
 .../spark/sql/connector/MetadataColumnSuite.scala  |  78 +++-
 .../datasources/FileMetadataStructSuite.scala  | 101 +
 12 files changed, 305 insertions(+), 35 deletions(-)

