[spark] branch master updated (94db71a -> 7f17127)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

  from 94db71a  [SPARK-37010][PYTHON] Remove unnecessary "noqa: F401" comments in pandas-on-Spark
   add 7f17127  [SPARK-36337][PYTHON][CORE] Switch pyrolite v4.30 to pickle v1.2 to fix decimal NaN issue

No new revisions were added by this update.

Summary of changes:
 LICENSE-binary                                                 |  2 +-
 core/pom.xml                                                   | 10 ++
 .../spark/api/python/WriteInputFormatTestDataGenerator.scala   |  2 +-
 dev/deps/spark-deps-hadoop-2.7-hive-2.3                        |  2 +-
 dev/deps/spark-deps-hadoop-3.2-hive-2.3                        |  2 +-
 docs/rdd-programming-guide.md                                  |  4 ++--
 python/pyspark/context.py                                      |  2 +-
 python/pyspark/ml/common.py                                    |  2 +-
 python/pyspark/mllib/classification.py                         |  2 +-
 python/pyspark/mllib/common.py                                 |  2 +-
 python/pyspark/rdd.py                                          |  4 ++--
 python/pyspark/sql/tests/test_dataframe.py                     |  8
 .../org/apache/spark/sql/execution/python/EvaluatePython.scala |  2 +-
 13 files changed, 23 insertions(+), 21 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
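A minimal PySpark sketch of the decimal-NaN round trip this commit addresses; the exact pre-fix failure mode is inferred from the issue title, not quoted from the patch:

```
from decimal import Decimal

from pyspark.sql import SparkSession
from pyspark.sql.types import DecimalType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("d", DecimalType(10, 2), True)])

# Assumption (from the JIRA title): before the pickler switch, shipping
# Decimal("NaN") between Python and the JVM could fail during
# (de)serialization; with the fix the value round-trips (surfacing as a
# null decimal) instead of raising.
df = spark.createDataFrame([(Decimal("NaN"),)], schema=schema)
print(df.collect())
```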
[spark] branch master updated (e7faa85 -> 94db71a)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

  from e7faa85  [SPARK-36942][PYTHON] Inline type hints for python/pyspark/sql/readwriter.py
   add 94db71a  [SPARK-37010][PYTHON] Remove unnecessary "noqa: F401" comments in pandas-on-Spark

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/_typing.py           | 10 +-
 python/pyspark/pandas/accessors.py         |  4 ++--
 python/pyspark/pandas/base.py              |  8
 python/pyspark/pandas/categorical.py       |  2 +-
 python/pyspark/pandas/config.py            |  2 +-
 python/pyspark/pandas/datetimes.py         |  4 ++--
 python/pyspark/pandas/extensions.py        |  6 +++---
 python/pyspark/pandas/frame.py             | 13 ++---
 python/pyspark/pandas/generic.py           | 12 ++--
 python/pyspark/pandas/groupby.py           |  7 ++-
 python/pyspark/pandas/indexes/multi.py     |  2 +-
 python/pyspark/pandas/indexing.py          |  6 +++---
 python/pyspark/pandas/internal.py          |  3 +--
 python/pyspark/pandas/ml.py                |  2 +-
 python/pyspark/pandas/mlflow.py            |  4 ++--
 python/pyspark/pandas/namespace.py         |  5 ++---
 python/pyspark/pandas/plot/plotly.py       |  2 +-
 python/pyspark/pandas/series.py            |  6 +++---
 python/pyspark/pandas/spark/accessors.py   |  8
 python/pyspark/pandas/sql_processor.py     |  4 ++--
 python/pyspark/pandas/strings.py           |  2 +-
 python/pyspark/pandas/typedef/typehints.py |  3 +--
 python/pyspark/pandas/utils.py             | 12 +---
 python/pyspark/pandas/window.py            |  2 +-
 24 files changed, 60 insertions(+), 69 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (c4e975e -> e7faa85)
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

  from c4e975e  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
   add e7faa85  [SPARK-36942][PYTHON] Inline type hints for python/pyspark/sql/readwriter.py

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/readwriter.py  | 474 ++
 python/pyspark/sql/readwriter.pyi | 257 -
 2 files changed, 373 insertions(+), 358 deletions(-)
 delete mode 100644 python/pyspark/sql/readwriter.pyi

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated: [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1709265  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
1709265 is described below

commit 1709265af1589ffa9e44d050bfa913aa0fd27dea
Author: Josh Rosen
AuthorDate: Thu Oct 14 14:34:24 2021 -0700

    [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler

    ### What changes were proposed in this pull request?

    This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner. With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.

    ### Background

    The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD, and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.

    The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it is _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.sc [...]

    However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior [...]

    ### Correctness: proving that we make no excess `.partitions` calls

    This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered. I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions`, there is existing code which would have called `.partitions` at some point during a successful job execution:

    - Assume that this is the first time we are computing every RDD in the DAG.
    - Every RDD appears in some stage.
    - [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
    - [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
    - [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
    - Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
    - Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).

    ### Ordering of `.partitions` calls

    I don't think the order in which the `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are already lots of out-of-order `.partitions` calls occurring elsewhere in the codebase.

    ### Handling of exceptions in `.partitions`

    I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job. It's sometimes important to preserve exception wrapping behavior, but I [...]
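A toy, self-contained sketch of the eager pre-computation described above; the real implementation is Scala code inside the `DAGScheduler`, so `ToyRDD` and the function name below are illustrative only:

```
# Toy sketch only: the actual change lives in Scala's DAGScheduler; these
# classes stand in for RDDs to show the idea.
class ToyRDD:
    def __init__(self, name, deps=()):
        self.name = name
        self.deps = deps            # parent RDDs in the DAG
        self._partitions = None     # lazily computed, cached once

    @property
    def partitions(self):
        if self._partitions is None:
            # In real Spark this is getPartitions(), which may be slow
            # (e.g. HadoopRDD performs file listing here).
            self._partitions = [f"{self.name}-part-{i}" for i in range(2)]
        return self._partitions


def eagerly_compute_partitions(root):
    """Force .partitions on every RDD in the DAG before the job is handed
    to the single-threaded scheduler event loop."""
    visited, stack = set(), [root]
    while stack:
        rdd = stack.pop()
        if id(rdd) in visited:
            continue
        visited.add(id(rdd))
        _ = rdd.partitions          # trigger the potentially expensive call
        stack.extend(rdd.deps)


a = ToyRDD("a")
b = ToyRDD("b", deps=(a,))
eagerly_compute_partitions(ToyRDD("shuffled", deps=(b,)))
```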
[spark] branch branch-3.1 updated: [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c43f355  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
c43f355 is described below

commit c43f35579bc2f95cbd3334a19ed334d8e706082d
Author: Josh Rosen
AuthorDate: Thu Oct 14 14:34:24 2021 -0700

    [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler

    ### What changes were proposed in this pull request?

    This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner. With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.

    ### Background

    The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD, and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.

    The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it is _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.sc [...]

    However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior [...]

    ### Correctness: proving that we make no excess `.partitions` calls

    This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered. I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions`, there is existing code which would have called `.partitions` at some point during a successful job execution:

    - Assume that this is the first time we are computing every RDD in the DAG.
    - Every RDD appears in some stage.
    - [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
    - [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
    - [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
    - Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
    - Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).

    ### Ordering of `.partitions` calls

    I don't think the order in which the `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are already lots of out-of-order `.partitions` calls occurring elsewhere in the codebase.

    ### Handling of exceptions in `.partitions`

    I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job. It's sometimes important to preserve exception wrapping behavior, but I [...]
[spark] branch branch-3.2 updated: [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 01ee46e  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
01ee46e is described below

commit 01ee46e03a7f5c6f8656690fae96f39a897b9799
Author: Josh Rosen
AuthorDate: Thu Oct 14 14:34:24 2021 -0700

    [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler

    ### What changes were proposed in this pull request?

    This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner. With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.

    ### Background

    The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD, and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.

    The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it is _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.sc [...]

    However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior [...]

    ### Correctness: proving that we make no excess `.partitions` calls

    This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered. I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions`, there is existing code which would have called `.partitions` at some point during a successful job execution:

    - Assume that this is the first time we are computing every RDD in the DAG.
    - Every RDD appears in some stage.
    - [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
    - [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
    - [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
    - Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
    - Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).

    ### Ordering of `.partitions` calls

    I don't think the order in which the `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are already lots of out-of-order `.partitions` calls occurring elsewhere in the codebase.

    ### Handling of exceptions in `.partitions`

    I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job. It's sometimes important to preserve exception wrapping behavior, but I [...]
[spark] branch master updated: [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c4e975e  [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler
c4e975e is described below

commit c4e975e175c01f67ece7ae492a79554ad1b44106
Author: Josh Rosen
AuthorDate: Thu Oct 14 14:34:24 2021 -0700

    [SPARK-23626][CORE] Eagerly compute RDD.partitions on entire DAG when submitting job to DAGScheduler

    ### What changes were proposed in this pull request?

    This PR fixes a longstanding issue where the `DAGScheduler`'s single-threaded event processing loop could become blocked by slow `RDD.getPartitions()` calls, preventing other events (like task completions and concurrent job submissions) from being processed in a timely manner. With this patch's change, Spark will now call `.partitions` on every RDD in the DAG before submitting a job to the scheduler, ensuring that the expensive `getPartitions()` calls occur outside of the scheduler event loop.

    ### Background

    The `RDD.partitions` method lazily computes an RDD's partitions by calling `RDD.getPartitions()`. The `getPartitions()` method is invoked only once per RDD, and its result is cached in the `RDD.partitions_` private field. Sometimes the `getPartitions()` call can be expensive: for example, `HadoopRDD.getPartitions()` performs file listing operations.

    The `.partitions` method is invoked at many different places in Spark's code, including many existing call sites that are outside of the scheduler event loop. As a result, it is _often_ the case that an RDD's partitions will have been computed before the RDD is submitted to the DAGScheduler. For example, [`submitJob` calls `rdd.partitions.length`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.sc [...]

    However, there are still some cases where `partitions` gets evaluated for the first time inside of the `DAGScheduler` internals. For example, [`ShuffledRDD.getPartitions`](https://github.com/apache/spark/blob/3ba57f5edc5594ee676249cd309b8f0d8248462e/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala#L92-L94) doesn't call `.partitions` on the RDD being shuffled, so a plan with a ShuffledRDD at the root won't necessarily result in `.partitions` having been called on all RDDs prior [...]

    ### Correctness: proving that we make no excess `.partitions` calls

    This PR adds code to traverse the DAG prior to job submission and call `.partitions` on every RDD encountered. I'd like to argue that this results in no _excess_ `.partitions` calls: in every case where the new code calls `.partitions`, there is existing code which would have called `.partitions` at some point during a successful job execution:

    - Assume that this is the first time we are computing every RDD in the DAG.
    - Every RDD appears in some stage.
    - [`submitStage` will call `submitMissingTasks`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1438) on every stage root RDD.
    - [`submitStage` calls `getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1687-L1696) on every stage root RDD.
    - [`getPreferredLocsInternal`](https://github.com/databricks/runtime/blob/1e83dfe4f685bad7f260621e77282b1b4cf9bca4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2995-L3043) visits the RDD and all of its parent RDDs that are computed in the same stage (via narrow dependencies) and calls `.partitions` on each RDD visited.
    - Therefore `.partitions` is invoked on every RDD in the DAG by the time the job has successfully completed.
    - Therefore this patch's change does not introduce any new calls to `.partitions` which would not have otherwise occurred (assuming the job succeeded).

    ### Ordering of `.partitions` calls

    I don't think the order in which the `.partitions` calls occur matters for correctness: the DAGScheduler happens to invoke `.partitions` in a particular order today (defined by the DAG traversal order in internal scheduler methods), but there are already lots of out-of-order `.partitions` calls occurring elsewhere in the codebase.

    ### Handling of exceptions in `.partitions`

    I've chosen **not** to add special error-handling for the new `.partitions` calls: if exceptions occur then they'll bubble up, unwrapped, to the user code submitting the Spark job. It's sometimes important to preserve exception wrapping behavior, but I don't [...]
[spark] branch master updated (bb83f27 -> 2267d7f)
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

  from bb83f27  [SPARK-36938][PYTHON] Inline type hints for group.py in python/pyspark/sql
   add 2267d7f  [SPARK-37000][PYTHON] Add type hints to python/pyspark/sql/util.py

No new revisions were added by this update.

Summary of changes:
 python/mypy.ini                 | 12 ---
 python/pyspark/sql/functions.py |  2 +-
 python/pyspark/sql/utils.py     | 80 -
 3 files changed, 49 insertions(+), 45 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36938][PYTHON] Inline type hints for group.py in python/pyspark/sql
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new bb83f27  [SPARK-36938][PYTHON] Inline type hints for group.py in python/pyspark/sql
bb83f27 is described below

commit bb83f27ea7f85de9c02ee7ff6fa5b2c24efc9584
Author: dch nguyen
AuthorDate: Thu Oct 14 11:15:32 2021 -0700

    [SPARK-36938][PYTHON] Inline type hints for group.py in python/pyspark/sql

    ### What changes were proposed in this pull request?
    Inline the type hints for python/pyspark/sql/group.py from the stub file python/pyspark/sql/group.pyi.

    ### Why are the changes needed?
    Currently, the type hint stub file python/pyspark/sql/group.pyi shows the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Existing tests.

    Closes #34197 from dchvn/SPARK-36938.

    Authored-by: dch nguyen
    Signed-off-by: Takuya UESHIN
---
 python/pyspark/sql/group.py  | 61 ++--
 python/pyspark/sql/group.pyi | 44
 2 files changed, 42 insertions(+), 63 deletions(-)

diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index ab0566e..183041f 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -17,16 +17,24 @@
 import sys
 
-from pyspark.sql.column import Column, _to_seq
+from typing import Callable, List, Optional, TYPE_CHECKING, overload, Dict, Union, cast, Tuple
+
+from py4j.java_gateway import JavaObject  # type: ignore[import]
+
+from pyspark.sql.column import Column, _to_seq  # type: ignore[attr-defined]
+from pyspark.sql.context import SQLContext
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.pandas.group_ops import PandasGroupedOpsMixin
 from pyspark.sql.types import StructType, StructField, IntegerType, StringType
 
+if TYPE_CHECKING:
+    from pyspark.sql._typing import LiteralType
+
 __all__ = ["GroupedData"]
 
 
-def dfapi(f):
-    def _api(self):
+def dfapi(f: Callable) -> Callable:
+    def _api(self: "GroupedData") -> DataFrame:
         name = f.__name__
         jdf = getattr(self._jgd, name)()
         return DataFrame(jdf, self.sql_ctx)
@@ -35,10 +43,13 @@ def dfapi(f):
     return _api
 
 
-def df_varargs_api(f):
-    def _api(self, *cols):
+def df_varargs_api(f: Callable) -> Callable:
+    def _api(self: "GroupedData", *cols: str) -> DataFrame:
         name = f.__name__
-        jdf = getattr(self._jgd, name)(_to_seq(self.sql_ctx._sc, cols))
+        # TODO: ignore[attr-defined] will be removed, once SparkContext is inlined
+        jdf = getattr(self._jgd, name)(
+            _to_seq(self.sql_ctx._sc, cols)  # type: ignore[attr-defined]
+        )
         return DataFrame(jdf, self.sql_ctx)
     _api.__name__ = f.__name__
     _api.__doc__ = f.__doc__
@@ -53,12 +64,20 @@ class GroupedData(PandasGroupedOpsMixin):
 
     .. versionadded:: 1.3
     """
 
-    def __init__(self, jgd, df):
+    def __init__(self, jgd: JavaObject, df: DataFrame):
         self._jgd = jgd
         self._df = df
-        self.sql_ctx = df.sql_ctx
+        self.sql_ctx: SQLContext = df.sql_ctx
+
+    @overload
+    def agg(self, *exprs: Column) -> DataFrame:
+        ...
+
+    @overload
+    def agg(self, __exprs: Dict[str, str]) -> DataFrame:
+        ...
 
-    def agg(self, *exprs):
+    def agg(self, *exprs: Union[Column, Dict[str, str]]) -> DataFrame:
         """Compute aggregates and returns the result as a :class:`DataFrame`.
 
         The available aggregate functions can be:
@@ -115,12 +134,16 @@ class GroupedData(PandasGroupedOpsMixin):
         else:
             # Columns
             assert all(isinstance(c, Column) for c in exprs), "all exprs should be Column"
-            jdf = self._jgd.agg(exprs[0]._jc,
-                                _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
+            exprs = cast(Tuple[Column, ...], exprs)
+            # TODO: ignore[attr-defined] will be removed, once SparkContext is inlined
+            jdf = self._jgd.agg(
+                exprs[0]._jc,
+                _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]])  # type: ignore[attr-defined]
+            )
         return DataFrame(jdf, self.sql_ctx)
 
     @dfapi
-    def count(self):
+    def count(self) -> DataFrame:
         """Counts the number of records for each group.
 
         .. versionadded:: 1.3.0
@@ -132,7 +155,7 @@ class GroupedData(PandasGroupedOpsMixin):
         """
 
     @df_varargs_api
-    def mean(self, *cols):
+    def mean(self, *cols: str) -> DataFrame:
         """Computes average values for each numeric columns for each group.
 
         :func:`mean` [...]
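For context, the two `agg` overloads annotated above correspond to these two long-standing call styles (sample data is illustrative):

```
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["key", "value"])

# Dict overload: maps column name -> aggregate function name.
df.groupBy("key").agg({"value": "max"}).show()

# Column overload: one or more Column expressions.
df.groupBy("key").agg(F.avg("value"), F.count("value")).show()
```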
[spark] branch branch-3.2 updated: [SPARK-36905] Fix reading hive views without explicit column names
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 596b60e  [SPARK-36905] Fix reading hive views without explicit column names
596b60e is described below

commit 596b60e3e6b5d833b1bbac2f1d51b3c8af25a185
Author: Linhong Liu
AuthorDate: Thu Oct 14 22:22:28 2021 +0800

    [SPARK-36905] Fix reading hive views without explicit column names

    ### What changes were proposed in this pull request?
    When a Hive view is created without explicit column names, Spark couldn't read it correctly. For example:
    ```
    -- use hive to create the view
    CREATE VIEW test_view AS SELECT 1 FROM t

    -- use spark to read the view
    SELECT * FROM test_view
    ```
    We will get an exception: `cannot resolve '_c0' given input columns: [1]`
    The problematic plan is:
    ```
    'Project [upcast('_c0, IntegerType) AS _c0#3]
    +- Project [1 AS 1#4]
       +- SubqueryAlias spark_catalog.default.some_table
          +- Relation default.some_table[id#1L] orc
    ```
    This PR handles views created by Hive separately to fix this issue.

    ### Why are the changes needed?
    Bug fix.

    ### Does this PR introduce _any_ user-facing change?
    No; this fixes a regression.

    ### How was this patch tested?
    Newly added UT.

    Closes #34254 from linhongliu-db/SPARK-36905.

    Lead-authored-by: Linhong Liu
    Co-authored-by: Linhong Liu <67896261+linhongliu...@users.noreply.github.com>
    Co-authored-by: Wenchen Fan
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 2cc3aead5b17b2ebfb7f88f60291d412643240c7)
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/catalog/SessionCatalog.scala    | 85 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala | 10 +++
 2 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4860f46..8bba6bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -862,6 +862,13 @@ class SessionCatalog(
     }
   }
 
+  private def isHiveCreatedView(metadata: CatalogTable): Boolean = {
+    // For views created by hive without explicit column names, there will be auto-generated
+    // column names like "_c0", "_c1", "_c2"...
+    metadata.viewQueryColumnNames.isEmpty &&
+      metadata.schema.fieldNames.exists(_.matches("_c[0-9]+"))
+  }
+
   private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
     val viewText = metadata.viewText.getOrElse {
       throw new IllegalStateException("Invalid view without text.")
@@ -870,42 +877,52 @@ class SessionCatalog(
     val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
       parser.parsePlan(viewText)
     }
-    val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
-      // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
-      // output is the same with the view output.
-      metadata.schema.fieldNames.toSeq
-    } else {
-      assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
-      metadata.viewQueryColumnNames
-    }
+    val projectList = if (!isHiveCreatedView(metadata)) {
+      val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+        // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
+        // output is the same with the view output.
+        metadata.schema.fieldNames.toSeq
+      } else {
+        assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
+        metadata.viewQueryColumnNames
+      }
 
-    // For view queries like `SELECT * FROM t`, the schema of the referenced table/view may
-    // change after the view has been created. We need to add an extra SELECT to pick the columns
-    // according to the recorded column names (to get the correct view column ordering and omit
-    // the extra columns that we don't require), with UpCast (to make sure the type change is
-    // safe) and Alias (to respect user-specified view column names) according to the view schema
-    // in the catalog.
-    // Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
-    // SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
-    // number of duplications, and pick the corresponding attribute by ordinal.
-    val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
-    val normalizeColName: String => String = if [...]
[spark] branch master updated: [SPARK-36905] Fix reading hive views without explicit column names
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2cc3aea  [SPARK-36905] Fix reading hive views without explicit column names
2cc3aea is described below

commit 2cc3aead5b17b2ebfb7f88f60291d412643240c7
Author: Linhong Liu
AuthorDate: Thu Oct 14 22:22:28 2021 +0800

    [SPARK-36905] Fix reading hive views without explicit column names

    ### What changes were proposed in this pull request?
    When a Hive view is created without explicit column names, Spark couldn't read it correctly. For example:
    ```
    -- use hive to create the view
    CREATE VIEW test_view AS SELECT 1 FROM t

    -- use spark to read the view
    SELECT * FROM test_view
    ```
    We will get an exception: `cannot resolve '_c0' given input columns: [1]`
    The problematic plan is:
    ```
    'Project [upcast('_c0, IntegerType) AS _c0#3]
    +- Project [1 AS 1#4]
       +- SubqueryAlias spark_catalog.default.some_table
          +- Relation default.some_table[id#1L] orc
    ```
    This PR handles views created by Hive separately to fix this issue.

    ### Why are the changes needed?
    Bug fix.

    ### Does this PR introduce _any_ user-facing change?
    No; this fixes a regression.

    ### How was this patch tested?
    Newly added UT.

    Closes #34254 from linhongliu-db/SPARK-36905.

    Lead-authored-by: Linhong Liu
    Co-authored-by: Linhong Liu <67896261+linhongliu...@users.noreply.github.com>
    Co-authored-by: Wenchen Fan
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/catalog/SessionCatalog.scala    | 85 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala | 10 +++
 2 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4860f46..8bba6bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -862,6 +862,13 @@ class SessionCatalog(
     }
   }
 
+  private def isHiveCreatedView(metadata: CatalogTable): Boolean = {
+    // For views created by hive without explicit column names, there will be auto-generated
+    // column names like "_c0", "_c1", "_c2"...
+    metadata.viewQueryColumnNames.isEmpty &&
+      metadata.schema.fieldNames.exists(_.matches("_c[0-9]+"))
+  }
+
   private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
     val viewText = metadata.viewText.getOrElse {
       throw new IllegalStateException("Invalid view without text.")
@@ -870,42 +877,52 @@ class SessionCatalog(
     val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
       parser.parsePlan(viewText)
     }
-    val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
-      // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
-      // output is the same with the view output.
-      metadata.schema.fieldNames.toSeq
-    } else {
-      assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
-      metadata.viewQueryColumnNames
-    }
+    val projectList = if (!isHiveCreatedView(metadata)) {
+      val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+        // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
+        // output is the same with the view output.
+        metadata.schema.fieldNames.toSeq
+      } else {
+        assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
+        metadata.viewQueryColumnNames
+      }
 
-    // For view queries like `SELECT * FROM t`, the schema of the referenced table/view may
-    // change after the view has been created. We need to add an extra SELECT to pick the columns
-    // according to the recorded column names (to get the correct view column ordering and omit
-    // the extra columns that we don't require), with UpCast (to make sure the type change is
-    // safe) and Alias (to respect user-specified view column names) according to the view schema
-    // in the catalog.
-    // Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
-    // SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
-    // number of duplications, and pick the corresponding attribute by ordinal.
-    val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
-    val normalizeColName: String => String = if (viewConf.caseSensitiveAnalysis) {
-      identity
+      // For view queries like `SELECT * FROM t`, the schema of [...]
[spark] branch master updated: [SPARK-37003][DOC] Merge INSERT related doc into two part: 1. table 2. directory
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2ac4f71  [SPARK-37003][DOC] Merge INSERT related doc into two part: 1. table 2. directory
2ac4f71 is described below

commit 2ac4f71a088c965f580489fb44fc4a2c8b8d6b1f
Author: Angerszh
AuthorDate: Thu Oct 14 22:20:02 2021 +0800

    [SPARK-37003][DOC] Merge INSERT related doc into two part: 1. table 2. directory

    ### What changes were proposed in this pull request?
    The current docs about INSERT contain a lot of repeated content; this PR merges the two kinds of docs.

    ### Why are the changes needed?
    Merging the related docs makes them clearer.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Insert Table
    ![image](https://user-images.githubusercontent.com/46485123/137255501-e9010a18-d177-4eb4-945c-7eeb8936ba00.png)

    Insert overwrite directory
    ![image](https://user-images.githubusercontent.com/46485123/137294828-31fbac70-31df-45c2-9928-26cffd2a3186.png)

    Closes #34282 from AngersZh/SPARK-37003.

    Authored-by: Angerszh
    Signed-off-by: Wenchen Fan
---
 ...f-syntax-dml-insert-overwrite-directory-hive.md |  77 ---------
 ...ql-ref-syntax-dml-insert-overwrite-directory.md |  48 -----
 docs/sql-ref-syntax-dml-insert-overwrite-table.md  | 237 ---------
 ...-into.md => sql-ref-syntax-dml-insert-table.md} | 194 +++--
 docs/sql-ref-syntax.md                             |   4 +-
 5 files changed, 223 insertions(+), 337 deletions(-)

diff --git a/docs/sql-ref-syntax-dml-insert-overwrite-directory-hive.md b/docs/sql-ref-syntax-dml-insert-overwrite-directory-hive.md
deleted file mode 100644
index 8ed6a3c..0000000
--- a/docs/sql-ref-syntax-dml-insert-overwrite-directory-hive.md
+++ /dev/null
@@ -1,77 +0,0 @@
-layout: global
-title: INSERT OVERWRITE DIRECTORY with Hive format
-displayTitle: INSERT OVERWRITE DIRECTORY with Hive format
-license: |
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements. See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License. You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-
-### Description
-
-The `INSERT OVERWRITE DIRECTORY` with Hive format overwrites the existing data in the directory with the new values using Hive `SerDe`.
-Hive support must be enabled to use this command. The inserted rows can be specified by value expressions or result from a query.
-
-### Syntax
-
-```sql
-INSERT OVERWRITE [ LOCAL ] DIRECTORY directory_path
-    [ ROW FORMAT row_format ] [ STORED AS file_format ]
-    { VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query }
-```
-
-### Parameters
-
-* **directory_path**
-
-    Specifies the destination directory. The `LOCAL` keyword is used to specify that the directory is on the local file system.
-
-* **row_format**
-
-    Specifies the row format for this insert. Valid options are `SERDE` clause and `DELIMITED` clause.
-    `SERDE` clause can be used to specify a custom `SerDe` for this insert. Alternatively, `DELIMITED` clause can be used to specify the native `SerDe` and state the delimiter, escape character, null character, and so on.
-
-* **file_format**
-
-    Specifies the file format for this insert. Valid options are `TEXTFILE`, `SEQUENCEFILE`, `RCFILE`, `ORC`, `PARQUET`, and `AVRO`. You can also specify your own input and output format using `INPUTFORMAT` and `OUTPUTFORMAT`. `ROW FORMAT SERDE` can only be used with `TEXTFILE`, `SEQUENCEFILE`, or `RCFILE`, while `ROW FORMAT DELIMITED` can only be used with `TEXTFILE`.
-
-* **VALUES ( { value `|` NULL } [ , ... ] ) [ , ( ... ) ]**
-
-    Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted.
-    A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.
-
-* **query**
-
-    A query that produces the rows to be inserted. It can be in one of following formats:
-    * a `SELECT` statement
-    * a `TABLE` statement
-    * a `FROM` statement
-
-### Examples
-
-```sql
-INSERT OVERWRITE LOCAL DIRECTORY '/tmp/destination'
-    STORED AS orc
-    SELECT * FROM [...]
[spark] branch branch-3.2 updated: [SPARK-37001][SQL] Disable two level of map for final hash aggregation by default
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f886d51  [SPARK-37001][SQL] Disable two level of map for final hash aggregation by default
f886d51 is described below

commit f886d51b84c4510f988cb83277c07b8b9e39d25c
Author: Cheng Su
AuthorDate: Thu Oct 14 18:16:22 2021 +0800

    [SPARK-37001][SQL] Disable two level of map for final hash aggregation by default

    ### What changes were proposed in this pull request?
    This PR disables the two-level hash map for final hash aggregation by default. The feature was introduced in https://github.com/apache/spark/pull/32242, and we found that it can lead to a query performance regression when the final aggregation gets rows with a lot of distinct keys: the 1st-level hash map fills up, so many rows waste a 1st-level map lookup before being inserted into the 2nd-level map. The feature still benefits queries with few distinct keys, so introducing a config he [...]

    ### Why are the changes needed?
    Fixes a query performance regression.

    ### Does this PR introduce _any_ user-facing change?
    Yes, the introduced `spark.sql.codegen.aggregate.final.map.twolevel.enabled` config.

    ### How was this patch tested?
    Existing unit test in `AggregationQuerySuite.scala`.

    Also verified the generated code for an example query:
    ```
    spark.sql(
      """
        |SELECT key, avg(value)
        |FROM agg1
        |GROUP BY key
      """.stripMargin)
    ```
    Verified that the generated code for final hash aggregation does not use two-level maps by default: https://gist.github.com/c21/d4ce87ef28a22d1ce839e0cda000ce14 .
    Verified that the generated code for final hash aggregation uses two-level maps when the config is enabled: https://gist.github.com/c21/4b59752c1f3f98303b60ccff66b5db69 .

    Closes #34270 from c21/agg-fix.

    Authored-by: Cheng Su
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 3354a21eff0316311043a2d0b4f5d0e12774adb1)
    Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/sql/catalyst/expressions/grouping.scala   |  2 +-
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++
 .../spark/sql/execution/aggregate/HashAggregateExec.scala      |  9 -
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
index 8b95ee0..8ce0e57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
@@ -217,9 +217,9 @@ case class Grouping(child: Expression) extends Expression with Unevaluable
     Examples:
       > SELECT name, _FUNC_(), sum(age), avg(height) FROM VALUES (2, 'Alice', 165), (5, 'Bob', 180) people(age, name, height) GROUP BY cube(name, height);
         Alice   0   2   165.0
-        Bob     0   5   180.0
         Alice   1   2   165.0
         NULL    3   7   172.5
+        Bob     0   5   180.0
         Bob     1   5   180.0
         NULL    2   2   165.0
         NULL    2   5   180.0
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 11f13c4..bd990d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1685,6 +1685,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ENABLE_TWOLEVEL_AGG_MAP_PARTIAL_ONLY =
+    buildConf("spark.sql.codegen.aggregate.map.twolevel.partialOnly")
+      .internal()
+      .doc("Enable two-level aggregate hash map for partial aggregate only, " +
+        "because final aggregate might get more distinct keys compared to partial aggregate. " +
+        "Overhead of looking up 1st-level map might dominate when having a lot of distinct keys.")
+      .version("3.2.1")
+      .booleanConf
+      .createWithDefault(true)
+
   val ENABLE_VECTORIZED_HASH_MAP =
     buildConf("spark.sql.codegen.aggregate.map.vectorized.enable")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index da310b6..8545154 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala [...]
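A minimal sketch of toggling the new flag shown in the diff above (it is an internal config; `false` restores the previous behavior of also using the two-level map for final aggregation):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Default (true): the two-level hash map is used for partial aggregation
# only. Setting it to false re-enables it for final aggregation too, which
# can help when the final aggregate sees few distinct keys.
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.partialOnly", "false")
```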
[spark] branch master updated (c11c44e -> 3354a21)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

  from c11c44e  [SPARK-12567][SQL] Add aes_encrypt and aes_decrypt builtin functions
   add 3354a21  [SPARK-37001][SQL] Disable two level of map for final hash aggregation by default

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/catalyst/expressions/grouping.scala   |  2 +-
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 10 ++
 .../spark/sql/execution/aggregate/HashAggregateExec.scala      |  9 -
 3 files changed, 19 insertions(+), 2 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (de0161a -> c11c44e)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

  from de0161a  [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero
   add c11c44e  [SPARK-12567][SQL] Add aes_encrypt and aes_decrypt builtin functions

No new revisions were added by this update.

Summary of changes:
 .../catalyst/expressions/ExpressionImplUtils.java  |  61 ++
 .../sql/catalyst/analysis/FunctionRegistry.scala   |   2 +
 .../spark/sql/catalyst/expressions/misc.scala      |  77 +
 .../spark/sql/errors/QueryExecutionErrors.scala    |   6 +
 .../sql-functions/sql-expression-schema.md         |   4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 127 +
 6 files changed, 276 insertions(+), 1 deletion(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionImplUtils.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
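For illustration, a round-trip call of the new builtins from PySpark; the key must be 16, 24, or 32 bytes, the CAST is needed because `aes_decrypt` returns binary, and the default cipher mode at this commit is not shown in this summary (a plain round-trip does not depend on it):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Encrypt and immediately decrypt with the same 16-byte key; the key value
# here is a placeholder for illustration only.
spark.sql(
    "SELECT CAST(aes_decrypt(aes_encrypt('Spark', '0000111122223333'), "
    "'0000111122223333') AS STRING) AS decrypted"
).show()
```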
[spark] branch branch-3.2 updated: [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new d93d056  [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero
d93d056 is described below

commit d93d0560db18681c74916a6080f87f4136dde434
Author: gengjiaan
AuthorDate: Thu Oct 14 17:05:25 2021 +0800

    [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero

    ### What changes were proposed in this pull request?
    When dividing by zero, `DivideYMInterval` and `DivideDTInterval` output
    ```
    java.lang.ArithmeticException
    / by zero
    ```
    But, in ANSI mode, `select 2 / 0` will output
    ```
    org.apache.spark.SparkArithmeticException
    divide by zero
    ```
    The two behaviors are inconsistent.

    ### Why are the changes needed?
    Make the behavior consistent.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ### How was this patch tested?
    New tests.

    Closes #33889 from beliefer/SPARK-36632.

    Lead-authored-by: gengjiaan
    Co-authored-by: beliefer
    Signed-off-by: Wenchen Fan
    (cherry picked from commit de0161a4e85d3125e438a3431285d2fee22c1c65)
    Signed-off-by: Wenchen Fan
---
 .../catalyst/expressions/intervalExpressions.scala | 47 +-
 .../expressions/IntervalExpressionsSuite.scala     |  8 ++--
 .../sql-tests/results/ansi/interval.sql.out        |  8 ++--
 .../resources/sql-tests/results/interval.sql.out   |  8 ++--
 .../apache/spark/sql/ColumnExpressionSuite.scala   | 28 -
 5 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
index c799c69..4f31708 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
@@ -598,6 +598,17 @@ trait IntervalDivide {
       }
     }
   }
+
+  def divideByZeroCheck(dataType: DataType, num: Any): Unit = dataType match {
+    case _: DecimalType =>
+      if (num.asInstanceOf[Decimal].isZero) throw QueryExecutionErrors.divideByZeroError()
+    case _ => if (num == 0) throw QueryExecutionErrors.divideByZeroError()
+  }
+
+  def divideByZeroCheckCodegen(dataType: DataType, value: String): String = dataType match {
+    case _: DecimalType => s"if ($value.isZero()) throw QueryExecutionErrors.divideByZeroError();"
+    case _ => s"if ($value == 0) throw QueryExecutionErrors.divideByZeroError();"
+  }
 }
 
 // Divide an year-month interval by a numeric
@@ -629,6 +640,7 @@ case class DivideYMInterval(
 
   override def nullSafeEval(interval: Any, num: Any): Any = {
     checkDivideOverflow(interval.asInstanceOf[Int], Int.MinValue, right, num)
+    divideByZeroCheck(right.dataType, num)
     evalFunc(interval.asInstanceOf[Int], num)
   }
 
@@ -650,17 +662,24 @@ case class DivideYMInterval(
       // Similarly to non-codegen code. The result of `divide(Int, Long, ...)` must fit to `Int`.
       // Casting to `Int` is safe here.
         s"""
+           |${divideByZeroCheckCodegen(right.dataType, n)}
           |$checkIntegralDivideOverflow
           |${ev.value} = ($javaType)$math.divide($m, $n, java.math.RoundingMode.HALF_UP);
           """.stripMargin)
     case _: DecimalType =>
-      defineCodeGen(ctx, ev, (m, n) =>
-        s"((new Decimal()).set($m).$$div($n)).toJavaBigDecimal()" +
-          ".setScale(0, java.math.RoundingMode.HALF_UP).intValueExact()")
+      nullSafeCodeGen(ctx, ev, (m, n) =>
+        s"""
+           |${divideByZeroCheckCodegen(right.dataType, n)}
+           |${ev.value} = ((new Decimal()).set($m).$$div($n)).toJavaBigDecimal()
+           |  .setScale(0, java.math.RoundingMode.HALF_UP).intValueExact();
+         """.stripMargin)
     case _: FractionalType =>
       val math = classOf[DoubleMath].getName
-      defineCodeGen(ctx, ev, (m, n) =>
-        s"$math.roundToInt($m / (double)$n, java.math.RoundingMode.HALF_UP)")
+      nullSafeCodeGen(ctx, ev, (m, n) =>
+        s"""
+           |${divideByZeroCheckCodegen(right.dataType, n)}
+           |${ev.value} = $math.roundToInt($m / (double)$n, java.math.RoundingMode.HALF_UP);
+          """.stripMargin)
   }
 
   override def toString: String = s"($left / $right)"
@@ -696,6 +715,7 @@ case class DivideDTInterval(
 
   override def nullSafeEval(interval: Any, num: Any): Any = {
     checkDivideOverflow(interval.asInstanceOf[Long], Long.MinValue, right, num)
+    divideByZeroCheck(right.dataType, num) [...]
[spark] branch master updated: [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new de0161a  [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero
de0161a is described below

commit de0161a4e85d3125e438a3431285d2fee22c1c65
Author: gengjiaan
AuthorDate: Thu Oct 14 17:05:25 2021 +0800

    [SPARK-36632][SQL] DivideYMInterval and DivideDTInterval should throw the same exception when divide by zero

    ### What changes were proposed in this pull request?
    When dividing by zero, `DivideYMInterval` and `DivideDTInterval` output
    ```
    java.lang.ArithmeticException
    / by zero
    ```
    But, in ANSI mode, `select 2 / 0` will output
    ```
    org.apache.spark.SparkArithmeticException
    divide by zero
    ```
    The two behaviors are inconsistent.

    ### Why are the changes needed?
    Make the behavior consistent.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ### How was this patch tested?
    New tests.

    Closes #33889 from beliefer/SPARK-36632.

    Lead-authored-by: gengjiaan
    Co-authored-by: beliefer
    Signed-off-by: Wenchen Fan
---
 .../catalyst/expressions/intervalExpressions.scala | 47 +-
 .../expressions/IntervalExpressionsSuite.scala     |  8 ++--
 .../sql-tests/results/ansi/interval.sql.out        |  8 ++--
 .../resources/sql-tests/results/interval.sql.out   |  8 ++--
 .../apache/spark/sql/ColumnExpressionSuite.scala   | 28 -
 5 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
index c799c69..4f31708 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala
@@ -598,6 +598,17 @@ trait IntervalDivide {
       }
     }
   }
+
+  def divideByZeroCheck(dataType: DataType, num: Any): Unit = dataType match {
+    case _: DecimalType =>
+      if (num.asInstanceOf[Decimal].isZero) throw QueryExecutionErrors.divideByZeroError()
+    case _ => if (num == 0) throw QueryExecutionErrors.divideByZeroError()
+  }
+
+  def divideByZeroCheckCodegen(dataType: DataType, value: String): String = dataType match {
+    case _: DecimalType => s"if ($value.isZero()) throw QueryExecutionErrors.divideByZeroError();"
+    case _ => s"if ($value == 0) throw QueryExecutionErrors.divideByZeroError();"
+  }
 }
 
 // Divide an year-month interval by a numeric
@@ -629,6 +640,7 @@ case class DivideYMInterval(
 
   override def nullSafeEval(interval: Any, num: Any): Any = {
     checkDivideOverflow(interval.asInstanceOf[Int], Int.MinValue, right, num)
+    divideByZeroCheck(right.dataType, num)
     evalFunc(interval.asInstanceOf[Int], num)
   }
 
@@ -650,17 +662,24 @@ case class DivideYMInterval(
       // Similarly to non-codegen code. The result of `divide(Int, Long, ...)` must fit to `Int`.
      // Casting to `Int` is safe here.
         s"""
+           |${divideByZeroCheckCodegen(right.dataType, n)}
           |$checkIntegralDivideOverflow
           |${ev.value} = ($javaType)$math.divide($m, $n, java.math.RoundingMode.HALF_UP);
           """.stripMargin)
     case _: DecimalType =>
-      defineCodeGen(ctx, ev, (m, n) =>
-        s"((new Decimal()).set($m).$$div($n)).toJavaBigDecimal()" +
-          ".setScale(0, java.math.RoundingMode.HALF_UP).intValueExact()")
+      nullSafeCodeGen(ctx, ev, (m, n) =>
+        s"""
+           |${divideByZeroCheckCodegen(right.dataType, n)}
+           |${ev.value} = ((new Decimal()).set($m).$$div($n)).toJavaBigDecimal()
+           |  .setScale(0, java.math.RoundingMode.HALF_UP).intValueExact();
+         """.stripMargin)
     case _: FractionalType =>
       val math = classOf[DoubleMath].getName
-      defineCodeGen(ctx, ev, (m, n) =>
-        s"$math.roundToInt($m / (double)$n, java.math.RoundingMode.HALF_UP)")
+      nullSafeCodeGen(ctx, ev, (m, n) =>
+        s"""
+           |${divideByZeroCheckCodegen(right.dataType, n)}
+           |${ev.value} = $math.roundToInt($m / (double)$n, java.math.RoundingMode.HALF_UP);
+          """.stripMargin)
   }
 
   override def toString: String = s"($left / $right)"
@@ -696,6 +715,7 @@ case class DivideDTInterval(
 
   override def nullSafeEval(interval: Any, num: Any): Any = {
     checkDivideOverflow(interval.asInstanceOf[Long], Long.MinValue, right, num)
+    divideByZeroCheck(right.dataType, num)
     evalFunc(interval.asInstanceOf[Long], num)
   }
 
@@ -711,17 +731,24 @@ case class DivideDTInterval(
            |""".stripMargin [...]
[spark] branch master updated (d9d75f7 -> 1125003)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from d9d75f7  [SPARK-32161][PYTHON] Removing JVM logs from SparkUpgradeException
 add 1125003  [SPARK-36946][PYTHON] Support time for ps.to_datetime

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/namespace.py            | 40 +++-
 python/pyspark/pandas/tests/test_namespace.py | 92 +++
 2 files changed, 117 insertions(+), 15 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (4a722dc -> d9d75f7)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 4a722dc  [SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`
 add d9d75f7  [SPARK-32161][PYTHON] Removing JVM logs from SparkUpgradeException

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/test_utils.py | 11 ++-
 python/pyspark/sql/utils.py            |  8
 2 files changed, 18 insertions(+), 1 deletion(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.0 updated (74fddec -> 86bf5d3)
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 74fddec  [SPARK-36717][CORE] Incorrect order of variable initialization may lead to incorrect behavior
 add 86bf5d3  [SPARK-36993][SQL][3.0] Fix json_tuple throwing NPE when fields contain a non-foldable null value

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/expressions/jsonExpressions.scala     |  7 +--
 .../test/resources/sql-tests/inputs/json-functions.sql |  4
 .../resources/sql-tests/results/json-functions.sql.out | 18 +-
 3 files changed, 26 insertions(+), 3 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
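The one-line summary above does not show what a non-foldable null field looks like in practice. Below is a hypothetical reproducer, assuming the NPE was triggered by a `json_tuple` field argument that evaluates to null at runtime without being constant-foldable; the `if(rand() < 2, ...)` trick is illustrative, not taken from the patch.

```scala
import org.apache.spark.sql.SparkSession

object JsonTupleNullFieldRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("json_tuple-null-field")
      .getOrCreate()

    // rand() < 2 is always true, so the field argument always evaluates to
    // NULL -- but because rand() is non-deterministic the expression is not
    // foldable, so constant folding cannot simplify it away before execution.
    // Before the fix a query like this could hit an NPE; after it, json_tuple
    // returns NULL for the null field name.
    spark.sql(
      """SELECT json_tuple('{"a": 1}', if(rand() < 2, cast(null as string), 'a'))"""
    ).show()

    spark.stop()
  }
}
```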
[spark] branch master updated: [SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 4a722dc  [SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`
4a722dc is described below

commit 4a722dcdd88e3969532d87f83949bdda55082d60
Author: Yikun Jiang
AuthorDate: Wed Oct 13 23:47:14 2021 -0700

[SPARK-36059][K8S] Support `spark.kubernetes.driver.scheduler.name`

### What changes were proposed in this pull request?
This patch adds driver-side support for selecting a scheduler through `schedulerName`.

### Why are the changes needed?
We already added the ability to specify the scheduler on the executor side in https://github.com/apache/spark/pull/26088. In some scenarios, users also want to specify the driver scheduler to make sure the driver pod can be scheduled separately. Part of [SPARK-36057](https://issues.apache.org/jira/browse/SPARK-36057).

### Does this PR introduce _any_ user-facing change?
Yes, it adds the `spark.kubernetes.driver.scheduler.name` conf.

### How was this patch tested?
- UT

Closes #34239 from Yikun/SPARK-36059-sch.

Authored-by: Yikun Jiang
Signed-off-by: Dongjoon Hyun
---
 docs/running-on-kubernetes.md                           | 16
 .../main/scala/org/apache/spark/deploy/k8s/Config.scala |  7 +++
 .../org/apache/spark/deploy/k8s/KubernetesConf.scala    |  5 +
 .../deploy/k8s/features/BasicDriverFeatureStep.scala    |  3 +++
 .../apache/spark/deploy/k8s/KubernetesConfSuite.scala   | 15 +++
 5 files changed, 46 insertions(+)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b30d61d..d32861b 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1306,6 +1306,22 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>3.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.scheduler.name</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the scheduler name for each executor pod.
+  </td>
+  <td>3.0.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.driver.scheduler.name</code></td>
+  <td>(none)</td>
+  <td>
+    Specify the scheduler name for driver pod.
+  </td>
+  <td>3.3.0</td>
+</tr>

 Pod template properties

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2aa4fbc..2458e2d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -228,6 +228,13 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional

+  val KUBERNETES_DRIVER_SCHEDULER_NAME =
+    ConfigBuilder("spark.kubernetes.driver.scheduler.name")
+      .doc("Specify the scheduler name for driver pod")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_EXECUTOR_REQUEST_CORES =
     ConfigBuilder("spark.kubernetes.executor.request.cores")
       .doc("Specify the cpu request for each executor pod")

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 8f84555..0eef6e1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -41,6 +41,7 @@ private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {
   def secretEnvNamesToKeyRefs: Map[String, String]
   def secretNamesToMountPaths: Map[String, String]
   def volumes: Seq[KubernetesVolumeSpec]
+  def schedulerName: String

   def appName: String = get("spark.app.name", "spark")

@@ -130,6 +131,8 @@ private[spark] class KubernetesDriverConf(
   override def volumes: Seq[KubernetesVolumeSpec] = {
     KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
   }
+
+  override def schedulerName: String = get(KUBERNETES_DRIVER_SCHEDULER_NAME).getOrElse("")
 }

 private[spark] class KubernetesExecutorConf(
@@ -186,6 +189,8 @@ private[spark] class KubernetesExecutorConf(
     KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
   }

+  override def schedulerName: String = get(KUBERNETES_EXECUTOR_SCHEDULER_NAME).getOrElse("")
+
   private def checkExecutorEnvKey(key: String): Boolean = {
     // Pattern for matching an executorEnv key, which meets certain naming rules.
     val executorEnvRegex = "[-._a-zA-Z][-._a-zA-Z0-9]*".r
diff --git
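Usage-wise, the new key behaves like any other Spark conf. A minimal sketch of setting it on submission follows; the master URL and the `volcano` scheduler name are placeholders, not values from the patch.

```scala
import org.apache.spark.SparkConf

object DriverSchedulerNameDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("k8s://https://k8s-apiserver.example.com:6443") // placeholder cluster URL
      .setAppName("driver-scheduler-demo")
      // New in this patch (3.3.0): scheduler for the driver pod only.
      .set("spark.kubernetes.driver.scheduler.name", "volcano")
      // Pre-existing executor-side counterpart (3.0.0).
      .set("spark.kubernetes.executor.scheduler.name", "volcano")

    // KubernetesDriverConf.schedulerName falls back to "" when the key is
    // unset (getOrElse("")), leaving the pod on the cluster's default scheduler.
    println(conf.get("spark.kubernetes.driver.scheduler.name"))
  }
}
```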
[spark] branch master updated (1c61d90 -> 838a9d9)
This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 1c61d90  [SPARK-35973][SQL] Add command `SHOW CATALOGS`
 add 838a9d9  [SPARK-36922][SQL] The SIGN/SIGNUM functions should support ANSI intervals

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/expressions/mathExpressions.scala |  7 +++
 .../expressions/MathExpressionsSuite.scala         | 21 +
 .../test/resources/sql-tests/inputs/interval.sql   |  6 +++
 .../sql-tests/results/ansi/interval.sql.out        | 50 +-
 .../resources/sql-tests/results/interval.sql.out   | 50 +-
 5 files changed, 132 insertions(+), 2 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
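To see what this change enables at the SQL level: a small sketch, assuming a local session; the expected values in the comments follow the usual signum contract (-1.0 / 0.0 / 1.0) rather than being copied from the patch's golden files.

```scala
import org.apache.spark.sql.SparkSession

object IntervalSignumDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("interval-signum")
      .getOrCreate()

    // After SPARK-36922, SIGN/SIGNUM accept ANSI year-month and day-time
    // intervals instead of failing to resolve. Expected results, per the
    // usual signum contract: -1.0, 0.0, 1.0.
    spark.sql(
      "SELECT signum(INTERVAL '-3' YEAR), " +
        "sign(INTERVAL '0' MONTH), " +
        "signum(INTERVAL '10 01' DAY TO HOUR)"
    ).show()

    spark.stop()
  }
}
```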