[spark] branch master updated (33830d2694f -> 5f643ee1444)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 33830d2694f [SPARK-44113][INFRA][FOLLOW-UP] Remove Scala 2.13 scheduled jobs
   add 5f643ee1444 [MINOR][PYTHON][TESTS] Add init file to pyspark.ml.deepspeed.tests

No new revisions were added by this update.

Summary of changes:
 .../src/main/python => python/pyspark/ml/deepspeed/tests}/__init__.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 copy {examples/src/main/python => python/pyspark/ml/deepspeed/tests}/__init__.py (100%)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
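The commit above copies an empty `__init__.py` into `python/pyspark/ml/deepspeed/tests` so the test directory is importable as a package. A minimal sketch of why that file matters (the `demo_tests` package name is purely illustrative):

```python
import importlib
import os
import sys
import tempfile

# A directory becomes an importable package once it contains __init__.py,
# which is also what lets test runners discover the modules inside it.
base = tempfile.mkdtemp()
pkg_dir = os.path.join(base, "demo_tests")
os.makedirs(pkg_dir)
open(os.path.join(pkg_dir, "__init__.py"), "w").close()

sys.path.insert(0, base)
mod = importlib.import_module("demo_tests")
print(mod.__name__)
```

Without the `__init__.py`, the same `import_module` call raises `ModuleNotFoundError` on import systems that require regular packages.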
[spark] branch master updated: [SPARK-44113][INFRA][FOLLOW-UP] Remove Scala 2.13 scheduled jobs
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 33830d2694f [SPARK-44113][INFRA][FOLLOW-UP] Remove Scala 2.13 scheduled jobs

33830d2694f is described below

commit 33830d2694f658e2d8a69f14c692fe0e85704f45
Author: Hyukjin Kwon
AuthorDate: Thu Sep 21 21:58:19 2023 -0700

    [SPARK-44113][INFRA][FOLLOW-UP] Remove Scala 2.13 scheduled jobs

    ### What changes were proposed in this pull request?

    This PR is a followup of https://github.com/apache/spark/pull/43008 that removes the leftover scheduled GitHub Actions build for Scala 2.13.

    ### Why are the changes needed?

    After dropping Scala 2.12, the default build is exactly the same as the scheduled job for Scala 2.13.

    ### Does this PR introduce _any_ user-facing change?

    No, dev-only.

    ### How was this patch tested?

    Will monitor the scheduled builds.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43047 from HyukjinKwon/SPARK-44113-folliwup.

    Authored-by: Hyukjin Kwon
    Signed-off-by: Dongjoon Hyun
---
 .github/workflows/build_scala213.yml | 49 ----------------------------------
 1 file changed, 49 deletions(-)

diff --git a/.github/workflows/build_scala213.yml b/.github/workflows/build_scala213.yml
deleted file mode 100644
index cae0981ee1e..00000000000
--- a/.github/workflows/build_scala213.yml
+++ /dev/null
@@ -1,49 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-name: "Build (master, Scala 2.13, Hadoop 3, JDK 8)"
-
-on:
-  schedule:
-    - cron: '0 19 * * *'
-
-jobs:
-  run-build:
-    permissions:
-      packages: write
-    name: Run
-    uses: ./.github/workflows/build_and_test.yml
-    if: github.repository == 'apache/spark'
-    with:
-      java: 8
-      branch: master
-      hadoop: hadoop3
-      envs: >-
-        {
-          "SCALA_PROFILE": "scala2.13"
-        }
-      jobs: >-
-        {
-          "build": "true",
-          "pyspark": "true",
-          "sparkr": "true",
-          "tpcds-1g": "true",
-          "docker-integration-tests": "true",
-          "lint" : "true"
-        }
[spark] branch master updated (6b747ab8cef -> db02469434a)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 6b747ab8cef [SPARK-45253][SQL][DOCS] Correct the group of `ShiftLeft` and `ArraySize`
   add db02469434a [SPARK-45251][CONNECT] Add client_type field for FetchErrorDetails

No new revisions were added by this update.

Summary of changes:
 .../src/main/protobuf/spark/connect/base.proto |  5
 python/pyspark/sql/connect/proto/base_pb2.py   | 20 +++---
 python/pyspark/sql/connect/proto/base_pb2.pyi  | 31 --
 3 files changed, 44 insertions(+), 12 deletions(-)
[spark] branch master updated (0bf950ee4f7 -> 6b747ab8cef)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

  from 0bf950ee4f7 [SPARK-43433][PS] Match `GroupBy.nth` behavior to the latest Pandas
   add 6b747ab8cef [SPARK-45253][SQL][DOCS] Correct the group of `ShiftLeft` and `ArraySize`

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/expressions/collectionOperations.scala | 2 +-
 .../org/apache/spark/sql/catalyst/expressions/mathExpressions.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
[spark] branch master updated: [SPARK-43433][PS] Match `GroupBy.nth` behavior to the latest Pandas
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0bf950ee4f7 [SPARK-43433][PS] Match `GroupBy.nth` behavior to the latest Pandas

0bf950ee4f7 is described below

commit 0bf950ee4f77eb1b50d7bd26df330094d44c0804
Author: Haejoon Lee
AuthorDate: Fri Sep 22 12:48:03 2023 +0900

    [SPARK-43433][PS] Match `GroupBy.nth` behavior to the latest Pandas

    ### What changes were proposed in this pull request?

    This PR proposes to match `GroupBy.nth` behavior to the latest Pandas.

    ### Why are the changes needed?

    To match the behavior of Pandas 2.0.0 and above.

    ### Does this PR introduce _any_ user-facing change?

    **Test DataFrame**
    ```python
    >>> psdf = ps.DataFrame(
    ...     {
    ...         "A": [1, 2, 1, 2],
    ...         "B": [3.1, 4.1, 4.1, 3.1],
    ...         "C": ["a", "b", "b", "a"],
    ...         "D": [True, False, False, True],
    ...     }
    ... )
    >>> psdf
       A    B  C      D
    0  1  3.1  a   True
    1  2  4.1  b  False
    2  1  4.1  b  False
    3  2  3.1  a   True
    ```

    **Before fixing**
    ```python
    >>> psdf.groupby("A").nth(-1)
         B  C      D
    A
    1  4.1  b  False
    2  3.1  a   True
    >>> psdf.groupby("A")[["C"]].nth(-1)
       C
    A
    1  b
    2  a
    >>> psdf.groupby("A")["B"].nth(-1)
    A
    1    4.1
    2    3.1
    Name: B, dtype: float64
    ```

    **After fixing**
    ```python
    >>> psdf.groupby("A").nth(-1)
       A    B  C      D
    2  1  4.1  b  False
    3  2  3.1  a   True
    >>> psdf.groupby("A")[["C"]].nth(-1)
       C
    2  b
    3  a
    >>> psdf.groupby("A")["B"].nth(-1)
    2    4.1
    3    3.1
    Name: B, dtype: float64
    ```

    ### How was this patch tested?

    Enabling the existing tests & updating the doctests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42994 from itholic/SPARK-43552.
    Authored-by: Haejoon Lee
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/pandas/groupby.py                 | 89 +++++++++++++++--------
 python/pyspark/pandas/tests/groupby/test_stat.py |  4 --
 2 files changed, 58 insertions(+), 35 deletions(-)

diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index c7924fa3345..7bd64376152 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -143,7 +143,9 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         pass

     @abstractmethod
-    def _handle_output(self, psdf: DataFrame) -> FrameLike:
+    def _handle_output(
+        self, psdf: DataFrame, agg_column_names: Optional[List[str]] = None
+    ) -> FrameLike:
         pass

     # TODO: Series support is not implemented yet.
@@ -1091,24 +1093,22 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         Examples
+        >>> import numpy as np
         >>> df = ps.DataFrame({'A': [1, 1, 2, 1, 2],
         ...                    'B': [np.nan, 2, 3, 4, 5]}, columns=['A', 'B'])
         >>> g = df.groupby('A')
         >>> g.nth(0)
-             B
-        A
-        1  NaN
-        2  3.0
+           A    B
+        0  1  NaN
+        2  2  3.0
         >>> g.nth(1)
-             B
-        A
-        1  2.0
-        2  5.0
+           A    B
+        1  1  2.0
+        4  2  5.0
         >>> g.nth(-1)
-             B
-        A
-        1  4.0
-        2  5.0
+           A    B
+        3  1  4.0
+        4  2  5.0

         See Also
@@ -1120,13 +1120,10 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         if not isinstance(n, int):
             raise TypeError("Invalid index %s" % type(n).__name__)

-        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-        internal, agg_columns, sdf = self._prepare_reduce(
-            groupkey_names=groupkey_names,
-            accepted_spark_types=None,
-            bool_to_numeric=False,
-        )
-        psdf: DataFrame = DataFrame(internal)
+        groupkey_names: List[str] = [str(groupkey.name) for groupkey in self._groupkeys]
+        psdf = self._psdf
+        internal = psdf._internal
+        sdf = internal.spark_frame

         if len(psdf._internal.column_labels) > 0:
             window1 = Window.partitionBy(*groupkey_names).orderBy(NATURAL_ORDER_COLUMN_NAME)
@@ -1155,14 +1152,32 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         else:
             sdf = sdf.select(*groupkey_names).distinct()

-        internal = internal.copy(
+        agg_columns = []
+        if not self._agg_columns_selected:
+            for psser in self._groupkeys:
+                agg_columns.append(psser)
+        for psser in self._agg_columns:
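The behavior change above mirrors pandas 2.x, where `GroupBy.nth` acts as a filter that keeps the original row index instead of aggregating onto the group keys. A pure-Python sketch of that filter semantics (illustrative only, not the pandas-on-Spark implementation):

```python
from collections import defaultdict


def groupby_nth(rows, key, n):
    """Return (original_index, row) pairs for the n-th row of each group."""
    # Record the original positions of each group's rows, in order.
    positions = defaultdict(list)
    for i, row in enumerate(rows):
        positions[row[key]].append(i)
    # Pick the n-th position per group; negative n counts from the end.
    picked = []
    for idxs in positions.values():
        if -len(idxs) <= n < len(idxs):
            picked.append(idxs[n])
    # Act as a filter: preserve the original index and row order.
    return [(i, rows[i]) for i in sorted(picked)]


rows = [
    {"A": 1, "B": 3.1}, {"A": 2, "B": 4.1},
    {"A": 1, "B": 4.1}, {"A": 2, "B": 3.1},
]
print(groupby_nth(rows, "A", -1))  # rows at original indices 2 and 3
```

This is why, in the "after" output, `nth(-1)` returns rows labelled 2 and 3 with the grouping column `A` still present, rather than rows re-indexed by `A`.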
[GitHub] [spark-website] panbingkun commented on pull request #474: [SPARK-44820][DOCS] Switch languages consistently across docs for all code snippets
panbingkun commented on PR #474:
URL: https://github.com/apache/spark-website/pull/474#issuecomment-1730757033

   > Yea we should apply the change in the `spark` repo to the actual released Spark website docs here. @panbingkun which option do you think is better?

   If republishing the historically published documents is not appropriate, we can only update them manually on the Spark website. If possible, I can complete it.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[spark] branch master updated: [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 555aece74d2 [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query

555aece74d2 is described below

commit 555aece74d2a22d312e815ec07f5553800e14b9d
Author: Alice Sayutina
AuthorDate: Fri Sep 22 12:31:23 2023 +0900

    [SPARK-45093][CONNECT][PYTHON] Error reporting for addArtifacts query

    ### What changes were proposed in this pull request?

    Add error logging to `addArtifact` (see the example in "How was this patch tested"). The logging code is moved into a separate file to avoid a circular dependency.

    ### Why are the changes needed?

    Currently, when `addArtifact` is executed with a file which doesn't exist, the user gets a cryptic error:

    ```
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Exception iterating requests!"
        debug_error_string = "None"
    >
    ```

    This is hard to debug without digging deep into the subject. It happens because `addArtifact` is implemented as a client-side streaming RPC, and the actual error occurs while gRPC consumes the iterator that generates the requests. Unfortunately, gRPC doesn't print any debug information to help the user understand the problem.

    ### Does this PR introduce _any_ user-facing change?

    Additional logging, opt-in as before via the `SPARK_CONNECT_LOG_LEVEL` environment variable.

    ### How was this patch tested?
    ```
    >>> s.addArtifact("XYZ", file=True)
    [New:] 2023-09-15 17:06:40,078 11789 ERROR _create_requests Failed to execute addArtifact: [Errno 2] No such file or directory: '/Users/alice.sayutina/apache_spark/python/XYZ'
    Traceback (most recent call last):
      File "", line 1, in
      File "/Users/alice.sayutina/apache_spark/python/pyspark/sql/connect/session.py", line 743, in addArtifacts
        self._client.add_artifacts(*path, pyfile=pyfile, archive=archive, file=file)
    []
      File "/Users/alice.sayutina/oss-venv/lib/python3.11/site-packages/grpc/_channel.py", line 910, in _end_unary_response_blocking
        raise _InactiveRpcError(state)  # pytype: disable=not-instantiable
        ^^
    grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Exception iterating requests!"
        debug_error_string = "None"
    >
    ```

    Closes #42949 from cdkrot/SPARK-45093.

    Lead-authored-by: Alice Sayutina
    Co-authored-by: Alice Sayutina
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/sql/connect/client/__init__.py |  1 +
 python/pyspark/sql/connect/client/artifact.py | 16 +++--
 python/pyspark/sql/connect/client/core.py     | 38 +----
 python/pyspark/sql/connect/client/logging.py  | 60 +++
 4 files changed, 74 insertions(+), 41 deletions(-)

diff --git a/python/pyspark/sql/connect/client/__init__.py b/python/pyspark/sql/connect/client/__init__.py
index 469d1c519a5..38523352e5b 100644
--- a/python/pyspark/sql/connect/client/__init__.py
+++ b/python/pyspark/sql/connect/client/__init__.py
@@ -20,3 +20,4 @@ from pyspark.sql.connect.utils import check_dependencies
 check_dependencies(__name__)

 from pyspark.sql.connect.client.core import *  # noqa: F401,F403
+from pyspark.sql.connect.client.logging import getLogLevel  # noqa: F401

diff --git a/python/pyspark/sql/connect/client/artifact.py b/python/pyspark/sql/connect/client/artifact.py
index c858768ccbf..fb31a57e0f6 100644
--- a/python/pyspark/sql/connect/client/artifact.py
+++ b/python/pyspark/sql/connect/client/artifact.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 from pyspark.sql.connect.utils import check_dependencies
+from pyspark.sql.connect.client.logging import logger

 check_dependencies(__name__)

@@ -243,11 +244,18 @@ class ArtifactManager:
         self, *path: str, pyfile: bool, archive: bool, file: bool
     ) -> Iterator[proto.AddArtifactsRequest]:
         """Separated for the testing purpose."""
-        return self._add_artifacts(
-            chain(
-                *(self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file) for p in path)
+        try:
+            yield from self._add_artifacts(
+                chain(
+                    *(
+                        self._parse_artifacts(p, pyfile=pyfile, archive=archive, file=file)
+                        for p in path
+                    )
+                )
             )
-        )
+        except Exception as e:
+            logger.error(f"Failed to submit addArtifacts request: {e}")
+            raise

     def
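The fix follows a general pattern: when a generator feeds a client-streaming RPC, wrap its body in try/except so the real exception is logged before gRPC collapses it into "Exception iterating requests!". A standalone sketch of the pattern (plain `logging`, no gRPC dependency; the names are illustrative):

```python
import logging

logger = logging.getLogger("rpc.client")


def create_requests(paths):
    """Yield one request payload per file path.

    Errors raised while the consumer iterates this generator would
    otherwise surface only as an opaque RPC failure, so log them here
    before re-raising.
    """
    try:
        for p in paths:
            with open(p, "rb") as f:
                yield f.read()
    except Exception as e:
        logger.error("Failed to build request stream: %s", e)
        raise
```

Note that the generator body only runs when the consumer iterates it, which is exactly why the original error appeared deep inside gRPC rather than at the `addArtifact` call site.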
[GitHub] [spark-website] allisonwang-db commented on pull request #474: [SPARK-44820][DOCS] Switch languages consistently across docs for all code snippets
allisonwang-db commented on PR #474:
URL: https://github.com/apache/spark-website/pull/474#issuecomment-1730748815

   Yea we should apply the change in the `spark` repo to the actual released Spark website docs here. @panbingkun which option do you think is better?
[spark] branch master updated: [SPARK-45257][CORE] Enable `spark.eventLog.compress` by default
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8d599972872 [SPARK-45257][CORE] Enable `spark.eventLog.compress` by default

8d599972872 is described below

commit 8d599972872225e336467700715b1d4771624efe
Author: Dongjoon Hyun
AuthorDate: Thu Sep 21 20:09:16 2023 -0700

    [SPARK-45257][CORE] Enable `spark.eventLog.compress` by default

    ### What changes were proposed in this pull request?

    This PR aims to enable `spark.eventLog.compress` by default for Apache Spark 4.0.0.

    ### Why are the changes needed?

    - To save event log storage cost by compressing the logs with the ZStandard codec by default.

    ### Does this PR introduce _any_ user-facing change?

    Although we added a migration guide, old Spark history servers are able to read the compressed logs.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #43036 from dongjoon-hyun/SPARK-45257.
    Lead-authored-by: Dongjoon Hyun
    Co-authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 core/src/main/scala/org/apache/spark/internal/config/package.scala | 2 +-
 docs/configuration.md                                              | 2 +-
 docs/core-migration-guide.md                                       | 4 ++++
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 05b2624b403..2dcd3af7a52 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -165,7 +165,7 @@ package object config {
     ConfigBuilder("spark.eventLog.compress")
       .version("1.0.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   private[spark] val EVENT_LOG_BLOCK_UPDATES =
     ConfigBuilder("spark.eventLog.logBlockUpdates.enabled")

diff --git a/docs/configuration.md b/docs/configuration.md
index 8fda9317bc7..e9ed2a8aa37 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1311,7 +1311,7 @@ Apart from these, the following properties are also available, and may be useful
   spark.eventLog.compress
-  false
+  true
   Whether to compress logged events, if spark.eventLog.enabled is true.

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 3f97a484e1a..765c3494f66 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}

+## Upgrading from Core 3.4 to 4.0
+
+- Since Spark 4.0, Spark will compress event logs. To restore the behavior before Spark 4.0, you can set `spark.eventLog.compress` to `false`.
+
 ## Upgrading from Core 3.3 to 3.4

 - Since Spark 3.4, Spark driver will own `PersistentVolumnClaim`s and try to reuse if they are not assigned to live executors. To restore the behavior before Spark 3.4, you can set `spark.kubernetes.driver.ownPersistentVolumeClaim` to `false` and `spark.kubernetes.driver.reusePersistentVolumeClaim` to `false`.
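For operators who want the pre-4.0 behavior, the migration-guide note above boils down to a one-line override, e.g. in `spark-defaults.conf` (a sketch; `spark.eventLog.enabled` is shown only as the typical companion setting):

```
spark.eventLog.enabled   true
spark.eventLog.compress  false
```

The same keys can equally be passed via `--conf` on `spark-submit` or through `SparkConf` at session construction.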
[spark] branch branch-3.5 updated: [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 5af0819654a [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics

5af0819654a is described below

commit 5af0819654aca896d73c16875b07b2143cb1132c
Author: Rui Wang
AuthorDate: Fri Sep 22 11:07:25 2023 +0800

    [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics

    ### What changes were proposed in this pull request?

    In the existing code, plan matching is used to validate whether two CollectMetrics nodes with the same name differ semantically. However, the plan-matching approach is fragile. A better way to tackle this is to utilize the unique DataFrame ID: the observe API is only supported by the DataFrame API (SQL has no such syntax), so two CollectMetrics nodes are semantically the same if and only if they have the same name and the same DataFrame ID.

    ### Why are the changes needed?

    This replaces a fragile approach with a more stable one.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    UT

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43010 from amaliujia/another_approch_for_collect_metrics.
    Authored-by: Rui Wang
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 7c3c7c5a4bd94c9e05b5e680a5242c2485875633)
    Signed-off-by: Wenchen Fan
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +--
 python/pyspark/sql/connect/plan.py                 |  1 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 36 ++
 .../plans/logical/basicLogicalOperators.scala      |  3 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 55 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../spark/sql/execution/SparkStrategies.scala      |  2 +-
 8 files changed, 35 insertions(+), 74 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 641dfc5dcd3..50a55f5e641 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -164,7 +164,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       case proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION =>
         transformCachedRemoteRelation(rel.getCachedRemoteRelation)
       case proto.Relation.RelTypeCase.COLLECT_METRICS =>
-        transformCollectMetrics(rel.getCollectMetrics)
+        transformCollectMetrics(rel.getCollectMetrics, rel.getCommon.getPlanId)
       case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
       case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
         throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
@@ -1054,12 +1054,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       numPartitionsOpt)
   }

-  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+  private def transformCollectMetrics(rel: proto.CollectMetrics, planId: Long): LogicalPlan = {
     val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
       Column(transformExpression(expr))
     }

-    CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))
+    CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput), planId)
   }

   private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = {

diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 196b1f119ba..b7ea1f94993 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1196,6 +1196,7 @@ class CollectMetrics(LogicalPlan):
         assert self._child is not None

         plan = proto.Relation()
+        plan.common.plan_id = self._child._plan_id
         plan.collect_metrics.input.CopyFrom(self._child.plan(session))
         plan.collect_metrics.name = self._name
         plan.collect_metrics.metrics.extend([self.col_to_expr(x, session) for x in self._exprs])

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6c5d19f58ac..8e3c9b30c61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
[spark] branch master updated: [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7c3c7c5a4bd [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics

7c3c7c5a4bd is described below

commit 7c3c7c5a4bd94c9e05b5e680a5242c2485875633
Author: Rui Wang
AuthorDate: Fri Sep 22 11:07:25 2023 +0800

    [SPARK-41086][SQL] Use DataFrame ID to semantically validate CollectMetrics

    ### What changes were proposed in this pull request?

    In the existing code, plan matching is used to validate whether two CollectMetrics nodes with the same name differ semantically. However, the plan-matching approach is fragile. A better way to tackle this is to utilize the unique DataFrame ID: the observe API is only supported by the DataFrame API (SQL has no such syntax), so two CollectMetrics nodes are semantically the same if and only if they have the same name and the same DataFrame ID.

    ### Why are the changes needed?

    This replaces a fragile approach with a more stable one.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    UT

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43010 from amaliujia/another_approch_for_collect_metrics.
    Authored-by: Rui Wang
    Signed-off-by: Wenchen Fan
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  6 +--
 python/pyspark/sql/connect/plan.py                 |  1 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 36 ++
 .../plans/logical/basicLogicalOperators.scala      |  3 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 55 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  2 +-
 .../spark/sql/execution/SparkStrategies.scala      |  2 +-
 8 files changed, 35 insertions(+), 74 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 924169715f7..dda7a713fa0 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -164,7 +164,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       case proto.Relation.RelTypeCase.CACHED_REMOTE_RELATION =>
         transformCachedRemoteRelation(rel.getCachedRemoteRelation)
       case proto.Relation.RelTypeCase.COLLECT_METRICS =>
-        transformCollectMetrics(rel.getCollectMetrics)
+        transformCollectMetrics(rel.getCollectMetrics, rel.getCommon.getPlanId)
       case proto.Relation.RelTypeCase.PARSE => transformParse(rel.getParse)
       case proto.Relation.RelTypeCase.RELTYPE_NOT_SET =>
         throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.")
@@ -1048,12 +1048,12 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
       numPartitionsOpt)
   }

-  private def transformCollectMetrics(rel: proto.CollectMetrics): LogicalPlan = {
+  private def transformCollectMetrics(rel: proto.CollectMetrics, planId: Long): LogicalPlan = {
     val metrics = rel.getMetricsList.asScala.toSeq.map { expr =>
       Column(transformExpression(expr))
     }

-    CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput))
+    CollectMetrics(rel.getName, metrics.map(_.named), transformRelation(rel.getInput), planId)
   }

   private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = {

diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index d069081e1af..219545cf646 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -1192,6 +1192,7 @@ class CollectMetrics(LogicalPlan):
         assert self._child is not None

         plan = proto.Relation()
+        plan.common.plan_id = self._child._plan_id
         plan.collect_metrics.input.CopyFrom(self._child.plan(session))
         plan.collect_metrics.name = self._name
         plan.collect_metrics.metrics.extend([self.col_to_expr(x, session) for x in self._exprs])

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index cff29de858e..aac85e19721 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3893,9 +3893,9 @@ object
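The validation rule described in the commit message can be sketched abstractly: observation names must be unique unless they refer to the same DataFrame, compared by ID rather than by plan matching. This is a simplified model for illustration, not Spark's actual CheckAnalysis code:

```python
def validate_observations(observations):
    """observations: iterable of (name, dataframe_id) pairs.

    Two CollectMetrics nodes are considered semantically identical
    only when both the name and the originating DataFrame ID match;
    reusing a name with a different ID is an analysis error.
    """
    seen = {}
    for name, df_id in observations:
        if name in seen and seen[name] != df_id:
            raise ValueError(
                f"Observation name '{name}' is used by multiple DataFrames"
            )
        seen[name] = df_id
```

Comparing opaque IDs sidesteps the fragility of structural plan comparison: the check no longer breaks when an otherwise-identical plan is rewritten between the two occurrences.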
[spark] branch master updated: [SPARK-45244][TESTS] Correct spelling in VolcanoTestsSuite
This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ebfc5b55fc0 [SPARK-45244][TESTS] Correct spelling in VolcanoTestsSuite

ebfc5b55fc0 is described below

commit ebfc5b55fc096722547023d76a70796f39947959
Author: zwangsheng
AuthorDate: Fri Sep 22 10:12:14 2023 +0800

    [SPARK-45244][TESTS] Correct spelling in VolcanoTestsSuite

    ### What changes were proposed in this pull request?

    ### Why are the changes needed?

    Correct a typo in VolcanoTestsSuite, which named methods `checkAnnotaion`.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing UT

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43026 from zwangsheng/SPARK-45244.

    Authored-by: zwangsheng
    Signed-off-by: Xiduo You
---
 .../spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
index 06d6f7dc100..35da48f61b3 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala
@@ -123,7 +123,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     assert(pod.getSpec.getSchedulerName === "volcano")
   }

-  protected def checkAnnotaion(pod: Pod): Unit = {
+  protected def checkAnnotation(pod: Pod): Unit = {
     val appId = pod.getMetadata.getLabels.get("spark-app-selector")
     val annotations = pod.getMetadata.getAnnotations
     assert(annotations.get("scheduling.k8s.io/group-name") === s"$appId-podgroup")
@@ -218,7 +218,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     runSparkDriverSubmissionAndVerifyCompletion(
       driverPodChecker = (driverPod: Pod) => {
         checkScheduler(driverPod)
-        checkAnnotaion(driverPod)
+        checkAnnotation(driverPod)
         checkPodGroup(driverPod, queue)
       },
       customSparkConf = Option(conf),
@@ -228,12 +228,12 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
     runSparkPiAndVerifyCompletion(
       driverPodChecker = (driverPod: Pod) => {
         checkScheduler(driverPod)
-        checkAnnotaion(driverPod)
+        checkAnnotation(driverPod)
         checkPodGroup(driverPod, queue)
       },
       executorPodChecker = (executorPod: Pod) => {
         checkScheduler(executorPod)
-        checkAnnotaion(executorPod)
+        checkAnnotation(executorPod)
       },
       customSparkConf = Option(conf),
       customAppLocator = Option(appLoc)
@@ -314,13 +314,13 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
       driverPodChecker = (driverPod: Pod) => {
         doBasicDriverPodCheck(driverPod)
         checkScheduler(driverPod)
-        checkAnnotaion(driverPod)
+        checkAnnotation(driverPod)
         checkPodGroup(driverPod)
       },
       executorPodChecker = (executorPod: Pod) => {
         doBasicExecutorPodCheck(executorPod)
         checkScheduler(executorPod)
-        checkAnnotaion(executorPod)
+        checkAnnotation(executorPod)
       }
     )
   }
[GitHub] [spark-website] panbingkun commented on pull request #474: [SPARK-44820][DOCS] Switch languages consistently across docs for all code snippets
panbingkun commented on PR #474: URL: https://github.com/apache/spark-website/pull/474#issuecomment-1730705788 > Hi @panbingkun this is an important bug fix and we should merge it! Shall we re-open this? Actually, this feature has been fixed in `Spark` project, https://github.com/apache/spark/pull/42657 The current fixes are: master, branch-3.5 (https://github.com/apache/spark/pull/42657), and branch-3.4 (https://github.com/apache/spark/pull/42989) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45191][SQL] InMemoryTableScanExec simpleStringWithNodeId adds columnar info
This is an automated email from the ASF dual-hosted git repository. ulyssesyou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 47e8205628a [SPARK-45191][SQL] InMemoryTableScanExec simpleStringWithNodeId adds columnar info 47e8205628a is described below commit 47e8205628a0aed54ad638a53a5881efa2306455 Author: ulysses-you AuthorDate: Fri Sep 22 09:51:32 2023 +0800 [SPARK-45191][SQL] InMemoryTableScanExec simpleStringWithNodeId adds columnar info ### What changes were proposed in this pull request? InMemoryTableScanExec supports both row-based and columnar input and output, which is based on the cache serializer. It would be more user-friendly if we can provide the columnar info to show whether it is columnar in/out. ### Why are the changes needed? Add columnar info to the InMemoryTableScanExec explain output. ### Does this PR introduce _any_ user-facing change? no, if no columnar input or output. ### How was this patch tested? manually tested a columnar-supporting example. before: https://github.com/apache/spark/assets/12025282/289cba7e-51af-4b01-b591-bc9c8328801d after: https://github.com/apache/spark/assets/12025282/e1514a2c-bf47-47c3-b311-23e49c9db222 ### Was this patch authored or co-authored using generative AI tooling? no Closes #42967 from ulysses-you/cache. 
Authored-by: ulysses-you Signed-off-by: Xiduo You --- .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 8 ++-- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 9 + 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 45d006b58e8..27860f23d9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -217,6 +217,11 @@ case class CachedRDDBuilder( val cachedName = tableName.map(n => s"In-memory table $n") .getOrElse(StringUtils.abbreviate(cachedPlan.toString, 1024)) + val supportsColumnarInput: Boolean = { +cachedPlan.supportsColumnar && + serializer.supportsColumnarInput(cachedPlan.output) + } + def cachedColumnBuffers: RDD[CachedBatch] = { if (_cachedColumnBuffers == null) { synchronized { @@ -264,8 +269,7 @@ case class CachedRDDBuilder( } private def buildBuffers(): RDD[CachedBatch] = { -val cb = if (cachedPlan.supportsColumnar && -serializer.supportsColumnarInput(cachedPlan.output)) { +val cb = if (supportsColumnarInput) { serializer.convertColumnarBatchToCachedBatch( cachedPlan.executeColumnar(), cachedPlan.output, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 08244a4f84f..064a4636905 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -46,6 +46,15 @@ case class InMemoryTableScanExec( } } + override def simpleStringWithNodeId(): String = { +val columnarInfo = if 
(relation.cacheBuilder.supportsColumnarInput || supportsColumnar) { + s" (columnarIn=${relation.cacheBuilder.supportsColumnarInput}, columnarOut=$supportsColumnar)" +} else { + "" +} +super.simpleStringWithNodeId() + columnarInfo + } + override def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren override def doCanonicalize(): SparkPlan = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
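The explain-string change in the diff above reduces to a small formatting rule: append the columnar flags only when at least one of them is true, so plain row-based cached scans keep their old, shorter explain output. A minimal Java sketch of that rule (the class and method names below are invented for illustration and are not Spark APIs):

```java
public class NodeString {
    // Mirrors the new simpleStringWithNodeId behavior: the suffix is added
    // only when the scan is columnar on at least one side; otherwise the
    // base node string is returned unchanged.
    static String withColumnarInfo(String base, boolean columnarIn, boolean columnarOut) {
        if (!columnarIn && !columnarOut) {
            return base;
        }
        return base + " (columnarIn=" + columnarIn + ", columnarOut=" + columnarOut + ")";
    }

    public static void main(String[] args) {
        System.out.println(withColumnarInfo("InMemoryTableScan", true, false));
        System.out.println(withColumnarInfo("InMemoryTableScan", false, false));
    }
}
```

The early return is what makes the change invisible to users whose cached plans have no columnar input or output, matching the "no, if no columnar input or output" note in the commit message.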
[spark] branch master updated: [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 39d6fdabb48 [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue 39d6fdabb48 is described below commit 39d6fdabb48c700ab5d9fe33437341f5dbf3d1d7 Author: panbingkun AuthorDate: Fri Sep 22 09:24:48 2023 +0800 [SPARK-45163][SQL] Merge UNSUPPORTED_VIEW_OPERATION & UNSUPPORTED_TABLE_OPERATION & fix some issue ### What changes were proposed in this pull request? A. The PR aims to - Merge `UNSUPPORTED_VIEW_OPERATION.WITH_SUGGESTION` into `EXPECT_TABLE_NOT_VIEW.USE_ALTER_VIEW` (newly added) - Merge `UNSUPPORTED_VIEW_OPERATION.WITHOUT_SUGGESTION` into `EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE` - Merge `UNSUPPORTED_TABLE_OPERATION.WITH_SUGGESTION` into `EXPECT_VIEW_NOT_TABLE.USE_ALTER_TABLE` - Merge `UNSUPPORTED_TABLE_OPERATION.WITHOUT_SUGGESTION` into `EXPECT_VIEW_NOT_TABLE.NO_ALTERNATIVE` - Merge `UNSUPPORTED_TABLE_OPERATION.WITH_SUGGESTION` into `EXPECT_VIEW_NOT_TABLE.USE_ALTER_VIEW` - Add `EXPECT_PERMANENT_VIEW_NOT_TEMP` - Fix some naming issues based on the suggestions of the [PR](https://github.com/apache/spark/pull/42824) reviewers. B. The PR is also a follow-up of https://github.com/apache/spark/pull/42824. ### Why are the changes needed? - Better code readability. - Fix some error message prompts. - Fix a typo: "ALTER COLUMN ... FIRST | ALTER" -> "ALTER COLUMN ... FIRST | AFTER" ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42917 from panbingkun/SPARK-45085_FOLLOWUP. 
Authored-by: panbingkun Signed-off-by: Wenchen Fan --- R/pkg/tests/fulltests/test_sparkSQL.R | 2 +- .../src/main/resources/error/error-classes.json| 73 -- ...onditions-expect-table-not-view-error-class.md} | 10 +-- ...onditions-expect-view-not-table-error-class.md} | 8 +-- docs/sql-error-conditions.md | 38 ++- .../spark/sql/catalyst/analysis/Analyzer.scala | 10 +-- .../sql/catalyst/analysis/v2ResolutionPlans.scala | 2 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 27 .../spark/sql/errors/QueryCompilationErrors.scala | 53 +++- .../catalyst/analysis/ResolveSessionCatalog.scala | 2 +- .../apache/spark/sql/execution/command/views.scala | 2 +- .../analyzer-results/change-column.sql.out | 8 +-- .../sql-tests/results/change-column.sql.out| 8 +-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 2 +- .../apache/spark/sql/execution/SQLViewSuite.scala | 45 +++-- .../spark/sql/execution/SQLViewTestSuite.scala | 2 +- .../AlterTableAddPartitionParserSuite.scala| 6 +- .../AlterTableDropPartitionParserSuite.scala | 12 ++-- .../AlterTableRecoverPartitionsParserSuite.scala | 12 ++-- .../AlterTableRenamePartitionParserSuite.scala | 6 +- .../command/AlterTableSetLocationParserSuite.scala | 4 +- .../spark/sql/execution/command/DDLSuite.scala | 6 +- .../execution/command/TruncateTableSuiteBase.scala | 6 +- .../execution/command/v1/ShowPartitionsSuite.scala | 6 +- .../apache/spark/sql/internal/CatalogSuite.scala | 2 +- .../spark/sql/hive/execution/HiveDDLSuite.scala| 24 +++ 26 files changed, 196 insertions(+), 180 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 4d3c2349f16..f2bef7a0044 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -4199,7 +4199,7 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct # recoverPartitions does not work with temporary view expect_error(recoverPartitions("cars"), - 
"[UNSUPPORTED_VIEW_OPERATION.WITH_SUGGESTION]*`cars`*") + "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]*`cars`*") expect_error(refreshTable("cars"), NA) expect_error(refreshByPath("/"), NA) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 8942d3755e9..9bcbcbc1962 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -877,6 +877,45 @@ "Exceeds char/varchar type length limitation: ." ] }, + "EXPECT_PERMANENT_VIEW_NOT_TEMP" : { +"message" : [ + "'' expects a permanent view but is a temp view." +] + }, + "EXPECT_TABLE_NOT_VIEW" : { +"message" : [ + "'' expects a table but is a view." +], +"subClass" : { +
[spark] branch master updated: [SPARK-45261][CORE][FOLLOWUP] Avoid `transform` of conf value
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f4f3b28aaac [SPARK-45261][CORE][FOLLOWUP] Avoid `transform` of conf value f4f3b28aaac is described below commit f4f3b28aaacc66692717584962f27d18bc6e Author: Dongjoon Hyun AuthorDate: Thu Sep 21 17:58:04 2023 -0700 [SPARK-45261][CORE][FOLLOWUP] Avoid `transform` of conf value ### What changes were proposed in this pull request? This is a follow-up of #43038 to preserve the config value. ### Why are the changes needed? `spark.eventLog.compression.codec` allows fully-qualified class names which are case-sensitive. ### Does this PR introduce _any_ user-facing change? To preserve the existing behavior. ### How was this patch tested? Pass the CIs. Currently, `ReplayListenerSuite` is broken. ``` [info] ReplayListenerSuite: [info] - Simple replay (25 milliseconds) [info] - Replay compressed inprogress log file succeeding on partial read (35 milliseconds) [info] - Replay incompatible event log (19 milliseconds) [info] - End-to-end replay (11 seconds, 58 milliseconds) [info] - End-to-end replay with compression *** FAILED *** (29 milliseconds) [info] org.apache.spark.SparkIllegalArgumentException: [CODEC_SHORT_NAME_NOT_FOUND] Cannot find a short name for the codec org.apache.spark.io.lz4compressioncodec. ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43041 from dongjoon-hyun/SPARK-45261-FOLLOWUP. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/deploy/history/EventLogFileWriters.scala | 2 +- core/src/main/scala/org/apache/spark/internal/config/package.scala | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 39c8e483ebd..640cfc2afaa 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -465,7 +465,7 @@ class SparkContext(config: SparkConf) extends Logging { _eventLogCodec = { val compress = _conf.get(EVENT_LOG_COMPRESS) && - !_conf.get(EVENT_LOG_COMPRESSION_CODEC).equals("none") + !_conf.get(EVENT_LOG_COMPRESSION_CODEC).equalsIgnoreCase("none") if (compress && isEventLogEnabled) { Some(_conf.get(EVENT_LOG_COMPRESSION_CODEC)).map(CompressionCodec.getShortName) } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 418d9171842..144dadf29bc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -55,7 +55,7 @@ abstract class EventLogFileWriter( hadoopConf: Configuration) extends Logging { protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) && - !sparkConf.get(EVENT_LOG_COMPRESSION_CODEC).equals("none") + !sparkConf.get(EVENT_LOG_COMPRESSION_CODEC).equalsIgnoreCase("none") protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 3da61f6c81d..05b2624b403 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1907,7 +1907,6 @@ package object config { "the codec.") .version("3.0.0") .stringConf - .transform(_.toLowerCase(Locale.ROOT)) .createWithDefault("zstd") private[spark] val BUFFER_SIZE = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
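Taken together with the original SPARK-45261 fix, this follow-up lands on a simple contract: the `none` sentinel is matched case-insensitively, but the configured value itself is never lower-cased, so a case-sensitive fully-qualified codec class name survives untouched. A hedged Java sketch of that contract (the helper name `resolveEventLogCodec` is invented for illustration; the real logic lives in `SparkContext` and `EventLogFileWriters`):

```java
import java.util.Optional;

public class CodecResolution {
    // Returns the codec to use for event-log compression, or empty when
    // compression is disabled or the codec is the "none" sentinel.
    // equalsIgnoreCase matches "none"/"NONE"/"None" without mutating the
    // stored value, unlike a transform(_.toLowerCase) applied to the conf.
    static Optional<String> resolveEventLogCodec(boolean compressEnabled, String codecConf) {
        if (!compressEnabled || codecConf.equalsIgnoreCase("none")) {
            return Optional.empty();
        }
        return Optional.of(codecConf); // original casing preserved
    }
}
```

Comparing the sentinel instead of normalizing the whole value is what keeps `CompressionCodec.getShortName` from seeing a mangled name like `org.apache.spark.io.lz4compressioncodec`.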
[GitHub] [spark-website] srowen commented on pull request #479: Add Matomo analytics
srowen commented on PR #479: URL: https://github.com/apache/spark-website/pull/479#issuecomment-1730529025 So, analytics have started flowing; see https://analytics.apache.org/index.php?module=CoreHome=index=yesterday=day=40#?period=day=yesterday=Dashboard_Dashboard=1 for example -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] [spark-website] srowen closed pull request #479: Add Matomo analytics
srowen closed pull request #479: Add Matomo analytics URL: https://github.com/apache/spark-website/pull/479 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] [spark-website] srowen commented on pull request #479: Add Matomo analytics
srowen commented on PR #479: URL: https://github.com/apache/spark-website/pull/479#issuecomment-1730508994 No this is just going to affect the overall website now, not release-specific docs. If all is well I'd look at putting this back in the doc release too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45252][CORE] Escape the greater/less than symbols in the comments to make `sbt doc` execute successfully
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fdedec1de54 [SPARK-45252][CORE] Escape the greater/less than symbols in the comments to make `sbt doc` execute successfully fdedec1de54 is described below commit fdedec1de54fa7619e1d1b59eb6f15a49e727428 Author: yangjie01 AuthorDate: Thu Sep 21 16:06:37 2023 -0700 [SPARK-45252][CORE] Escape the greater/less than symbols in the comments to make `sbt doc` execute successfully ### What changes were proposed in this pull request? This PR escapes the greater-than and less-than symbols in the comments to make `sbt doc` execute successfully. ### Why are the changes needed? The `sbt doc` command should be able to execute successfully. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manual check: run ``` build/sbt clean doc -Phadoop-3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pvolcano ``` or ``` dev/change-scala-version.sh 2.13 build/sbt clean doc -Phadoop-3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pvolcano -Pscala-2.13 ``` Before, the `sbt doc` command execution failed like: ``` [info] Main Scala API documentation successful. 
[error] sbt.inc.Doc$JavadocGenerationFailed [error] at sbt.inc.Doc$.sbt$inc$Doc$$$anonfun$cachedJavadoc$1(Doc.scala:51) [error] at sbt.inc.Doc$$anonfun$cachedJavadoc$2.run(Doc.scala:41) [error] at sbt.inc.Doc$.sbt$inc$Doc$$$anonfun$prepare$1(Doc.scala:62) [error] at sbt.inc.Doc$$anonfun$prepare$5.run(Doc.scala:57) [error] at sbt.inc.Doc$.go$1(Doc.scala:73) [error] at sbt.inc.Doc$.$anonfun$cached$5(Doc.scala:82) [error] at sbt.inc.Doc$.$anonfun$cached$5$adapted(Doc.scala:81) [error] at sbt.util.Tracked$.$anonfun$inputChangedW$1(Tracked.scala:220) [error] at sbt.inc.Doc$.sbt$inc$Doc$$$anonfun$cached$1(Doc.scala:85) [error] at sbt.inc.Doc$$anonfun$cached$7.run(Doc.scala:68) [error] at sbt.Defaults$.$anonfun$docTaskSettings$4(Defaults.scala:2178) [error] at scala.Function1.$anonfun$compose$1(Function1.scala:49) [error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:63) [error] at sbt.std.Transform$$anon$4.work(Transform.scala:69) [error] at sbt.Execute.$anonfun$submit$2(Execute.scala:283) [error] at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24) [error] at sbt.Execute.work(Execute.scala:292) [error] at sbt.Execute.$anonfun$submit$1(Execute.scala:283) [error] at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265) [error] at sbt.CompletionService$$anon$2.call(CompletionService.scala:65) [error] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [error] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [error] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [error] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [error] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [error] at java.lang.Thread.run(Thread.java:750) [error] sbt.inc.Doc$JavadocGenerationFailed [error] at sbt.inc.Doc$.sbt$inc$Doc$$$anonfun$cachedJavadoc$1(Doc.scala:51) [error] at 
sbt.inc.Doc$$anonfun$cachedJavadoc$2.run(Doc.scala:41) [error] at sbt.inc.Doc$.sbt$inc$Doc$$$anonfun$prepare$1(Doc.scala:62) [error] at sbt.inc.Doc$$anonfun$prepare$5.run(Doc.scala:57) [error] at sbt.inc.Doc$.go$1(Doc.scala:73) [error] at sbt.inc.Doc$.$anonfun$cached$5(Doc.scala:82) [error] at sbt.inc.Doc$.$anonfun$cached$5$adapted(Doc.scala:81) [error] at sbt.util.Tracked$.$anonfun$inputChangedW$1(Tracked.scala:220) [error] at sbt.inc.Doc$.sbt$inc$Doc$$$anonfun$cached$1(Doc.scala:85) [error] at sbt.inc.Doc$$anonfun$cached$7.run(Doc.scala:68) [error] at sbt.Defaults$.$anonfun$docTaskSettings$4(Defaults.scala:2178) [error] at scala.Function1.$anonfun$compose$1(Function1.scala:49) [error] at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:63) [error] at sbt.std.Transform$$anon$4.work(Transform.scala:69) [error] at
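The failure underlying this commit is ordinary HTML entity escaping: javadoc parses doc-comment bodies as HTML, so bare `<` and `>` characters can abort generation. A minimal Java illustration of the escaping idea (the helper below is hypothetical; the actual patch edits the offending comments by hand):

```java
public class JavadocEscape {
    // Escape the characters that break HTML parsing in doc comments.
    // Ampersands are replaced first so the entities introduced for
    // < and > are not double-escaped afterwards.
    static String escapeForJavadoc(String comment) {
        return comment.replace("&", "&amp;")
                      .replace("<", "&lt;")
                      .replace(">", "&gt;");
    }
}
```

The replacement order matters: escaping `&` after `<` and `>` would corrupt the `&lt;` and `&gt;` entities that were just produced.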
[spark] branch master updated: [SPARK-45263][CORE][TESTS] Make `EventLoggingListenerSuite` independent from `spark.eventLog.compress` conf
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 39507b7f537 [SPARK-45263][CORE][TESTS] Make `EventLoggingListenerSuite` independent from `spark.eventLog.compress` conf 39507b7f537 is described below commit 39507b7f537dc06af0ebf49afbd53c1e36c11776 Author: Dongjoon Hyun AuthorDate: Thu Sep 21 15:31:35 2023 -0700 [SPARK-45263][CORE][TESTS] Make `EventLoggingListenerSuite` independent from `spark.eventLog.compress` conf ### What changes were proposed in this pull request? This is a test-only PR to make `EventLoggingListenerSuite` independent from `spark.eventLog.compress` conf's default value. ### Why are the changes needed? Currently, `EventLoggingListenerSuite` test code has an assumption that the default value of `spark.eventLog.compress` is `false`. We had better make the assumption explicit. https://github.com/apache/spark/blob/892fdc532696e703b353c4758320d69162fffe8c/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala#L178 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Since we only clarify the assumption, the test suite should pass like the following. ``` [info] EventLoggingListenerSuite: [info] - Basic event logging with compression (837 milliseconds) [info] - End-to-end event logging (2 seconds, 99 milliseconds) [info] - End-to-end event logging with compression (6 seconds, 966 milliseconds) [info] - Event logging with password redaction (8 milliseconds) [info] - Spark-33504 sensitive attributes redaction in properties (15 milliseconds) [info] - Executor metrics update (32 milliseconds) [info] - SPARK-31764: isBarrier should be logged in event log (262 milliseconds) [info] Run completed in 11 seconds, 242 milliseconds. 
[info] Total number of tests run: 7 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 7, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [success] Total time: 18 s, completed Sep 21, 2023, 2:34:50 PM ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43040 from dongjoon-hyun/SPARK-45263. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 31db0328f81..edc54e60654 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWriter} import org.apache.spark.deploy.history.EventLogTestHelper._ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED} +import org.apache.spark.internal.config.{EVENT_LOG_COMPRESS, EVENT_LOG_DIR, EVENT_LOG_ENABLED} import org.apache.spark.io._ import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem} import org.apache.spark.resource.ResourceProfile @@ -163,6 +163,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit test("SPARK-31764: isBarrier should be logged in event log") { val conf = new SparkConf() conf.set(EVENT_LOG_ENABLED, true) +conf.set(EVENT_LOG_COMPRESS, false) conf.set(EVENT_LOG_DIR, testDirPath.toString) val sc = new SparkContext("local", "test-SPARK-31764", conf) val appId = sc.applicationId - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional 
commands, e-mail: commits-h...@spark.apache.org
[GitHub] [spark-website] allisonwang-db commented on pull request #474: [SPARK-44820][DOCS] Switch languages consistently across docs for all code snippets
allisonwang-db commented on PR #474: URL: https://github.com/apache/spark-website/pull/474#issuecomment-1730342331 Hi @panbingkun this is an important bug fix and we should merge it! Shall we re-open this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45261][CORE] Fix `EventLogFileWriters` to handle `none` as a codec
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2f3cb36cb99 [SPARK-45261][CORE] Fix `EventLogFileWriters` to handle `none` as a codec 2f3cb36cb99 is described below commit 2f3cb36cb99a4b8d1ddec74696c4ed036c5df5b2 Author: Dongjoon Hyun AuthorDate: Thu Sep 21 14:44:11 2023 -0700 [SPARK-45261][CORE] Fix `EventLogFileWriters` to handle `none` as a codec ### What changes were proposed in this pull request? This PR aims to support `none` as a codec instead of throwing an exception. Currently, our unit test is supposed to test it, but actually it's not tested at all. https://github.com/apache/spark/blob/892fdc532696e703b353c4758320d69162fffe8c/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala#L120-L124 ``` $ build/sbt "core/testOnly *EventLogFileReaderSuite*" ... [info] - get information, list event log files, zip log files - with codec None (33 milliseconds) [info] - get information, list event log files, zip log files - with codec Some(lz4) (125 milliseconds) ... ``` ### Why are the changes needed? ``` $ bin/spark-shell \ -c spark.eventLog.enabled=true \ -c spark.eventLog.compress=true \ -c spark.eventLog.compression.codec=none ... 23/09/21 13:26:45 ERROR SparkContext: Error initializing SparkContext. org.apache.spark.SparkIllegalArgumentException: [CODEC_SHORT_NAME_NOT_FOUND] Cannot find a short name for the codec none. ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with the revised tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43038 from dongjoon-hyun/SPARK-45261. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/SparkContext.scala| 3 ++- .../org/apache/spark/deploy/history/EventLogFileWriters.scala | 3 ++- core/src/main/scala/org/apache/spark/internal/config/package.scala | 1 + .../scala/org/apache/spark/deploy/history/EventLogTestHelper.scala | 7 --- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index b90601a5bbb..39c8e483ebd 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -464,7 +464,8 @@ class SparkContext(config: SparkConf) extends Logging { } _eventLogCodec = { - val compress = _conf.get(EVENT_LOG_COMPRESS) + val compress = _conf.get(EVENT_LOG_COMPRESS) && + !_conf.get(EVENT_LOG_COMPRESSION_CODEC).equals("none") if (compress && isEventLogEnabled) { Some(_conf.get(EVENT_LOG_COMPRESSION_CODEC)).map(CompressionCodec.getShortName) } else { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala index 7d44cbd9f64..418d9171842 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala @@ -54,7 +54,8 @@ abstract class EventLogFileWriter( sparkConf: SparkConf, hadoopConf: Configuration) extends Logging { - protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) + protected val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS) && + !sparkConf.get(EVENT_LOG_COMPRESSION_CODEC).equals("none") protected val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE) protected val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt protected val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf) diff --git 
a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 05b2624b403..3da61f6c81d 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1907,6 +1907,7 @@ package object config { "the codec.") .version("3.0.0") .stringConf + .transform(_.toLowerCase(Locale.ROOT)) .createWithDefault("zstd") private[spark] val BUFFER_SIZE = diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala index a68086256d1..ea8da010859 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala +++
[spark] branch master updated: [SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably shuffle
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 892fdc53269 [SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably shuffle 892fdc53269 is described below commit 892fdc532696e703b353c4758320d69162fffe8c Author: Jiaan Geng AuthorDate: Thu Sep 21 19:52:40 2023 +0800 [SPARK-45108][SQL] Improve the InjectRuntimeFilter for check probably shuffle ### What changes were proposed in this pull request? `InjectRuntimeFilter` needs to check whether there is probably a shuffle. But the current code may lead to a duplicate call of `isProbablyShuffleJoin` if we need the right side of the `Join` node as the application side. ### Why are the changes needed? To avoid the duplicate call of `isProbablyShuffleJoin`. ### Does this PR introduce _any_ user-facing change? 'No'. Just update the inner implementation. ### How was this patch tested? Existing test cases. ### Was this patch authored or co-authored using generative AI tooling? 'No'. Closes #42861 from beliefer/SPARK-45108. 
Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan --- .../sql/catalyst/optimizer/InjectRuntimeFilter.scala | 20 +++- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 44c55860375..13554908379 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -229,19 +229,15 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows * (ie the expression references originate from a single leaf node) * - The filter creation side has a selective predicate - * - The current join is a shuffle join or a broadcast join that has a shuffle below it * - The max filterApplicationSide scan size is greater than a configurable threshold */ private def extractBeneficialFilterCreatePlan( filterApplicationSide: LogicalPlan, filterCreationSide: LogicalPlan, filterApplicationSideExp: Expression, - filterCreationSideExp: Expression, - hint: JoinHint): Option[LogicalPlan] = { + filterCreationSideExp: Expression): Option[LogicalPlan] = { if (findExpressionAndTrackLineageDown( filterApplicationSideExp, filterApplicationSide).isDefined && - (isProbablyShuffleJoin(filterApplicationSide, filterCreationSide, hint) || -probablyHasShuffle(filterApplicationSide)) && satisfyByteSizeRequirement(filterApplicationSide)) { extractSelectiveFilterOverScan(filterCreationSide, filterCreationSideExp) } else { @@ -326,15 +322,21 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J isSimpleExpression(l) && isSimpleExpression(r)) { val oldLeft = newLeft val oldRight = newRight -if (canPruneLeft(joinType)) { - 
extractBeneficialFilterCreatePlan(left, right, l, r, hint).foreach { +// Check if the current join is a shuffle join or a broadcast join that +// has a shuffle below it +val hasShuffle = isProbablyShuffleJoin(left, right, hint) +if (canPruneLeft(joinType) && (hasShuffle || probablyHasShuffle(left))) { + extractBeneficialFilterCreatePlan(left, right, l, r).foreach { filterCreationSidePlan => newLeft = injectFilter(l, newLeft, r, filterCreationSidePlan) } } // Did we actually inject on the left? If not, try on the right -if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType)) { - extractBeneficialFilterCreatePlan(right, left, r, l, hint).foreach { +// Check if the current join is a shuffle join or a broadcast join that +// has a shuffle below it +if (newLeft.fastEquals(oldLeft) && canPruneRight(joinType) && + (hasShuffle || probablyHasShuffle(right))) { + extractBeneficialFilterCreatePlan(right, left, r, l).foreach { filterCreationSidePlan => newRight = injectFilter(r, newRight, l, filterCreationSidePlan) }
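The refactoring in this diff hoists the shared shuffle check out of the two pruning branches so it is evaluated once. A minimal sketch of the resulting control flow, in Python rather than the Scala of the actual `InjectRuntimeFilter` rule, with all names as illustrative stand-ins:

```python
# Simplified sketch of the SPARK-45108 change: the shuffle-join check shared by
# both pruning branches is evaluated once up front, instead of being re-run
# inside the per-side helper. Names are hypothetical, not Spark's real API.
def try_inject_filters(left, right, is_probably_shuffle_join, probably_has_shuffle,
                       can_prune_left=True, can_prune_right=True):
    """Return the sides on which a runtime filter would be injected."""
    injected = []
    # Computed exactly once, even when both sides end up being considered.
    has_shuffle = is_probably_shuffle_join(left, right)
    if can_prune_left and (has_shuffle or probably_has_shuffle(left)):
        injected.append("left")
    # Mirrors the rule's "Did we actually inject on the left? If not, try on the right".
    if not injected and can_prune_right and (has_shuffle or probably_has_shuffle(right)):
        injected.append("right")
    return injected
```

Before the change, the equivalent of `is_probably_shuffle_join` was invoked inside `extractBeneficialFilterCreatePlan` and therefore could run twice when the right side was tried after the left.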
[spark] branch master updated: [SPARK-43254][SQL] Assign a name to the error _LEGACY_ERROR_TEMP_2018
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8b967e191b7 [SPARK-43254][SQL] Assign a name to the error _LEGACY_ERROR_TEMP_2018 8b967e191b7 is described below commit 8b967e191b755d7f2830c15d382c83ce7aeb69c1 Author: dengziming AuthorDate: Thu Sep 21 10:22:37 2023 +0300 [SPARK-43254][SQL] Assign a name to the error _LEGACY_ERROR_TEMP_2018 ### What changes were proposed in this pull request? Assign the name `CLASS_UNSUPPORTED_BY_MAP_OBJECTS` to the legacy error class `_LEGACY_ERROR_TEMP_2018`. ### Why are the changes needed? To assign proper name as a part of activity in SPARK-37935 ### Does this PR introduce _any_ user-facing change? Yes, the error message will include the error class name ### How was this patch tested? Add a unit test to produce the error from user code. ### Was this patch authored or co-authored using generative AI tooling? No Closes #42939 from dengziming/SPARK-43254. Authored-by: dengziming Signed-off-by: Max Gekk --- .../src/main/resources/error/error-classes.json| 10 +++--- docs/sql-error-conditions.md | 6 .../sql/catalyst/encoders/ExpressionEncoder.scala | 2 +- .../spark/sql/errors/QueryExecutionErrors.scala| 2 +- .../expressions/ObjectExpressionsSuite.scala | 11 +++--- .../scala/org/apache/spark/sql/DatasetSuite.scala | 40 -- 6 files changed, 57 insertions(+), 14 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index d92ccfce5c5..8942d3755e9 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -344,6 +344,11 @@ ], "sqlState" : "22003" }, + "CLASS_UNSUPPORTED_BY_MAP_OBJECTS" : { +"message" : [ + "`MapObjects` does not support the class as resulting collection." 
+] + }, "CODEC_NOT_AVAILABLE" : { "message" : [ "The codec is not available. Consider to set the config to ." @@ -4944,11 +4949,6 @@ "not resolved." ] }, - "_LEGACY_ERROR_TEMP_2018" : { -"message" : [ - "class `` is not supported by `MapObjects` as resulting collection." -] - }, "_LEGACY_ERROR_TEMP_2020" : { "message" : [ "Couldn't find a valid constructor on ." diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 1df00f72bc9..f6f94efc2b0 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -297,6 +297,12 @@ The value `` of the type `` cannot be cast to `` Fail to assign a value of `` type to the `` type column or variable `` due to an overflow. Use `try_cast` on the input value to tolerate overflow and return NULL instead. +### CLASS_UNSUPPORTED_BY_MAP_OBJECTS + +SQLSTATE: none assigned + +`MapObjects` does not support the class `` as resulting collection. + ### CODEC_NOT_AVAILABLE SQLSTATE: none assigned diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index ff72b5a0d96..74d7a5e7a67 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -170,7 +170,7 @@ object ExpressionEncoder { * Function that deserializes an [[InternalRow]] into an object of type `T`. This class is not * thread-safe. 
*/ - class Deserializer[T](private val expressions: Seq[Expression]) + class Deserializer[T](val expressions: Seq[Expression]) extends (InternalRow => T) with Serializable { @transient private[this] var constructProjection: Projection = _ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index e14fef1fad7..84472490128 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -422,7 +422,7 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE def classUnsupportedByMapObjectsError(cls: Class[_]): SparkRuntimeException = { new SparkRuntimeException( - errorClass = "_LEGACY_ERROR_TEMP_2018", + errorClass = "CLASS_UNSUPPORTED_BY_MAP_OBJECTS", messageParameters = Map("cls" -> cls.getName)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
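For context on the rename above: a named error class such as `CLASS_UNSUPPORTED_BY_MAP_OBJECTS` resolves to a user-facing message by keying into an `error-classes.json`-style table and substituting `<placeholder>` parameters. A rough self-contained sketch of that lookup, not Spark's actual JVM-side implementation:

```python
# Illustrative sketch of error-class message resolution: the class name keys
# into an error-classes.json-like table and <placeholders> are filled from
# messageParameters. This mimics, but is not, Spark's real machinery.
import re

ERROR_CLASSES = {
    "CLASS_UNSUPPORTED_BY_MAP_OBJECTS": {
        "message": ["`MapObjects` does not support the class <cls> as resulting collection."],
    },
}

def error_message(error_class, message_parameters):
    # Message templates are stored as a list of lines; join them first.
    template = " ".join(ERROR_CLASSES[error_class]["message"])
    # Replace each <name> placeholder with its parameter value.
    return re.sub(r"<(\w+)>", lambda m: message_parameters[m.group(1)], template)
```

With the legacy `_LEGACY_ERROR_TEMP_2018` entry removed, the same `messageParameters` map (`"cls" -> cls.getName`) now feeds the named class instead.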
[spark] branch master updated: [SPARK-45219][PYTHON][DOCS] Refine docstring of withColumn(s)Renamed
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new cf04bd93b75 [SPARK-45219][PYTHON][DOCS] Refine docstring of withColumn(s)Renamed cf04bd93b75 is described below commit cf04bd93b7522975c44c54f9519bb77f7833c566 Author: allisonwang-db AuthorDate: Thu Sep 21 15:50:36 2023 +0900 [SPARK-45219][PYTHON][DOCS] Refine docstring of withColumn(s)Renamed ### What changes were proposed in this pull request? This PR refines the docstring of `DataFrame.withColumnRenamed` and `DataFrame.withColumnsRenamed`. ### Why are the changes needed? To improve PySpark documentations. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? doctest ### Was this patch authored or co-authored using generative AI tooling? No Closes #43019 from allisonwang-db/spark-45219-refine-withcolumnrenamed. Lead-authored-by: allisonwang-db Co-authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/dataframe.py | 92 + 1 file changed, 74 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 6b1a6df1618..bcdae5e40b9 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -5786,7 +5786,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): return DataFrame(self._jdf.withColumn(colName, col._jc), self.sparkSession) def withColumnRenamed(self, existing: str, new: str) -> "DataFrame": -"""Returns a new :class:`DataFrame` by renaming an existing column. +""" +Returns a new :class:`DataFrame` by renaming an existing column. This is a no-op if the schema doesn't contain the given column name. .. versionadded:: 1.3.0 @@ -5797,25 +5798,52 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Parameters -- existing : str -string, name of the existing column to rename. 
+The name of the existing column to be renamed. new : str -string, new name of the column. +The new name to be assigned to the column. Returns --- :class:`DataFrame` -DataFrame with renamed column. +A new DataFrame with renamed column. + +See Also + +:meth:`withColumnsRenamed` Examples >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) ->>> df.withColumnRenamed('age', 'age2').show() + +Example 1: Rename a single column + +>>> df.withColumnRenamed("age", "age2").show() ++-+ |age2| name| ++-+ | 2|Alice| | 5| Bob| ++-+ + +Example 2: Rename a column that does not exist (no-op) + +>>> df.withColumnRenamed("non_existing", "new_name").show() ++---+-+ +|age| name| ++---+-+ +| 2|Alice| +| 5| Bob| ++---+-+ + +Example 3: Rename multiple columns + +>>> df.withColumnRenamed("age", "age2").withColumnRenamed("name", "name2").show() +++-+ +|age2|name2| +++-+ +| 2|Alice| +| 5| Bob| +++-+ """ return DataFrame(self._jdf.withColumnRenamed(existing, new), self.sparkSession) @@ -5830,7 +5858,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Parameters -- colsMap : dict -a dict of existing column names and corresponding desired column names. +A dict of existing column names and corresponding desired column names. Currently, only a single map is supported. Returns @@ -5842,21 +5870,49 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): :meth:`withColumnRenamed` -Notes -- -Support Spark Connect - Examples >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], schema=["age", "name"]) ->>> df = df.withColumns({'age2': df.age + 2, 'age3': df.age + 3}) ->>> df.withColumnsRenamed({'age2': 'age4', 'age3': 'age5'}).show() -+---+-+++ -|age| name|age4|age5| -+---+-+++ -| 2|Alice| 4| 5| -| 5| Bob| 7| 8| -+---+-+++ + +Example 1: Rename a single column + +>>> df.withColumnsRenamed({"age": "age2"}).show() +++-+ +|age2| name| +++-+ +
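The no-op behaviour the refined docstring documents (renaming a column that does not exist leaves the DataFrame unchanged) can be mimicked in plain Python, no `pyspark` required, with a dict lookup that falls back to the original name. This is a hypothetical analogue of the semantics, not PySpark's implementation:

```python
# Plain-Python analogue of withColumnsRenamed semantics: names found in the
# rename map are replaced, unknown names are silently kept (the no-op case).
def with_columns_renamed(columns, cols_map):
    """Return a new column-name list with entries of cols_map renamed."""
    return [cols_map.get(name, name) for name in columns]
```

As in the docstring's Example 2, passing a mapping whose key is absent from `columns` returns the names unchanged.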
[spark] branch master updated: [SPARK-45235][CONNECT][PYTHON] Support map and array parameters by `sql()`
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a2bab5efc5b [SPARK-45235][CONNECT][PYTHON] Support map and array parameters by `sql()`
a2bab5efc5b is described below

commit a2bab5efc5b5f0e841e9b34ccbfd2cb99af5923e
Author: Max Gekk
AuthorDate: Thu Sep 21 09:05:30 2023 +0300

    [SPARK-45235][CONNECT][PYTHON] Support map and array parameters by `sql()`

    ### What changes were proposed in this pull request?
    In the PR, I propose to change the Python connect client to support `Column` as a parameter of `sql()`.

    ### Why are the changes needed?
    To achieve feature parity with regular PySpark, which supports maps and arrays as parameters of `sql()`, see https://github.com/apache/spark/pull/42996.

    ### Does this PR introduce _any_ user-facing change?
    No. It fixes a bug.

    ### How was this patch tested?
    By running the modified tests:
    ```
    $ python/run-tests --parallelism=1 --testnames 'pyspark.sql.tests.connect.test_connect_basic SparkConnectBasicTests.test_sql_with_named_args'
    $ python/run-tests --parallelism=1 --testnames 'pyspark.sql.tests.connect.test_connect_basic SparkConnectBasicTests.test_sql_with_pos_args'
    ```

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #43014 from MaxGekk/map-sql-parameterized-python-connect-2.
Authored-by: Max Gekk Signed-off-by: Max Gekk --- python/pyspark/sql/connect/plan.py | 22 ++ python/pyspark/sql/connect/session.py | 2 +- .../sql/tests/connect/test_connect_basic.py| 12 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py index 3e8db2aae09..d069081e1af 100644 --- a/python/pyspark/sql/connect/plan.py +++ b/python/pyspark/sql/connect/plan.py @@ -1049,6 +1049,12 @@ class SQL(LogicalPlan): self._query = query self._args = args +def _to_expr(self, session: "SparkConnectClient", v: Any) -> proto.Expression: +if isinstance(v, Column): +return v.to_plan(session) +else: +return LiteralExpression._from_value(v).to_plan(session) + def plan(self, session: "SparkConnectClient") -> proto.Relation: plan = self._create_proto_relation() plan.sql.query = self._query @@ -1056,14 +1062,10 @@ class SQL(LogicalPlan): if self._args is not None and len(self._args) > 0: if isinstance(self._args, Dict): for k, v in self._args.items(): -plan.sql.args[k].CopyFrom( - LiteralExpression._from_value(v).to_plan(session).literal -) + plan.sql.named_arguments[k].CopyFrom(self._to_expr(session, v)) else: for v in self._args: -plan.sql.pos_args.append( - LiteralExpression._from_value(v).to_plan(session).literal -) +plan.sql.pos_arguments.append(self._to_expr(session, v)) return plan @@ -1073,14 +1075,10 @@ class SQL(LogicalPlan): if self._args is not None and len(self._args) > 0: if isinstance(self._args, Dict): for k, v in self._args.items(): -cmd.sql_command.args[k].CopyFrom( - LiteralExpression._from_value(v).to_plan(session).literal -) + cmd.sql_command.named_arguments[k].CopyFrom(self._to_expr(session, v)) else: for v in self._args: -cmd.sql_command.pos_args.append( - LiteralExpression._from_value(v).to_plan(session).literal -) + cmd.sql_command.pos_arguments.append(self._to_expr(session, v)) return cmd diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 
7582fe86ff2..e5d1d95a699 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -557,7 +557,7 @@ class SparkSession: if "sql_command_result" in properties: return DataFrame.withPlan(CachedRelation(properties["sql_command_result"]), self) else: -return DataFrame.withPlan(SQL(sqlQuery, args), self) +return DataFrame.withPlan(cmd, self) sql.__doc__ = PySparkSession.sql.__doc__ diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 2b979570618..c5a127136d6 100644 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++
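The `_to_expr` dispatch this patch introduces — a parameter that is already a `Column` becomes an expression plan directly, while anything else is first wrapped as a literal — can be sketched with stand-in classes. These are simplified placeholders, not the real `pyspark.sql.connect` types:

```python
# Hedged sketch of the SPARK-45235 dispatch: Column parameters pass through as
# expression plans, other values are wrapped as literals. Column and
# LiteralExpression here are illustrative stand-ins, not pyspark classes.
class Column:
    def __init__(self, expr):
        self.expr = expr

    def to_plan(self):
        # The real method serializes to a protobuf Expression via the session.
        return ("expr", self.expr)

class LiteralExpression:
    @staticmethod
    def from_value(value):
        # The real helper infers a literal protobuf message from the value.
        return ("literal", value)

def to_expr(v):
    """Mirror SQL._to_expr: Columns become plans, everything else a literal."""
    if isinstance(v, Column):
        return v.to_plan()
    return LiteralExpression.from_value(v)
```

Because `sql()` parameters now go through this single dispatch into `named_arguments`/`pos_arguments`, map- and array-valued `Column`s (e.g. built with `sf.create_map` or `sf.array`) bind the same way plain literals do.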