[spark] branch master updated: [SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new cb7d0828062 [SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect

cb7d0828062 is described below

commit cb7d08280623794b238297d2f3de8abdc8b72bdb
Author: Ritika Maheshwari
AuthorDate: Fri Mar 31 12:13:23 2023 +0800

[SPARK-41391][SQL] The output column name of groupBy.agg(count_distinct) is incorrect

### What changes were proposed in this pull request?

Correct the output column name of groupBy.agg(count_distinct) so that "*" is expanded into the column names and the output column name includes the DISTINCT keyword.

### Why are the changes needed?

The output column name of groupBy.agg(count_distinct) is incorrect, whereas the equivalent Spark SQL queries return the correct column name. For DataFrame groupBy.agg queries, "*" is not expanded in the output column name and the DISTINCT keyword is missing from it.
```
// initializing data
scala> val df = spark.range(1, 10).withColumn("value", lit(1))
df: org.apache.spark.sql.DataFrame = [id: bigint, value: int]

scala> df.createOrReplaceTempView("table")

// DataFrame aggregate queries with incorrect output column
scala> df.groupBy("id").agg(count_distinct($"*"))
res3: org.apache.spark.sql.DataFrame = [id: bigint, count(unresolvedstar()): bigint]

scala> df.groupBy("id").agg(count_distinct($"value"))
res1: org.apache.spark.sql.DataFrame = [id: bigint, count(value): bigint]

// Spark SQL aggregate queries with correct output column
scala> spark.sql(" SELECT id, COUNT(DISTINCT *) FROM table GROUP BY id ")
res4: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT id, value): bigint]

scala> spark.sql(" SELECT id, COUNT(DISTINCT value) FROM table GROUP BY id ")
res2: org.apache.spark.sql.DataFrame = [id: bigint, count(DISTINCT value): bigint]
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test.

Closes #40116 from ritikam2/master.
Authored-by: Ritika Maheshwari
Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/RelationalGroupedDataset.scala      |  1 +
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 12
 2 files changed, 13 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 1c2e309bdaf..31c303921f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -89,6 +89,7 @@ class RelationalGroupedDataset protected[sql](
     case expr: NamedExpression => expr
     case a: AggregateExpression if a.aggregateFunction.isInstanceOf[TypedAggregateExpression] =>
       UnresolvedAlias(a, Some(Column.generateAlias))
+    case u: UnresolvedFunction => UnresolvedAlias(expr, None)
     case expr: Expression => Alias(expr, toPrettySQL(expr))()
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index a15c049715b..d4c4c7c9b16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1137,6 +1137,18 @@ class DataFrameSuite extends QueryTest
     checkAnswer(approxSummaryDF, approxSummaryResult)
   }
 
+  test("SPARK-41391: Correct the output column name of groupBy.agg(count_distinct)") {
+    withTempView("person") {
+      person.createOrReplaceTempView("person")
+      val df1 = person.groupBy("id").agg(count_distinct(col("name")))
+      val df2 = spark.sql("SELECT id, COUNT(DISTINCT name) FROM person GROUP BY id")
+      assert(df1.columns === df2.columns)
+      val df3 = person.groupBy("id").agg(count_distinct(col("*")))
+      val df4 = spark.sql("SELECT id, COUNT(DISTINCT *) FROM person GROUP BY id")
+      assert(df3.columns === df4.columns)
+    }
+  }
+
   test("summary advanced") {
     val stats = Array("count", "50.01%", "max",
       "mean", "min", "25%")
     val orderMatters = person2.summary(stats: _*)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
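The naming difference described in SPARK-41391 can be illustrated with a toy model in plain Python. This is not Spark code. For illustration only, it assumes two things. First, a name chosen before analysis sees the star as an unexpanded placeholder and omits the DISTINCT flag. Second, a name chosen after resolution sees the expanded columns. `Func`, `eager_name`, `resolve` and `deferred_name` are hypothetical names. Wrapping the `UnresolvedFunction` in `UnresolvedAlias(expr, None)` defers naming until resolution, which roughly corresponds to `deferred_name` here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Func:
    """Toy stand-in for an aggregate call such as count(DISTINCT ...)."""
    name: str
    args: List[str]  # "*" marks a star that has not been expanded yet
    distinct: bool


def eager_name(f: Func) -> str:
    # Name chosen before analysis: the star is still a placeholder and,
    # in this toy model, the DISTINCT flag is not rendered.
    args = ", ".join("unresolvedstar()" if a == "*" else a for a in f.args)
    return f"{f.name}({args})"


def resolve(f: Func, columns: List[str]) -> Func:
    # Expand "*" into the input columns, as an analyzer would.
    expanded: List[str] = []
    for a in f.args:
        expanded.extend(columns if a == "*" else [a])
    return Func(f.name, expanded, f.distinct)


def deferred_name(f: Func, columns: List[str]) -> str:
    # Name chosen only after resolution, so it matches the SQL output.
    r = resolve(f, columns)
    prefix = "DISTINCT " if r.distinct else ""
    return f"{r.name}({prefix}{', '.join(r.args)})"


star = Func("count", ["*"], distinct=True)
print(eager_name(star))                      # count(unresolvedstar())
print(deferred_name(star, ["id", "value"]))  # count(DISTINCT id, value)
```

The two printed names mirror the `res3` and `res4` column names in the REPL session above. That is the point of the model: the difference lies only in *when* the name is computed.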
[spark] branch branch-3.4 updated: [SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 68fa8cafc3c [SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled

68fa8cafc3c is described below

commit 68fa8cafc3cfcdc043920ca8544c24ac88f0a63c
Author: Takuya UESHIN
AuthorDate: Fri Mar 31 08:45:44 2023 +0900

[SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled

Fixes the comparison of results with Arrow optimization enabled/disabled.

In `test_arrow`, there are many comparisons between DataFrames with Arrow optimization enabled and disabled. These should instead compare against expected values so that the tests can be reused for the Spark Connect parity tests.

No.

Updated the tests.

Closes #40612 from ueshin/issues/SPARK-42969/test_arrow.

Authored-by: Takuya UESHIN
Signed-off-by: Hyukjin Kwon
(cherry picked from commit 35503a535771d257b517e7ddf2adfaefefd97dad)
Signed-off-by: Hyukjin Kwon
---
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  47 +++--
 python/pyspark/sql/tests/test_arrow.py             | 202 +
 2 files changed, 163 insertions(+), 86 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index f8180d661db..8953b2f8d98 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -37,19 +37,20 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_createDataFrame_with_incorrect_schema(self):
         self.check_createDataFrame_with_incorrect_schema()
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_createDataFrame_with_map_type(self):
-        super().test_createDataFrame_with_map_type()
+        self.check_createDataFrame_with_map_type(True)
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42983): len() of unsized object
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_createDataFrame_with_ndarray(self):
-        super().test_createDataFrame_with_ndarray()
+        self.check_createDataFrame_with_ndarray(True)
 
+    # TODO(SPARK-42984): ValueError not raised
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_createDataFrame_with_single_data_type(self):
-        super().test_createDataFrame_with_single_data_type()
+        self.check_createDataFrame_with_single_data_type()
 
     @unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
     def test_no_partition_frame(self):
@@ -70,9 +71,20 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_toPandas_batch_order(self):
         super().test_toPandas_batch_order()
 
-    @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_toPandas_empty_df_arrow_enabled(self):
-        super().test_toPandas_empty_df_arrow_enabled()
+        self.check_toPandas_empty_df_arrow_enabled(True)
+
+    def test_create_data_frame_to_pandas_timestamp_ntz(self):
+        self.check_create_data_frame_to_pandas_timestamp_ntz(True)
+
+    def test_create_data_frame_to_pandas_day_time_internal(self):
+        self.check_create_data_frame_to_pandas_day_time_internal(True)
+
+    def test_toPandas_respect_session_timezone(self):
+        self.check_toPandas_respect_session_timezone(True)
+
+    def test_toPandas_with_array_type(self):
+        self.check_toPandas_with_array_type(True)
 
     @unittest.skip("Spark Connect does not support fallback.")
     def test_toPandas_fallback_disabled(self):
@@ -82,20 +94,29 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_toPandas_fallback_enabled(self):
         super().test_toPandas_fallback_enabled()
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_toPandas_with_map_type(self):
-        super().test_toPandas_with_map_type()
+        self.check_toPandas_with_map_type(True)
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_toPandas_with_map_type_nulls(self):
-        super().test_toPandas
[spark] branch branch-3.4 updated: [SPARK-42970][CONNECT][PYTHON][TESTS][3.4] Reuse pyspark.sql.tests.test_arrow test cases
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 98f00ea2bcf [SPARK-42970][CONNECT][PYTHON][TESTS][3.4] Reuse pyspark.sql.tests.test_arrow test cases

98f00ea2bcf is described below

commit 98f00ea2bcfacb55615b13405573bb79768eb7cc
Author: Takuya UESHIN
AuthorDate: Fri Mar 31 08:46:36 2023 +0900

[SPARK-42970][CONNECT][PYTHON][TESTS][3.4] Reuse pyspark.sql.tests.test_arrow test cases

### What changes were proposed in this pull request?

Reuse the `pyspark.sql.tests.test_arrow` test cases in the Spark Connect parity tests.

### Why are the changes needed?

`test_arrow` is also helpful because it contains many tests for `createDataFrame` with pandas and for `toPandas`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added the tests.

Closes #40595 from ueshin/issues/SPARK-42970/3.4/test_arrow.

Authored-by: Takuya UESHIN
Signed-off-by: Hyukjin Kwon
---
 dev/sparktestsupport/modules.py                    |   1 +
 .../pyspark/sql/tests/connect/test_parity_arrow.py | 110 +
 python/pyspark/sql/tests/test_arrow.py             |  37 ---
 3 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 29bc39e14bf..fd18ddd6d13 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -524,6 +524,7 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.test_connect_basic",
         "pyspark.sql.tests.connect.test_connect_function",
         "pyspark.sql.tests.connect.test_connect_column",
+        "pyspark.sql.tests.connect.test_parity_arrow",
         "pyspark.sql.tests.connect.test_parity_datasources",
         "pyspark.sql.tests.connect.test_parity_errors",
         "pyspark.sql.tests.connect.test_parity_catalog",
diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
new file mode 100644
index 000..f8180d661db
--- /dev/null
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from pyspark.sql.tests.test_arrow import ArrowTestsMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
+    @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
+    def test_createDataFrame_empty_partition(self):
+        super().test_createDataFrame_empty_partition()
+
+    @unittest.skip("Spark Connect does not support fallback.")
+    def test_createDataFrame_fallback_disabled(self):
+        super().test_createDataFrame_fallback_disabled()
+
+    @unittest.skip("Spark Connect does not support fallback.")
+    def test_createDataFrame_fallback_enabled(self):
+        super().test_createDataFrame_fallback_enabled()
+
+    def test_createDataFrame_with_incorrect_schema(self):
+        self.check_createDataFrame_with_incorrect_schema()
+
+    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    @unittest.skip("Fails in Spark Connect, should enable.")
+    def test_createDataFrame_with_map_type(self):
+        super().test_createDataFrame_with_map_type()
+
+    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    @unittest.skip("Fails in Spark Connect, should enable.")
+    def test_createDataFrame_with_ndarray(self):
+        super().test_createDataFrame_with_ndarray()
+
+    @unittest.skip("Fails in Spark Connect, should enable.")
+    def test_createDataFrame_with_single_data_type(self):
+        super().test_createDataFrame_with_single_data_type()
+
+    @unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
+    def test_no_partition_frame(self):
+        super().test_no_partition_frame()
+
+    @unittest.skip("Spark Connect does not support RDD but th
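SPARK-42970 uses a reuse pattern: a parity class inherits the test bodies from a mixin and overrides only the unsupported cases with `@unittest.skip`. That pattern can be sketched with the standard library alone. `ConversionTestsMixin`, `LocalTests`, `RemoteParityTests` and `run` are hypothetical names standing in for `ArrowTestsMixin` and its concrete test classes, and the test bodies are placeholders rather than Arrow checks.

```python
import io
import unittest


class ConversionTestsMixin:
    """Shared test bodies; not a TestCase itself, so it is never collected on its own."""

    def test_round_trip(self):
        self.assertEqual(list(map(int, ["1", "2"])), [1, 2])

    def test_needs_rdd(self):
        # Placeholder for a test that relies on a feature one backend lacks.
        self.assertTrue(True)


class LocalTests(ConversionTestsMixin, unittest.TestCase):
    pass  # runs every inherited test unchanged


class RemoteParityTests(ConversionTestsMixin, unittest.TestCase):
    # Override only what the backend cannot run; everything else is inherited.
    @unittest.skip("Backend does not support RDD but the test depends on it.")
    def test_needs_rdd(self):
        super().test_needs_rdd()


def run(*cases):
    # Run the given TestCase classes quietly and return the TestResult.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(loader.loadTestsFromTestCase(c) for c in cases)
    return unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(suite)


result = run(LocalTests, RemoteParityTests)
print(result.testsRun, len(result.skipped))  # 4 1
```

Skipped tests still count towards `testsRun`, so the skip list stays visible in test reports. A matching `@unittest.skip` override appears in the parity file above for each unsupported case.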
[spark] branch master updated: [SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 35503a53577 [SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled

35503a53577 is described below

commit 35503a535771d257b517e7ddf2adfaefefd97dad
Author: Takuya UESHIN
AuthorDate: Fri Mar 31 08:45:44 2023 +0900

[SPARK-42969][CONNECT][TESTS] Fix the comparison the result with Arrow optimization enabled/disabled

### What changes were proposed in this pull request?

Fixes the comparison of results with Arrow optimization enabled/disabled.

### Why are the changes needed?

In `test_arrow`, there are many comparisons between DataFrames with Arrow optimization enabled and disabled. These should instead compare against expected values so that the tests can be reused for the Spark Connect parity tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated the tests.

Closes #40612 from ueshin/issues/SPARK-42969/test_arrow.

Authored-by: Takuya UESHIN
Signed-off-by: Hyukjin Kwon
---
 .../pyspark/sql/tests/connect/test_parity_arrow.py |  47 +++--
 python/pyspark/sql/tests/test_arrow.py             | 202 +
 2 files changed, 163 insertions(+), 86 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_arrow.py b/python/pyspark/sql/tests/connect/test_parity_arrow.py
index f8180d661db..8953b2f8d98 100644
--- a/python/pyspark/sql/tests/connect/test_parity_arrow.py
+++ b/python/pyspark/sql/tests/connect/test_parity_arrow.py
@@ -37,19 +37,20 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_createDataFrame_with_incorrect_schema(self):
         self.check_createDataFrame_with_incorrect_schema()
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_createDataFrame_with_map_type(self):
-        super().test_createDataFrame_with_map_type()
+        self.check_createDataFrame_with_map_type(True)
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42983): len() of unsized object
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_createDataFrame_with_ndarray(self):
-        super().test_createDataFrame_with_ndarray()
+        self.check_createDataFrame_with_ndarray(True)
 
+    # TODO(SPARK-42984): ValueError not raised
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_createDataFrame_with_single_data_type(self):
-        super().test_createDataFrame_with_single_data_type()
+        self.check_createDataFrame_with_single_data_type()
 
     @unittest.skip("Spark Connect does not support RDD but the tests depend on them.")
     def test_no_partition_frame(self):
@@ -70,9 +71,20 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_toPandas_batch_order(self):
         super().test_toPandas_batch_order()
 
-    @unittest.skip("Spark Connect does not support Spark Context but the test depends on that.")
     def test_toPandas_empty_df_arrow_enabled(self):
-        super().test_toPandas_empty_df_arrow_enabled()
+        self.check_toPandas_empty_df_arrow_enabled(True)
+
+    def test_create_data_frame_to_pandas_timestamp_ntz(self):
+        self.check_create_data_frame_to_pandas_timestamp_ntz(True)
+
+    def test_create_data_frame_to_pandas_day_time_internal(self):
+        self.check_create_data_frame_to_pandas_day_time_internal(True)
+
+    def test_toPandas_respect_session_timezone(self):
+        self.check_toPandas_respect_session_timezone(True)
+
+    def test_toPandas_with_array_type(self):
+        self.check_toPandas_with_array_type(True)
 
     @unittest.skip("Spark Connect does not support fallback.")
     def test_toPandas_fallback_disabled(self):
@@ -82,20 +94,29 @@ class ArrowParityTests(ArrowTestsMixin, ReusedConnectTestCase):
     def test_toPandas_fallback_enabled(self):
         super().test_toPandas_fallback_enabled()
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
     @unittest.skip("Fails in Spark Connect, should enable.")
     def test_toPandas_with_map_type(self):
-        super().test_toPandas_with_map_type()
+        self.check_toPandas_with_map_type(True)
 
-    # TODO(SPARK-42969): Fix the comparison the result with Arrow optimization enabled/disabled.
+    # TODO(SPARK-42982): INVALID_COLUMN_OR_FIELD_DATA_TYPE
     @unittest.skip("Fails in Spark Connect, should
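SPARK-42969 changes how the tests check results. Instead of comparing the Arrow-enabled output with the Arrow-disabled output, each mode is compared with a fixed expected value through `check_*(arrow_enabled)` helpers. The parity suite can then call these helpers for just the mode it supports. Below is a standard-library sketch of that shape. `to_records`, `RoundTripMixin`, `ClassicTests`, `ParityTests` and `run` are hypothetical, and the two conversion paths are simulated rather than real Arrow code.

```python
import io
import unittest


def to_records(rows, arrow_enabled):
    # Hypothetical conversion with two code paths that must agree.
    if arrow_enabled:
        return [tuple(r) for r in rows]
    return [tuple(v for v in r) for r in rows]


class RoundTripMixin:
    def check_to_records(self, arrow_enabled):
        rows = [[1, "a"], [2, None]]
        expected = [(1, "a"), (2, None)]
        # Compare with a fixed expected value rather than with the other mode's
        # output, so a suite that supports only one mode can reuse the check.
        self.assertEqual(to_records(rows, arrow_enabled), expected)

    def test_to_records(self):
        for arrow_enabled in (True, False):
            with self.subTest(arrow_enabled=arrow_enabled):
                self.check_to_records(arrow_enabled)


class ClassicTests(RoundTripMixin, unittest.TestCase):
    pass  # exercises both modes


class ParityTests(RoundTripMixin, unittest.TestCase):
    def test_to_records(self):
        # Only the Arrow path exists in this hypothetical backend.
        self.check_to_records(True)


def run(*cases):
    # Run the given TestCase classes quietly and return the TestResult.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite(loader.loadTestsFromTestCase(c) for c in cases)
    return unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(suite)


result = run(ClassicTests, ParityTests)
print(result.wasSuccessful())  # True
```

This mirrors how the diff replaces `super().test_...()` with `self.check_...(True)` in the parity class.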
[spark] branch branch-3.2 updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 8488a255912 [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

8488a255912 is described below

commit 8488a255912b821c1439603c2e72d7b252ef0a57
Author: Xingbo Jiang
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

### What changes were proposed in this pull request?

This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information carried by the `Task` itself.

### Why are the changes needed?

-1 is not a valid stageAttemptId value, so it can cause unexpected problems when a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?

No, it's a bugfix.

### How was this patch tested?

Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.
Authored-by: Xingbo Jiang
Signed-off-by: Gengliang Wang
(cherry picked from commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4)
Signed-off-by: Gengliang Wang
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a82d261d545..b950c07f3d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1130,11 +1130,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
-    // Note that there is a chance that this task is launched after the stage is cancelled.
-    // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId =
-      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
+    listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
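The difference between the removed lookup and the new field access in `handleBeginEvent` can be modelled in a few lines of Python. This is a toy model, not the `DAGScheduler` code. `Task`, `Stage`, `attempt_id_before` and `attempt_id_after` are hypothetical names, and a plain dict stands in for `stageIdToStage`.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Task:
    stage_id: int
    stage_attempt_id: int  # fixed when the task is created


@dataclass
class Stage:
    latest_attempt_number: int


def attempt_id_before(task: Task, stage_id_to_stage: Dict[int, Stage]) -> int:
    # Old behaviour: look the stage up when the event fires; -1 if it is gone.
    stage = stage_id_to_stage.get(task.stage_id)
    return stage.latest_attempt_number if stage is not None else -1


def attempt_id_after(task: Task, stage_id_to_stage: Dict[int, Stage]) -> int:
    # New behaviour: use the attempt id carried by the task itself.
    # (The map is unused; the parameter is kept only for a parallel signature.)
    return task.stage_attempt_id


task = Task(stage_id=3, stage_attempt_id=1)
cancelled: Dict[int, Stage] = {}  # stage 3 was cancelled and removed before the task started
print(attempt_id_before(task, cancelled), attempt_id_after(task, cancelled))  # -1 1
```

In this model, the old lookup can also report a different attempt when the map holds a newer attempt of the same stage. The commit itself only describes the `-1` case.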
[spark] branch master updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 1a6b1770c85 [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

1a6b1770c85 is described below

commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4
Author: Xingbo Jiang
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

### What changes were proposed in this pull request?

This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information carried by the `Task` itself.

### Why are the changes needed?

-1 is not a valid stageAttemptId value, so it can cause unexpected problems when a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?

No, it's a bugfix.

### How was this patch tested?

Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.
Authored-by: Xingbo Jiang
Signed-off-by: Gengliang Wang
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 065bf88ab16..c78a26d91eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1183,11 +1183,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
-    // Note that there is a chance that this task is launched after the stage is cancelled.
-    // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId =
-      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
+    listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
[spark] branch branch-3.3 updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 6f266ee1f76 [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

6f266ee1f76 is described below

commit 6f266ee1f76323e147317a8ab98600dd3868667a
Author: Xingbo Jiang
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

### What changes were proposed in this pull request?

This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information carried by the `Task` itself.

### Why are the changes needed?

-1 is not a valid stageAttemptId value, so it can cause unexpected problems when a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?

No, it's a bugfix.

### How was this patch tested?

Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.
Authored-by: Xingbo Jiang
Signed-off-by: Gengliang Wang
(cherry picked from commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4)
Signed-off-by: Gengliang Wang
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 0de60224179..bd2823bcac1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1152,11 +1152,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
-    // Note that there is a chance that this task is launched after the stage is cancelled.
-    // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId =
-      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
+    listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = {
[spark] branch branch-3.4 updated: [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new e20b55bf4de [SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

e20b55bf4de is described below

commit e20b55bf4dec353b97f9eae38988fa915213a214
Author: Xingbo Jiang
AuthorDate: Thu Mar 30 15:48:04 2023 -0700

[SPARK-42967][CORE][3.2][3.3][3.4] Fix SparkListenerTaskStart.stageAttemptId when a task is started after the stage is cancelled

### What changes were proposed in this pull request?

This PR fixes a bug where SparkListenerTaskStart can have `stageAttemptId = -1` when a task is launched after its stage is cancelled. The `stageAttemptId` field should instead be populated from the information carried by the `Task` itself.

### Why are the changes needed?

-1 is not a valid stageAttemptId value, so it can cause unexpected problems when a subscriber tries to parse the stage information from the SparkListenerTaskStart event.

### Does this PR introduce _any_ user-facing change?

No, it's a bugfix.

### How was this patch tested?

Manually verified.

Closes #40592 from jiangxb1987/SPARK-42967.
Authored-by: Xingbo Jiang
Signed-off-by: Gengliang Wang
(cherry picked from commit 1a6b1770c85f37982b15d261abf9cc6e4be740f4)
Signed-off-by: Gengliang Wang
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1ccaf05509..2a966fab6f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1171,11 +1171,7 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
-    // Note that there is a chance that this task is launched after the stage is cancelled.
-    // In that case, we wouldn't have the stage anymore in stageIdToStage.
-    val stageAttemptId =
-      stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
-    listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
+    listenerBus.post(SparkListenerTaskStart(task.stageId, task.stageAttemptId, taskInfo))
   }
 
   private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_], taskIndex: Int): Unit = {
[spark] branch master updated: [SPARK-42979][SQL] Define literal constructors as keywords
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 5a17537aa4a [SPARK-42979][SQL] Define literal constructors as keywords

5a17537aa4a is described below

commit 5a17537aa4a777429431542cfa6184591476e54a
Author: Max Gekk
AuthorDate: Thu Mar 30 17:43:54 2023 +0300

[SPARK-42979][SQL] Define literal constructors as keywords

### What changes were proposed in this pull request?

In the PR, I propose to define the literal constructors `DATE`, `TIMESTAMP`, `TIMESTAMP_NTZ`, `TIMESTAMP_LTZ`, `INTERVAL`, and `X` as Spark SQL keywords.

### Why are the changes needed?

Literal constructors that are not keywords cause some inconvenience when analysing or transforming the lexer tree, for example when forming stable column aliases; see https://github.com/apache/spark/pull/40126.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

By running the affected test suites:

```
$ build/sbt "test:testOnly *SQLKeywordSuite"
$ build/sbt "test:testOnly *.ResolveAliasesSuite"
```

Closes #40593 from MaxGekk/typed-literal-keywords.

Authored-by: Max Gekk
Signed-off-by: Max Gekk
---
 docs/sql-ref-ansi-compliance.md                 |  1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4   |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4  | 12 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala  | 23 +++---
 .../catalyst/analysis/ResolveAliasesSuite.scala |  4 ++--
 5 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 36d1f8f73eb..d4bb0e93bee 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -672,6 +672,7 @@ Below is a list of all the keywords in Spark SQL.
 |WINDOW|non-reserved|non-reserved|reserved|
 |WITH|reserved|non-reserved|reserved|
 |WITHIN|reserved|non-reserved|reserved|
+|X|non-reserved|non-reserved|non-reserved|
 |YEAR|non-reserved|non-reserved|non-reserved|
 |YEARS|non-reserved|non-reserved|non-reserved|
 |ZONE|non-reserved|non-reserved|non-reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 4d446b494f7..c9930fa0986 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -208,6 +208,7 @@ GRANT: 'GRANT';
 GROUP: 'GROUP';
 GROUPING: 'GROUPING';
 HAVING: 'HAVING';
+BINARY_HEX: 'X';
 HOUR: 'HOUR';
 HOURS: 'HOURS';
 IF: 'IF';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index ab54aef35df..a112b6e31fe 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -928,11 +928,19 @@ primaryExpression
         (FILTER LEFT_PAREN WHERE where=booleanExpression RIGHT_PAREN)?
         ( OVER windowSpec)? #percentile
     ;
+literalType
+    : DATE
+    | TIMESTAMP | TIMESTAMP_LTZ | TIMESTAMP_NTZ
+    | INTERVAL
+    | BINARY_HEX
+    | unsupportedType=identifier
+    ;
+
 constant
     : NULL #nullLiteral
     | COLON identifier #parameterLiteral
     | interval #intervalLiteral
-    | identifier stringLit #typeConstructor
+    | literalType stringLit #typeConstructor
     | number #numericLiteral
     | booleanValue #booleanLiteral
     | stringLit+ #stringLiteral
@@ -1227,6 +1235,7 @@ ansiNonReserved
     | BETWEEN
     | BIGINT
     | BINARY
+    | BINARY_HEX
     | BOOLEAN
     | BUCKET
     | BUCKETS
@@ -1514,6 +1523,7 @@ nonReserved
     | BETWEEN
     | BIGINT
     | BINARY
+    | BINARY_HEX
     | BOOLEAN
     | BOTH
     | BUCKET
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
inde
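The grammar change above replaces the free-form `identifier` in front of a typed literal with the closed `literalType` rule. As a rough illustration only, the Java sketch below models that dispatch outside of ANTLR. `TypedLiteralSketch`, `LiteralType`, and `classify` are hypothetical names, not Spark classes; Spark's actual handling lives in the (truncated) `AstBuilder.scala` change.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical sketch, not Spark's AstBuilder: models the
// `literalType stringLit #typeConstructor` alternative of the `constant` rule.
class TypedLiteralSketch {

    // One constant per keyword alternative listed in the `literalType` rule.
    enum LiteralType { DATE, TIMESTAMP, TIMESTAMP_LTZ, TIMESTAMP_NTZ, INTERVAL, BINARY_HEX }

    // Maps the word in front of a string literal (e.g. the X in X'1F') to a LiteralType.
    // Matching is case-insensitive, as Spark SQL keywords are. Any other word yields
    // Optional.empty(), standing in for the `unsupportedType=identifier` branch.
    static Optional<LiteralType> classify(String constructor) {
        switch (constructor.toUpperCase(Locale.ROOT)) {
            case "DATE":          return Optional.of(LiteralType.DATE);
            case "TIMESTAMP":     return Optional.of(LiteralType.TIMESTAMP);
            case "TIMESTAMP_LTZ": return Optional.of(LiteralType.TIMESTAMP_LTZ);
            case "TIMESTAMP_NTZ": return Optional.of(LiteralType.TIMESTAMP_NTZ);
            case "INTERVAL":      return Optional.of(LiteralType.INTERVAL);
            // The lexer change defines the token as BINARY_HEX: 'X'.
            case "X":             return Optional.of(LiteralType.BINARY_HEX);
            default:              return Optional.empty();
        }
    }
}
```

For example, `classify("x")` yields `BINARY_HEX` and `classify("foo")` yields an empty `Optional`. Returning an empty result for the unsupported case, rather than throwing, is a choice of this sketch; how Spark reports an unsupported type constructor is not shown in this excerpt.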
[spark] branch master updated: [SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 122a88c4941 [SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks

122a88c4941 is described below

commit 122a88c4941d8ce1b8344c425fed455b79298afa
Author: Anish Shrigondekar
AuthorDate: Thu Mar 30 21:48:47 2023 +0900

[SPARK-42968][SS] Add option to skip commit coordinator as part of StreamingWrite API for DSv2 sources/sinks

### What changes were proposed in this pull request?
Add an option to skip the commit coordinator as part of the StreamingWrite API for DSv2 sources/sinks. This option was already present as part of the BatchWrite API.

### Why are the changes needed?
Sinks such as the following are at-least-once, so they do not need to go through the commit coordinator on the driver to ensure that at most one task commits for each partition. The coordinator is even less useful for streaming use cases, where batches could be replayed from the checkpoint directory.
- memory sink
- console sink
- no-op sink
- Kafka v2 sink

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test for the change:
```
[info] ReportSinkMetricsSuite:
22:23:01.276 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22:23:03.139 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=true (2 seconds, 709 milliseconds)
22:23:04.522 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - test ReportSinkMetrics with useCommitCoordinator=false (373 milliseconds)
22:23:04.941 WARN org.apache.spark.sql.streaming.ReportSinkMetricsSuite:

= POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.ReportSinkMetricsSuite, threads: ForkJoinPool.commonPool-worker-19 (daemon=true), rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true) =

[info] Run completed in 4 seconds, 934 milliseconds.
[info] Total number of tests run: 2
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 2, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

Closes #40600 from anishshri-db/task/SPARK-42968.

Authored-by: Anish Shrigondekar
Signed-off-by: Jungtaek Lim
---
.../spark/sql/kafka010/KafkaStreamingWrite.scala | 2 +
.../connector/write/streaming/StreamingWrite.java | 10 +++
.../datasources/noop/NoopDataSource.scala | 1 +
.../streaming/sources/ConsoleStreamingWrite.scala | 2 +
.../streaming/sources/MicroBatchWrite.scala| 4 +
.../sql/execution/streaming/sources/memory.scala | 2 +
.../sql/streaming/ReportSinkMetricsSuite.scala | 85 --
7 files changed, 67 insertions(+), 39 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
index bcf9e3416f8..db719966267 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWrite.scala
@@ -45,6 +45,8 @@ private[kafka010] class KafkaStreamingWrite(
       info: PhysicalWriteInfo): KafkaStreamWriterFactory =
     KafkaStreamWriterFactory(topic, producerParams, schema)
 
+  override def useCommitCoordinator(): Boolean = false
+
   override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
   override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
index 20694f0b051..ab98bc01b3a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java
@@ -58,6 +58,16 @@ public interface StreamingWrite {
    */
   StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info);
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
+   * each partition commits.
+   *
+   * @return true if commit coordinator should
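The new `useCommitCoordinator()` hook lets a sink opt out of driver-side commit arbitration. The Java sketch below is a simplified model of that idea, not Spark's implementation: `SketchStreamingWrite`, `SketchCommitCoordinator`, and `SketchTaskCommit` are hypothetical names, the coordinator simply lets the first attempt per (epoch, partition) commit, and the default of `true` is assumed to mirror the existing BatchWrite behavior because the StreamingWrite diff is truncated here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the write-side hook; not Spark's StreamingWrite interface.
interface SketchStreamingWrite {
    // Assumed conservative default: route every task commit through the coordinator.
    default boolean useCommitCoordinator() {
        return true;
    }
}

// An at-least-once sink (think console, memory, or no-op) that tolerates duplicates.
class SketchConsoleWrite implements SketchStreamingWrite {
    @Override
    public boolean useCommitCoordinator() {
        return false;
    }
}

// Hypothetical driver-side arbiter: the first attempt that asks to commit a given
// (epoch, partition) wins; other attempts for that pair are refused.
class SketchCommitCoordinator {
    private final Map<String, Integer> winners = new HashMap<>();

    synchronized boolean canCommit(long epoch, int partition, int attempt) {
        Integer winner = winners.putIfAbsent(epoch + "/" + partition, attempt);
        // null means no earlier winner; asking again with the winning attempt is allowed.
        return winner == null || winner == attempt;
    }
}

class SketchTaskCommit {
    // Decides whether a task attempt may commit its output for one epoch/partition.
    static boolean tryCommit(SketchStreamingWrite write, SketchCommitCoordinator coordinator,
                             long epoch, int partition, int attempt) {
        if (!write.useCommitCoordinator()) {
            return true; // sink opted out: no round trip to the driver
        }
        return coordinator.canCommit(epoch, partition, attempt);
    }
}
```

With the default sink, a second attempt for the same epoch and partition is refused; with `SketchConsoleWrite`, every attempt commits, which is acceptable only because such a sink is already at-least-once.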
[spark] branch branch-3.4 updated: Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
new e586527f79c Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

e586527f79c is described below

commit e586527f79c93519467d202a4e258864fda6e8f8
Author: Wenchen Fan
AuthorDate: Thu Mar 30 19:43:36 2023 +0800

Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

This reverts commit a111a02de1a814c5f335e0bcac4cffb0515557dc.

### What changes were proposed in this pull request?
SQLMetrics is not only used in the UI; it is also a programming API, as users can write a listener, get the physical plan, and read the SQLMetrics values directly.

We could ask users to update their code and read SQLMetrics from the new `WriteFiles` node instead, but this is troublesome: users who need both the write metrics and the commit metrics would have to look at two physical plan nodes. Given that https://github.com/apache/spark/pull/39428 is mostly a cleanup and does not bring many benefits, reverting it is the better option.

### Why are the changes needed?
To avoid breaking changes.

### Does this PR introduce _any_ user-facing change?
Yes, users can programmatically get the write command metrics as before.

### How was this patch tested?
N/A

Closes #40604 from cloud-fan/revert.
Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
(cherry picked from commit f4af6a05c0879887e7db2377a174e7b7d7bab693)
Signed-off-by: Wenchen Fan
---
.../sql/execution/command/DataWritingCommand.scala | 47 ---
.../datasources/BasicWriteStatsTracker.scala | 17 +-
.../execution/datasources/FileFormatWriter.scala | 69 +-
.../sql/execution/datasources/WriteFiles.scala | 3 -
.../BasicWriteJobStatsTrackerMetricSuite.scala | 19 +++---
.../sql/execution/metric/SQLMetricsSuite.scala | 28 -
.../sql/execution/metric/SQLMetricsTestUtils.scala | 21 +--
.../hive/execution/InsertIntoHiveDirCommand.scala | 7 ---
.../spark/sql/hive/execution/SQLMetricsSuite.scala | 42 +
9 files changed, 94 insertions(+), 159 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 58c3fca4ad7..338ce8cac42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -21,13 +21,14 @@ import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
@@ -51,19 +52,11 @@ trait DataWritingCommand extends UnaryCommand {
   def outputColumns: Seq[Attribute] =
     DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)
 
-  lazy val metrics: Map[String, SQLMetric] = {
-    // If planned write is enable, we have pulled out write files metrics from `V1WriteCommand`
-    // to `WriteFiles`. `DataWritingCommand` should only holds the task commit metric and driver
-    // commit metric.
-    if (conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
-      BasicWriteJobStatsTracker.writeCommitMetrics
-    } else {
-      BasicWriteJobStatsTracker.metrics
-    }
-  }
+  lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
   def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
-    DataWritingCommand.basicWriteJobStatsTracker(metrics, hadoopConf)
+    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+    new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
 
   def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
@@ -86,6 +79,27 @@ object DataWritingCommand {
     }
   }
 
+  /**
+   * When execute CTAS operators, Spark will use [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]] command to write data, they both inherit metrics from
+   * [[DataWritingCom
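The revert message describes users reading write metrics directly from the physical plan, for example from a listener. In Spark such code would typically be a `QueryExecutionListener` walking `qe.executedPlan`; the dependency-free Java sketch below only models the lookup itself. `SketchPlanNode`, `SketchMetricsReader`, the node name `DataWritingCommandExec` used in the usage note, and the metric keys are illustrative assumptions, not Spark API. With the revert, one lookup on the write-command node returns both the write metrics and the commit metrics; with SPARK-41765 in place, callers would have needed a second lookup for the `WriteFiles` node.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified physical-plan node: a name, its metrics, and its children.
class SketchPlanNode {
    final String name;
    final Map<String, Long> metrics;
    final List<SketchPlanNode> children;

    SketchPlanNode(String name, Map<String, Long> metrics, SketchPlanNode... children) {
        this.name = name;
        this.metrics = metrics;
        this.children = Arrays.asList(children);
    }
}

class SketchMetricsReader {
    // Depth-first search for the first node named `nodeName`; returns its metrics if found.
    static Optional<Map<String, Long>> metricsOf(SketchPlanNode node, String nodeName) {
        if (node.name.equals(nodeName)) {
            return Optional.of(node.metrics);
        }
        for (SketchPlanNode child : node.children) {
            Optional<Map<String, Long>> found = metricsOf(child, nodeName);
            if (found.isPresent()) {
                return found;
            }
        }
        return Optional.empty();
    }
}
```

For instance, a plan whose root is a `DataWritingCommandExec` node carrying `numOutputRows` and `jobCommitTime` can be queried once with `metricsOf(plan, "DataWritingCommandExec")`, while `metricsOf(plan, "WriteFiles")` comes back empty because no such node exists after the revert.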
[spark] branch master updated: Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new f4af6a05c08 Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

f4af6a05c08 is described below

commit f4af6a05c0879887e7db2377a174e7b7d7bab693
Author: Wenchen Fan
AuthorDate: Thu Mar 30 19:43:36 2023 +0800

Revert "[SPARK-41765][SQL] Pull out v1 write metrics to WriteFiles"

This reverts commit a111a02de1a814c5f335e0bcac4cffb0515557dc.

### What changes were proposed in this pull request?
SQLMetrics is not only used in the UI; it is also a programming API, as users can write a listener, get the physical plan, and read the SQLMetrics values directly.

We could ask users to update their code and read SQLMetrics from the new `WriteFiles` node instead, but this is troublesome: users who need both the write metrics and the commit metrics would have to look at two physical plan nodes. Given that https://github.com/apache/spark/pull/39428 is mostly a cleanup and does not bring many benefits, reverting it is the better option.

### Why are the changes needed?
To avoid breaking changes.

### Does this PR introduce _any_ user-facing change?
Yes, users can programmatically get the write command metrics as before.

### How was this patch tested?
N/A

Closes #40604 from cloud-fan/revert.
Authored-by: Wenchen Fan
Signed-off-by: Wenchen Fan
---
.../sql/execution/command/DataWritingCommand.scala | 47 ---
.../datasources/BasicWriteStatsTracker.scala | 17 +-
.../execution/datasources/FileFormatWriter.scala | 69 +-
.../sql/execution/datasources/WriteFiles.scala | 3 -
.../BasicWriteJobStatsTrackerMetricSuite.scala | 19 +++---
.../sql/execution/metric/SQLMetricsSuite.scala | 28 -
.../sql/execution/metric/SQLMetricsTestUtils.scala | 21 +--
.../hive/execution/InsertIntoHiveDirCommand.scala | 7 ---
.../spark/sql/hive/execution/SQLMetricsSuite.scala | 42 +
9 files changed, 94 insertions(+), 159 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index 58c3fca4ad7..338ce8cac42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -21,13 +21,14 @@ import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.SparkContext
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
 
@@ -51,19 +52,11 @@ trait DataWritingCommand extends UnaryCommand {
   def outputColumns: Seq[Attribute] =
     DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)
 
-  lazy val metrics: Map[String, SQLMetric] = {
-    // If planned write is enable, we have pulled out write files metrics from `V1WriteCommand`
-    // to `WriteFiles`. `DataWritingCommand` should only holds the task commit metric and driver
-    // commit metric.
-    if (conf.getConf(SQLConf.PLANNED_WRITE_ENABLED)) {
-      BasicWriteJobStatsTracker.writeCommitMetrics
-    } else {
-      BasicWriteJobStatsTracker.metrics
-    }
-  }
+  lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
 
   def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
-    DataWritingCommand.basicWriteJobStatsTracker(metrics, hadoopConf)
+    val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
+    new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
   }
 
   def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
@@ -86,6 +79,27 @@ object DataWritingCommand {
     }
   }
 
+  /**
+   * When execute CTAS operators, Spark will use [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]] command to write data, they both inherit metrics from
+   * [[DataWritingCommand]], but after running [[InsertIntoHadoopFsRelationCommand]]
+   * or [[InsertIntoHiveTable]], we only update
[spark] branch master updated (880312b5ade -> b85fe2bff30)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

from 880312b5ade [SPARK-42907][TESTS][FOLLOWUP] Avro functions doctest cleanup
add b85fe2bff30 [SPARK-42683] Automatically rename conflicting metadata columns

No new revisions were added by this update.

Summary of changes:
.../CheckConnectJvmClientCompatibility.scala | 1 +
.../connector/catalog/SupportsMetadataColumns.java | 24 +++--
.../catalyst/expressions/namedExpressions.scala| 33 +--
.../sql/catalyst/plans/logical/LogicalPlan.scala | 48 --
.../apache/spark/sql/catalyst/util/package.scala | 5 +-
.../datasources/v2/DataSourceV2Implicits.scala | 2 +-
.../datasources/v2/DataSourceV2Relation.scala | 3 +-
.../sql/connector/catalog/InMemoryBaseTable.scala | 29 --
.../main/scala/org/apache/spark/sql/Dataset.scala | 12 +++
.../execution/datasources/FileSourceStrategy.scala | 4 +-
.../spark/sql/connector/MetadataColumnSuite.scala | 78 +++-
.../datasources/FileMetadataStructSuite.scala | 101 +
12 files changed, 305 insertions(+), 35 deletions(-)

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org