[spark] branch branch-3.5 updated: [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 959d93aa9be [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
959d93aa9be is described below

commit 959d93aa9bebde1d081afd5d2f2ad60aec907c31
Author: allisonwang-db
AuthorDate: Fri Sep 8 14:25:51 2023 +0900

    [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions

    ### What changes were proposed in this pull request?

    This PR adds a user guide for Python user-defined table functions (UDTFs), introduced in Spark 3.5.

    https://github.com/apache/spark/assets/66282705/11f5dc5e-681b-4677-a466-1a23c0b8dd01

    ### Why are the changes needed?

    To help users write Python UDTFs.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    docs test

    Closes #42272 from allisonwang-db/spark-44508-udtf-user-guide.

    Authored-by: allisonwang-db
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit aaf413ce351dd716096333df140f45f7f1bd5dd6)
    Signed-off-by: Hyukjin Kwon
---
 examples/src/main/python/sql/udtf.py              | 240 ++
 python/docs/source/user_guide/sql/index.rst       |   1 +
 python/docs/source/user_guide/sql/python_udtf.rst | 233 +
 python/mypy.ini                                   |   6 +
 python/pyspark/sql/functions.py                   |   7 -
 5 files changed, 480 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/python/sql/udtf.py b/examples/src/main/python/sql/udtf.py
new file mode 100644
index 000..768eb73566e
--- /dev/null
+++ b/examples/src/main/python/sql/udtf.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Python UDTFs in Spark
+Run with:
+    ./bin/spark-submit examples/src/main/python/sql/udtf.py
+"""
+
+# NOTE that this file is imported in the User Guides in PySpark documentation.
+# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx.
+from pyspark.sql import SparkSession
+from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+# Python UDTFs use Arrow by default.
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def python_udtf_simple_example(spark: SparkSession) -> None:
+
+    # Define the UDTF class and implement the required `eval` method.
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Create a UDTF using the class definition and the `udtf` function.
+    square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
+
+    # Invoke the UDTF in PySpark.
+    square_num(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_decorator_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Define a UDTF using the `udtf` decorator directly on the class.
+    @udtf(returnType="num: int, squared: int")
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    # Invoke the UDTF in PySpark using the SquareNumbers class directly.
+    SquareNumbers(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_registration(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="word: string")
+    class WordSplitter:
+        def eval(self, text: str):
+            for word in text.split(" "):
+                yield (word.strip(),)
+
+    # Register the UDTF for use in Spark SQL.
+    spark.udtf.register("split_words", WordSplitter)
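As a hedged sketch of where the registration example above leads (the query and its output are illustrative, not part of the diff): once registered, the UDTF can be called in the FROM clause of a SQL query like a table-valued function.

    # Illustrative only: invoke the registered UDTF from SQL.
    spark.sql("SELECT * FROM split_words('hello world')").show()
    # +-----+
    # | word|
    # +-----+
    # |hello|
    # |world|
    # +-----+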
[spark] branch master updated: [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new aaf413ce351 [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions
aaf413ce351 is described below

commit aaf413ce351dd716096333df140f45f7f1bd5dd6
Author: allisonwang-db
AuthorDate: Fri Sep 8 14:25:51 2023 +0900

    [SPARK-44508][PYTHON][DOCS] Add user guide for Python user-defined table functions

    ### What changes were proposed in this pull request?

    This PR adds a user guide for Python user-defined table functions (UDTFs), introduced in Spark 3.5.

    https://github.com/apache/spark/assets/66282705/11f5dc5e-681b-4677-a466-1a23c0b8dd01

    ### Why are the changes needed?

    To help users write Python UDTFs.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    docs test

    Closes #42272 from allisonwang-db/spark-44508-udtf-user-guide.

    Authored-by: allisonwang-db
    Signed-off-by: Hyukjin Kwon
---
 examples/src/main/python/sql/udtf.py              | 240 ++
 python/docs/source/user_guide/sql/index.rst       |   1 +
 python/docs/source/user_guide/sql/python_udtf.rst | 233 +
 python/mypy.ini                                   |   6 +
 python/pyspark/sql/functions.py                   |   7 -
 5 files changed, 480 insertions(+), 7 deletions(-)

diff --git a/examples/src/main/python/sql/udtf.py b/examples/src/main/python/sql/udtf.py
new file mode 100644
index 000..768eb73566e
--- /dev/null
+++ b/examples/src/main/python/sql/udtf.py
@@ -0,0 +1,240 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Python UDTFs in Spark
+Run with:
+    ./bin/spark-submit examples/src/main/python/sql/udtf.py
+"""
+
+# NOTE that this file is imported in the User Guides in PySpark documentation.
+# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx.
+from pyspark.sql import SparkSession
+from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+# Python UDTFs use Arrow by default.
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def python_udtf_simple_example(spark: SparkSession) -> None:
+
+    # Define the UDTF class and implement the required `eval` method.
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Create a UDTF using the class definition and the `udtf` function.
+    square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
+
+    # Invoke the UDTF in PySpark.
+    square_num(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_decorator_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Define a UDTF using the `udtf` decorator directly on the class.
+    @udtf(returnType="num: int, squared: int")
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    # Invoke the UDTF in PySpark using the SquareNumbers class directly.
+    SquareNumbers(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_registration(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="word: string")
+    class WordSplitter:
+        def eval(self, text: str):
+            for word in text.split(" "):
+                yield (word.strip(),)
+
+    # Register the UDTF for use in Spark SQL.
+    spark.udtf.register("split_words", WordSplitter)
+
+    # Example: Using the UDTF in
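A hedged sketch of what a SQL usage example of this registered UDTF typically looks like (the inline VALUES data is illustrative, not taken from the diff): a lateral join fans each input row out into one output row per word.

    # Illustrative only: apply the registered UDTF to each row via LATERAL.
    spark.sql(
        "SELECT * FROM VALUES ('hello world'), ('apache spark') t(text), "
        "LATERAL split_words(text)"
    ).show()
    # 'hello world' yields rows ('hello world', 'hello') and ('hello world', 'world').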
[spark] branch branch-3.4 updated: [SPARK-45103][BUILD][3.4] Update ORC to 1.8.5
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d53546298cd [SPARK-45103][BUILD][3.4] Update ORC to 1.8.5
d53546298cd is described below

commit d53546298cdabfdf686b5e3f5fff5e4af3959874
Author: Dongjoon Hyun
AuthorDate: Thu Sep 7 15:37:17 2023 -0700

    [SPARK-45103][BUILD][3.4] Update ORC to 1.8.5

    ### What changes were proposed in this pull request?

    This PR aims to upgrade Apache ORC to 1.8.5 for Apache Spark 3.4.2.

    Please note that the Apache ORC community maintains
    - Apache ORC 1.8.x for Apache Spark 3.4.x
    - Apache ORC 1.9.x for Apache Spark 3.5.x
    - Apache ORC 2.0.x for Apache Spark 4.0.x

    ### Why are the changes needed?

    To bring the latest bug fixes like [ORC-1482](https://issues.apache.org/jira/browse/ORC-1482).
    - https://orc.apache.org/news/2023/09/05/ORC-1.8.5/

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42851 from dongjoon-hyun/SPARK-45103.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 6 +++---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++---
 pom.xml                               | 2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 6c22673a7df..c562b0b7e16 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -222,9 +222,9 @@ objenesis/3.2//objenesis-3.2.jar
 okhttp/3.12.12//okhttp-3.12.12.jar
 okio/1.15.0//okio-1.15.0.jar
 opencsv/2.3//opencsv-2.3.jar
-orc-core/1.8.4/shaded-protobuf/orc-core-1.8.4-shaded-protobuf.jar
-orc-mapreduce/1.8.4/shaded-protobuf/orc-mapreduce-1.8.4-shaded-protobuf.jar
-orc-shims/1.8.4//orc-shims-1.8.4.jar
+orc-core/1.8.5/shaded-protobuf/orc-core-1.8.5-shaded-protobuf.jar
+orc-mapreduce/1.8.5/shaded-protobuf/orc-mapreduce-1.8.5-shaded-protobuf.jar
+orc-shims/1.8.5//orc-shims-1.8.5.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 68a5cbced62..bcfc8c92b10 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -209,9 +209,9 @@ opencsv/2.3//opencsv-2.3.jar
 opentracing-api/0.33.0//opentracing-api-0.33.0.jar
 opentracing-noop/0.33.0//opentracing-noop-0.33.0.jar
 opentracing-util/0.33.0//opentracing-util-0.33.0.jar
-orc-core/1.8.4/shaded-protobuf/orc-core-1.8.4-shaded-protobuf.jar
-orc-mapreduce/1.8.4/shaded-protobuf/orc-mapreduce-1.8.4-shaded-protobuf.jar
-orc-shims/1.8.4//orc-shims-1.8.4.jar
+orc-core/1.8.5/shaded-protobuf/orc-core-1.8.5-shaded-protobuf.jar
+orc-mapreduce/1.8.5/shaded-protobuf/orc-mapreduce-1.8.5-shaded-protobuf.jar
+orc-shims/1.8.5//orc-shims-1.8.5.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
diff --git a/pom.xml b/pom.xml
index a681adba193..12fe9b50248 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,7 +141,7 @@
     10.14.2.0
     1.12.3
-    1.8.4
+    1.8.5
     shaded-protobuf
     9.4.50.v20221201
     4.0.3
[spark] branch master updated: [SPARK-44732][XML][FOLLOWUP] Partial backport of spark-xml "Shortcut common type inference cases to fail fast"
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a37c265371d [SPARK-44732][XML][FOLLOWUP] Partial backport of spark-xml "Shortcut common type inference cases to fail fast"
a37c265371d is described below

commit a37c265371dc861fa478dd63deaa38a86415fe3b
Author: Sean Owen
AuthorDate: Thu Sep 7 15:21:36 2023 -0700

    [SPARK-44732][XML][FOLLOWUP] Partial backport of spark-xml "Shortcut common type inference cases to fail fast"

    ### What changes were proposed in this pull request?

    Partial back-port of https://github.com/databricks/spark-xml/commit/994e357f7666956b5d0e63627716b2c092d9abbd?diff=split from spark-xml.

    ### Why are the changes needed?

    Though no more development was intended on spark-xml, there was a non-trivial improvement to inference speed that I committed anyway to resolve a customer issue. Part of it can be 'backported' here to sync the code. I attached this as a follow-up to the main code port JIRA. There is still, in general, no intent to commit more to spark-xml in the meantime unless it's significantly important.

    ### Does this PR introduce _any_ user-facing change?

    No, this should only speed up schema inference without behavior change.

    ### How was this patch tested?

    Tested in spark-xml, and will be tested by tests here too.

    Closes #42844 from srowen/SPARK-44732.2.

    Authored-by: Sean Owen
    Signed-off-by: Sean Owen
---
 .../org/apache/spark/sql/catalyst/xml/TypeCast.scala | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
index a00f372da7f..b065dd41f28 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/TypeCast.scala
@@ -155,6 +155,12 @@ private[sql] object TypeCast {
     } else {
       value
     }
+    // A little shortcut to avoid trying many formatters in the common case that
+    // the input isn't a double. All built-in formats will start with a digit or period.
+    if (signSafeValue.isEmpty ||
+        !(Character.isDigit(signSafeValue.head) || signSafeValue.head == '.')) {
+      return false
+    }
     // Rule out strings ending in D or F, as they will parse as double but should be disallowed
     if (value.nonEmpty && (value.last match {
       case 'd' | 'D' | 'f' | 'F' => true
@@ -171,6 +177,11 @@ private[sql] object TypeCast {
     } else {
       value
     }
+    // A little shortcut to avoid trying many formatters in the common case that
+    // the input isn't a number. All built-in formats will start with a digit.
+    if (signSafeValue.isEmpty || !Character.isDigit(signSafeValue.head)) {
+      return false
+    }
     (allCatch opt signSafeValue.toInt).isDefined
   }

@@ -180,6 +191,11 @@ private[sql] object TypeCast {
     } else {
       value
     }
+    // A little shortcut to avoid trying many formatters in the common case that
+    // the input isn't a number. All built-in formats will start with a digit.
+    if (signSafeValue.isEmpty || !Character.isDigit(signSafeValue.head)) {
+      return false
+    }
     (allCatch opt signSafeValue.toLong).isDefined
   }
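The same fail-fast idea, restated as a standalone Python sketch (a hypothetical helper, not part of Spark or spark-xml): check the first non-sign character before paying for a full parse attempt.

    def looks_like_int(value: str) -> bool:
        """Cheap pre-check mirroring the Scala shortcut above."""
        # Strip one leading sign, as the Scala code's signSafeValue does.
        s = value[1:] if value[:1] in "+-" else value
        # Fail fast: every built-in integer format starts with a digit,
        # so most non-numeric strings are rejected without a parse attempt.
        if not s or not s[0].isdigit():
            return False
        try:
            int(s)
            return True
        except ValueError:
            return False

    assert looks_like_int("-42") and not looks_like_int("forty-two")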
[spark] branch master updated (0e6e15ca633 -> b8b58e0b95b)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 0e6e15ca633 [SPARK-45080][SS] Explicitly call out support for columnar in DSv2 streaming data sources
     add b8b58e0b95b [SPARK-45077][UI] Upgrade dagre-d3.js from 0.4.3 to 0.6.4

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/ui/static/dagre-d3.min.js | 4836 +++-
 1 file changed, 4829 insertions(+), 7 deletions(-)
[spark] branch master updated: [SPARK-45080][SS] Explicitly call out support for columnar in DSv2 streaming data sources
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0e6e15ca633 [SPARK-45080][SS] Explicitly call out support for columnar in DSv2 streaming data sources
0e6e15ca633 is described below

commit 0e6e15ca6331d37a6c38c970556903c6df5d5dfb
Author: Jungtaek Lim
AuthorDate: Thu Sep 7 19:42:58 2023 +0900

    [SPARK-45080][SS] Explicitly call out support for columnar in DSv2 streaming data sources

    ### What changes were proposed in this pull request?

    This PR proposes to override `Scan.columnarSupportMode` for DSv2 streaming data sources; none of them support columnar. This applies [SPARK-44505](https://issues.apache.org/jira/browse/SPARK-44505) to the DSv2 streaming data sources. The rationale is explained in the next section.

    ### Why are the changes needed?

    The default value for `Scan.columnarSupportMode` is `PARTITION_DEFINED`, which requires `inputPartitions` to be called/evaluated. That could be referenced multiple times during planning. In `MicrobatchScanExec`, we define `inputPartitions` as a lazy val so that `inputPartitions` is not evaluated multiple times, which would call `MicroBatchStream.planInputPartitions` repeatedly. But we missed that there is no guarantee the instance will be initialized only once (although the actual execution happens once) - for example, executedPlan clones the plan (internally we call the constructor to make a deep copy of the node), explain (internally called to build a SQL execution start event [...]

    I see `MicroBatchStream.planInputPartitions` get called 4 times per microbatch, which can be concerning if the overhead of planInputPartitions is non-trivial, specifically for Kafka.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing UTs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42823 from HeartSaVioR/SPARK-45080.

    Authored-by: Jungtaek Lim
    Signed-off-by: Jungtaek Lim
---
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala    | 3 +++
 .../main/scala/org/apache/spark/sql/execution/streaming/memory.scala | 3 +++
 .../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala  | 3 +++
 .../spark/sql/execution/streaming/sources/RateStreamProvider.scala   | 3 +++
 .../sql/execution/streaming/sources/TextSocketSourceProvider.scala   | 3 +++
 .../scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala   | 4 ++--
 6 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index de78992533b..d9e3a1256ea 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -525,6 +525,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def supportedCustomMetrics(): Array[CustomMetric] = {
       Array(new OffsetOutOfRangeMetric, new DataLossMetric)
     }
+
+    override def columnarSupportMode(): Scan.ColumnarSupportMode =
+      Scan.ColumnarSupportMode.UNSUPPORTED
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 34076f26fe8..732eaa8d783 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -139,6 +139,9 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
   override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
     stream.asInstanceOf[ContinuousStream]
   }
+
+  override def columnarSupportMode(): Scan.ColumnarSupportMode =
+    Scan.ColumnarSupportMode.UNSUPPORTED
 }

 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index 41878a6a549..17cc1860fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -111,6 +111,9 @@ class RatePerMicroBatchTable(
   override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
     throw new
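A toy Python sketch of the failure mode described in the commit message (the names are hypothetical, not Spark APIs): a per-instance lazy cache does not survive plan cloning, so every clone re-runs the expensive planning call.

    import functools

    class ScanNode:
        """Toy stand-in for a plan node whose partition planning is expensive."""

        def __init__(self, stream):
            self.stream = stream

        @functools.cached_property
        def input_partitions(self):
            # Cached per *instance*: a deep-copied/cloned node has its own
            # empty cache, so the expensive call runs again for every clone.
            return self.stream.plan_input_partitions()

    class CountingStream:
        def __init__(self):
            self.calls = 0

        def plan_input_partitions(self):
            self.calls += 1
            return ["partition-0", "partition-1"]

    stream = CountingStream()
    # Planning, explain(), and executedPlan each work on a fresh copy of the node.
    for _ in range(4):
        ScanNode(stream).input_partitions
    print(stream.calls)  # 4 -- once per clone, despite the per-instance cache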
[spark] branch master updated: [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new af1615d026e [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values
af1615d026e is described below

commit af1615d026eaf4aeec27ccfe3c58011ebbcb3de1
Author: Chao Sun
AuthorDate: Thu Sep 7 18:19:54 2023 +0800

    [SPARK-45036][FOLLOWUP][SQL] SPJ: Make sure result partitions are sorted according to partition values

    ### What changes were proposed in this pull request?

    This PR makes sure the grouped partitions returned by `DataSourceV2ScanExec#groupPartitions` are sorted according to the partition values. Previously, in #42757, we assumed Scala would preserve the input ordering, but apparently that is not the case.

    ### Why are the changes needed?

    See https://github.com/apache/spark/pull/42757#discussion_r1316926504 for the diagnosis. The partition ordering is a fundamental property for SPJ and thus must be guaranteed.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    We have tests in `KeyGroupedPartitioningSuite` to cover this.

    ### Was this patch authored or co-authored using generative AI tooling?

    Closes #42839 from sunchao/SPARK-45036-followup.

    Authored-by: Chao Sun
    Signed-off-by: yangjie01
---
 .../execution/datasources/v2/DataSourceV2ScanExecBase.scala | 11 +--
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
index 94667fbd00c..b2f94cae2df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
@@ -143,17 +143,16 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
       // also sort the input partitions according to their partition key order. This ensures
       // a canonical order from both sides of a bucketed join, for example.
       val partitionDataTypes = expressions.map(_.dataType)
-      val partitionOrdering: Ordering[(InternalRow, InputPartition)] = {
-        RowOrdering.createNaturalAscendingOrdering(partitionDataTypes).on(_._1)
-      }
-      val sortedKeyToPartitions = results.sorted(partitionOrdering)
-      val groupedPartitions = sortedKeyToPartitions
+      val rowOrdering = RowOrdering.createNaturalAscendingOrdering(partitionDataTypes)
+      val sortedKeyToPartitions = results.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+      val sortedGroupedPartitions = sortedKeyToPartitions
         .map(t => (InternalRowComparableWrapper(t._1, expressions), t._2))
         .groupBy(_._1)
         .toSeq
        .map { case (key, s) => KeyGroupedPartition(key.row, s.map(_._2)) }
+        .sorted(rowOrdering.on((k: KeyGroupedPartition) => k.value))

-      Some(KeyGroupedPartitionInfo(groupedPartitions, sortedKeyToPartitions.map(_._2)))
+      Some(KeyGroupedPartitionInfo(sortedGroupedPartitions, sortedKeyToPartitions.map(_._2)))
     }
   }
 }
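The pitfall, restated as a small Python sketch (toy data, not Spark code): hash-based grouping such as Scala's `groupBy` makes no promise about the order of the resulting groups, so sorting the input first is not enough — the grouped result itself must be sorted by key.

    from collections import defaultdict

    rows = [(3, "c"), (1, "a"), (2, "b"), (1, "a2"), (3, "c2")]

    # Step 1: sort the raw (key, partition) pairs, as the scan node does.
    sorted_rows = sorted(rows, key=lambda t: t[0])

    # Step 2: group by key. A hash map (like Scala's groupBy result) gives no
    # ordering guarantee for the groups, so nothing downstream may rely on the
    # order they come back in.
    groups = defaultdict(list)
    for key, value in sorted_rows:
        groups[key].append(value)

    # Step 3 (the fix): explicitly sort the grouped result by key, making the
    # canonical order independent of how the grouping was implemented.
    grouped_sorted = sorted(groups.items(), key=lambda kv: kv[0])
    print(grouped_sorted)  # [(1, ['a', 'a2']), (2, ['b']), (3, ['c', 'c2'])]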
[spark] branch master updated: [SPARK-45086][UI] Display hexadecimal for thread lock hash code
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7818f47ca2f [SPARK-45086][UI] Display hexadecimal for thread lock hash code
7818f47ca2f is described below

commit 7818f47ca2fef37fe57888a6fdfb80c778f7339c
Author: Kent Yao
AuthorDate: Thu Sep 7 15:18:28 2023 +0800

    [SPARK-45086][UI] Display hexadecimal for thread lock hash code

    ### What changes were proposed in this pull request?

    This PR fixes the stringify method for MonitorInfo/LockInfo to use `toString`, which includes an extra step of Integer.toHexString.

    ### Why are the changes needed?

    To be consistent with the lock holder.

    ### Does this PR introduce _any_ user-facing change?

    Yes, the UI and the response of the REST API (/applications/[app-id]/executors/[executor-id]/threads) change.

    ### How was this patch tested?

    Verified locally.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42826 from yaooqinn/SPARK-45086.

    Authored-by: Kent Yao
    Signed-off-by: Kent Yao
---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7e4d9b78af2..149071ee1b6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2108,10 +2108,8 @@ private[spark] object Utils
   private implicit class Lock(lock: LockInfo) {
     def lockString: String = {
       lock match {
-        case monitor: MonitorInfo =>
-          s"Monitor(${lock.getClassName}@${monitor.getIdentityHashCode})"
-        case _ =>
-          s"Lock(${lock.getClassName}@${lock.getIdentityHashCode})"
+        case monitor: MonitorInfo => s"Monitor(${monitor.toString})"
+        case _ => s"Lock(${lock.toString})"
       }
     }
   }
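For intuition, a loose Python analogue (illustrative only — the real fix is in Scala and relies on `LockInfo.toString`, which renders the identity hash via `Integer.toHexString`): printing the same identity value in decimal and in hex shows why the old output did not line up with the hex shown for lock holders.

    class Lock:
        pass

    lock = Lock()
    identity = id(lock)  # stand-in for Java's identity hash code

    # Before: decimal rendering of the hash -- mismatches the hex elsewhere.
    print(f"Monitor({Lock.__qualname__}@{identity})")
    # After: hex rendering, matching toString()-style output.
    print(f"Monitor({Lock.__qualname__}@{identity:x})")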