[spark] branch master updated: [SPARK-39314][PS] Respect ps.concat sort parameter to follow pandas behavior
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c61da89eddc [SPARK-39314][PS] Respect ps.concat sort parameter to follow pandas behavior
c61da89eddc is described below

commit c61da89eddcd62d00b27531a1e7ea03548b73fc8
Author: Yikun Jiang
AuthorDate: Thu Jun 2 12:36:43 2022 +0900

    [SPARK-39314][PS] Respect ps.concat sort parameter to follow pandas behavior

    ### What changes were proposed in this pull request?
    Respect the ps.concat sort parameter to follow pandas behavior:
    - Remove the multi-index special sort-processing case and add a unit test.
    - Still keep `num_series != 1` for now to follow pandas behavior.

    ### Why are the changes needed?
    Since pandas 1.4+ (https://github.com/pandas-dev/pandas/commit/01b8d2a77e5109adda2504b1cb4b1daeab3c74df), the ps.concat method respects the sort parameter. We need to follow pandas behavior.

    ### Does this PR introduce _any_ user-facing change?
    Yes, but it follows pandas 1.4 behavior.

    ### How was this patch tested?
    test_concat_index_axis, test_concat_multiindex_sort, and the concat doctest passed with pandas 1.3/1.4.

    Closes #36711 from Yikun/SPARK-39314.

    Authored-by: Yikun Jiang
    Signed-off-by: Hyukjin Kwon
---
 python/pyspark/pandas/namespace.py            | 13 +++-
 python/pyspark/pandas/tests/test_namespace.py | 46 +--
 2 files changed, 48 insertions(+), 11 deletions(-)

diff --git a/python/pyspark/pandas/namespace.py b/python/pyspark/pandas/namespace.py
index 340e270ace5..0f5a979df79 100644
--- a/python/pyspark/pandas/namespace.py
+++ b/python/pyspark/pandas/namespace.py
@@ -2608,9 +2608,8 @@ def concat(
             label for label in column_labels_of_psdfs[0] if label in interested_columns
         ]

-        # When multi-index column, although pandas is flaky if `join="inner" and sort=False`,
-        # always sort to follow the `join="outer"` case behavior.
-        if (len(merged_columns) > 0 and len(merged_columns[0]) > 1) or sort:
+        # If sort is True, sort to follow pandas 1.4+ behavior.
+        if sort:
             # FIXME: better ordering
             merged_columns = sorted(merged_columns, key=name_like_string)
@@ -2622,11 +2621,9 @@ def concat(
         assert len(merged_columns) > 0

-        # Always sort when multi-index columns or there are more than two Series,
-        # and if there is only one Series, never sort.
-        sort = len(merged_columns[0]) > 1 or num_series > 1 or (num_series != 1 and sort)
-
-        if sort:
+        # If sort is True, always sort when there are more than two Series,
+        # and if there is only one Series, never sort to follow pandas 1.4+ behavior.
+        if sort and num_series != 1:
             # FIXME: better ordering
             merged_columns = sorted(merged_columns, key=name_like_string)

diff --git a/python/pyspark/pandas/tests/test_namespace.py b/python/pyspark/pandas/tests/test_namespace.py
index 8c5adb9bae5..4db756c6e66 100644
--- a/python/pyspark/pandas/tests/test_namespace.py
+++ b/python/pyspark/pandas/tests/test_namespace.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
+from distutils.version import LooseVersion
 import itertools
 import inspect
@@ -295,6 +296,28 @@ class NamespaceTest(PandasOnSparkTestCase, SQLTestUtils):
             AssertionError, lambda: ps.timedelta_range(start="1 day", periods=3, freq="ns")
         )

+    def test_concat_multiindex_sort(self):
+        # SPARK-39314: Respect ps.concat sort parameter to follow pandas behavior
+        idx = pd.MultiIndex.from_tuples([("Y", "A"), ("Y", "B"), ("X", "C"), ("X", "D")])
+        pdf = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=idx)
+        psdf = ps.from_pandas(pdf)
+
+        ignore_indexes = [True, False]
+        joins = ["inner", "outer"]
+        sorts = [True]
+        if LooseVersion(pd.__version__) >= LooseVersion("1.4"):
+            sorts += [False]
+        objs = [
+            ([psdf, psdf.reset_index()], [pdf, pdf.reset_index()]),
+            ([psdf.reset_index(), psdf], [pdf.reset_index(), pdf]),
+        ]
+        for ignore_index, join, sort in itertools.product(ignore_indexes, joins, sorts):
+            for i, (psdfs, pdfs) in enumerate(objs):
+                self.assert_eq(
+                    ps.concat(psdfs, ignore_index=ignore_index, join=join, sort=sort),
+                    pd.concat(pdfs, ignore_index=ignore_index, join=join, sort=sort),
+                )
+
     def test_concat_index_axis(self):
         pdf = pd.DataFrame({"A": [0, 2, 4], "B": [1, 3, 5], "C": [6, 7, 8]})
         # TODO: pdf.columns.names = ["ABC"]
@@ -306,16 +329,29 @@ class NamespaceTest(PandasOnSparkTestCase, SQLTestUtils):
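For context, a minimal sketch of the pandas 1.4+ `sort` semantics that ps.concat now follows (the frames and a running Spark session are illustrative assumptions, not part of the commit):

```
import pandas as pd
import pyspark.pandas as ps

# Columns only partially overlap, so the non-concatenation axis is not
# aligned and the `sort` parameter takes effect under join="outer".
pdf1 = pd.DataFrame({"C": [1], "A": [2]})
pdf2 = pd.DataFrame({"C": [3], "B": [4]})
psdf1, psdf2 = ps.from_pandas(pdf1), ps.from_pandas(pdf2)

ps.concat([psdf1, psdf2], join="outer", sort=False)  # columns: C, A, B (input order)
ps.concat([psdf1, psdf2], join="outer", sort=True)   # columns: A, B, C (sorted)
```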
[spark] branch master updated (6d43556089a -> 1d4ab7c7ded)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 6d43556089a [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc
     add 1d4ab7c7ded [SPARK-39326][PYTHON][PS] replace "NaN" with real "None" value in indexes

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/frame.py        | 48 +--
 python/pyspark/pandas/indexes/base.py |  4 +--
 python/pyspark/pandas/series.py       | 32 +++
 3 files changed, 42 insertions(+), 42 deletions(-)
[spark] branch branch-3.3 updated: [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 22686656636 [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc
22686656636 is described below

commit 2268665663684dd381adf266feb74ac97a53900d
Author: Dongjoon Hyun
AuthorDate: Wed Jun 1 20:10:01 2022 -0700

    [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc

    ### What changes were proposed in this pull request?
    This PR aims to avoid the deprecation of `spark.kubernetes.memoryOverheadFactor` from Apache Spark 3.3. In addition, it also recovers the documentation that was mistakenly removed during the `deprecation`. `Deprecation` is not a removal.

    ### Why are the changes needed?
    - Apache Spark 3.3.0 RC always complains about `spark.kubernetes.memoryOverheadFactor` because the configuration has a default value (not given by the users). There is no way to silence the warnings, which means the directional message is not helpful and confuses users. In other words, we still get warnings even when we use only the new configurations, or no configuration at all.
    ```
    22/06/01 23:53:49 WARN SparkConf: The configuration key 'spark.kubernetes.memoryOverheadFactor' has been deprecated as of Spark 3.3.0 and may be removed in the future. Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor
    22/06/01 23:53:49 WARN SparkConf: The configuration key 'spark.kubernetes.memoryOverheadFactor' has been deprecated as of Spark 3.3.0 and may be removed in the future. Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor
    22/06/01 23:53:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    22/06/01 23:53:50 WARN SparkConf: The configuration key 'spark.kubernetes.memoryOverheadFactor' has been deprecated as of Spark 3.3.0 and may be removed in the future. Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor
    ```
    - The minimum constraint is slightly different because `spark.kubernetes.memoryOverheadFactor` has allowed 0 since Apache Spark 2.4, while the new configurations disallow `0`.
    - This documentation removal might be too early because the deprecation is not the removal of the configuration. This PR recovers the removed doc and adds the following.
    ```
    This will be overridden by the value set by spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor explicitly.
    ```

    ### Does this PR introduce _any_ user-facing change?
    No. This is consistent with the existing behavior.

    ### How was this patch tested?
    Pass the CIs.

    Closes #36744 from dongjoon-hyun/SPARK-39360.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 6d43556089a21b26d1a7590fbe1e25bd1ca7cedd)
    Signed-off-by: Dongjoon Hyun
---
 core/src/main/scala/org/apache/spark/SparkConf.scala |  4 +---
 docs/running-on-kubernetes.md                        | 10 ++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index cf121749b73..5f37a1abb19 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -636,9 +636,7 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
         "Please use spark.excludeOnFailure.killExcludedExecutors"),
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
-        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
-      DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0",
-        "Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor")
+        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
     )
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index ee77e37beb3..9659a6ebe2f 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1137,6 +1137,16 @@ See the [configuration page](configuration.html) for information on Spark config
   3.0.0
+
+  spark.kubernetes.memoryOverheadFactor
+  0.1
+
+    This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and tmpfs-based local directories when
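As a usage note (not part of the commit), a minimal sketch with illustrative values of how the keys interact; per the recovered doc, the per-role factors override the legacy K8s key when set explicitly:

```
from pyspark import SparkConf

conf = (
    SparkConf()
    # Legacy key: still honored on K8s, no longer emits a deprecation warning.
    .set("spark.kubernetes.memoryOverheadFactor", "0.2")
    # New per-role keys: when set explicitly, they override the legacy key.
    .set("spark.driver.memoryOverheadFactor", "0.15")
    .set("spark.executor.memoryOverheadFactor", "0.25")
)
```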
[spark] branch master updated: [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6d43556089a [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc
6d43556089a is described below

commit 6d43556089a21b26d1a7590fbe1e25bd1ca7cedd
Author: Dongjoon Hyun
AuthorDate: Wed Jun 1 20:10:01 2022 -0700

    [SPARK-39360][K8S] Remove deprecation of `spark.kubernetes.memoryOverheadFactor` and recover doc

    ### What changes were proposed in this pull request?
    This PR aims to avoid the deprecation of `spark.kubernetes.memoryOverheadFactor` from Apache Spark 3.3. In addition, it also recovers the documentation that was mistakenly removed during the `deprecation`. `Deprecation` is not a removal.

    ### Why are the changes needed?
    - Apache Spark 3.3.0 RC always complains about `spark.kubernetes.memoryOverheadFactor` because the configuration has a default value (not given by the users). There is no way to silence the warnings, which means the directional message is not helpful and confuses users. In other words, we still get warnings even when we use only the new configurations, or no configuration at all.
    ```
    22/06/01 23:53:49 WARN SparkConf: The configuration key 'spark.kubernetes.memoryOverheadFactor' has been deprecated as of Spark 3.3.0 and may be removed in the future. Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor
    22/06/01 23:53:49 WARN SparkConf: The configuration key 'spark.kubernetes.memoryOverheadFactor' has been deprecated as of Spark 3.3.0 and may be removed in the future. Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor
    22/06/01 23:53:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    22/06/01 23:53:50 WARN SparkConf: The configuration key 'spark.kubernetes.memoryOverheadFactor' has been deprecated as of Spark 3.3.0 and may be removed in the future. Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor
    ```
    - The minimum constraint is slightly different because `spark.kubernetes.memoryOverheadFactor` has allowed 0 since Apache Spark 2.4, while the new configurations disallow `0`.
    - This documentation removal might be too early because the deprecation is not the removal of the configuration. This PR recovers the removed doc and adds the following.
    ```
    This will be overridden by the value set by spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor explicitly.
    ```

    ### Does this PR introduce _any_ user-facing change?
    No. This is consistent with the existing behavior.

    ### How was this patch tested?
    Pass the CIs.

    Closes #36744 from dongjoon-hyun/SPARK-39360.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 core/src/main/scala/org/apache/spark/SparkConf.scala |  4 +---
 docs/running-on-kubernetes.md                        | 10 ++
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 00a0f61ab47..f296b1408fa 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -638,9 +638,7 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.blacklist.killBlacklistedExecutors", "3.1.0",
         "Please use spark.excludeOnFailure.killExcludedExecutors"),
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
-        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
-      DeprecatedConfig("spark.kubernetes.memoryOverheadFactor", "3.3.0",
-        "Please use spark.driver.memoryOverheadFactor and spark.executor.memoryOverheadFactor")
+        "Please use spark.yarn.executor.launch.excludeOnFailure.enabled")
     )
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index c8c202360f8..3445e22f434 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1137,6 +1137,16 @@ See the [configuration page](configuration.html) for information on Spark config
   3.0.0
+
+  spark.kubernetes.memoryOverheadFactor
+  0.1
+
+    This sets the Memory Overhead Factor that will allocate memory to non-JVM memory, which includes off-heap memory allocations, non-JVM tasks, various systems processes, and tmpfs-based local directories when spark.kubernetes.local.dirs.tmpfs is true. For JVM-based jobs this value will default to 0.10 and 0.40 for non-JVM
[spark] branch branch-3.3 updated: [SPARK-39040][SQL][FOLLOWUP] Use a unique table name in conditional-functions.sql
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 37aa0793ae1 [SPARK-39040][SQL][FOLLOWUP] Use a unique table name in conditional-functions.sql
37aa0793ae1 is described below

commit 37aa0793ae1b4018eb331c1ccd4de9bd5aef9905
Author: Wenchen Fan
AuthorDate: Thu Jun 2 10:31:53 2022 +0900

    [SPARK-39040][SQL][FOLLOWUP] Use a unique table name in conditional-functions.sql

    ### What changes were proposed in this pull request?
    This is a followup of https://github.com/apache/spark/pull/36376, to use a unique table name in the test. `t` is quite a common table name and may make the test environment unstable.

    ### Why are the changes needed?
    Make tests more stable.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    N/A

    Closes #36739 from cloud-fan/test.

    Authored-by: Wenchen Fan
    Signed-off-by: Hyukjin Kwon
    (cherry picked from commit 4f672db5719549c522a24cffe7b4d0c1e0cb859b)
    Signed-off-by: Hyukjin Kwon
---
 .../sql-tests/inputs/ansi/conditional-functions.sql      | 16
 .../sql-tests/results/ansi/conditional-functions.sql.out | 16
 2 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/test/resources/sql-tests/inputs/ansi/conditional-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/ansi/conditional-functions.sql
index ba8f0ffe7f1..e7835619f58 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/ansi/conditional-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/ansi/conditional-functions.sql
@@ -1,21 +1,21 @@
 -- Tests for conditional functions
-CREATE TABLE t USING PARQUET AS SELECT c1, c2 FROM VALUES(1d, 0),(2d, 1),(null, 1),(CAST('NaN' AS DOUBLE), 0) AS t(c1, c2);
+CREATE TABLE conditional_t USING PARQUET AS SELECT c1, c2 FROM VALUES(1d, 0),(2d, 1),(null, 1),(CAST('NaN' AS DOUBLE), 0) AS t(c1, c2);

-SELECT nanvl(c2, c1/c2 + c1/c2) FROM t;
-SELECT nanvl(c2, 1/0) FROM t;
-SELECT nanvl(1-0, 1/0) FROM t;
+SELECT nanvl(c2, c1/c2 + c1/c2) FROM conditional_t;
+SELECT nanvl(c2, 1/0) FROM conditional_t;
+SELECT nanvl(1-0, 1/0) FROM conditional_t;

-SELECT if(c2 >= 0, 1-0, 1/0) from t;
+SELECT if(c2 >= 0, 1-0, 1/0) from conditional_t;
 SELECT if(1 == 1, 1, 1/0);
 SELECT if(1 != 1, 1/0, 1);

-SELECT coalesce(c2, 1/0) from t;
+SELECT coalesce(c2, 1/0) from conditional_t;
 SELECT coalesce(1, 1/0);
 SELECT coalesce(null, 1, 1/0);

-SELECT case when c2 >= 0 then 1 else 1/0 end from t;
+SELECT case when c2 >= 0 then 1 else 1/0 end from conditional_t;
 SELECT case when 1 < 2 then 1 else 1/0 end;
 SELECT case when 1 > 2 then 1/0 else 1 end;

-DROP TABLE IF EXISTS t;
+DROP TABLE conditional_t;
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/conditional-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/conditional-functions.sql.out
index 6a4f694f4d7..e62654c3e23 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/conditional-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/conditional-functions.sql.out
@@ -3,7 +3,7 @@
 -- !query
-CREATE TABLE t USING PARQUET AS SELECT c1, c2 FROM VALUES(1d, 0),(2d, 1),(null, 1),(CAST('NaN' AS DOUBLE), 0) AS t(c1, c2)
+CREATE TABLE conditional_t USING PARQUET AS SELECT c1, c2 FROM VALUES(1d, 0),(2d, 1),(null, 1),(CAST('NaN' AS DOUBLE), 0) AS t(c1, c2)
 -- !query schema
 struct<>
 -- !query output
@@ -11,7 +11,7 @@ struct<>
 -- !query
-SELECT nanvl(c2, c1/c2 + c1/c2) FROM t
+SELECT nanvl(c2, c1/c2 + c1/c2) FROM conditional_t
 -- !query schema
 struct
 -- !query output
@@ -22,7 +22,7 @@ struct
 -- !query
-SELECT nanvl(c2, 1/0) FROM t
+SELECT nanvl(c2, 1/0) FROM conditional_t
 -- !query schema
 struct
 -- !query output
@@ -33,7 +33,7 @@ struct
 -- !query
-SELECT nanvl(1-0, 1/0) FROM t
+SELECT nanvl(1-0, 1/0) FROM conditional_t
 -- !query schema
 struct
 -- !query output
@@ -44,7 +44,7 @@ struct
 -- !query
-SELECT if(c2 >= 0, 1-0, 1/0) from t
+SELECT if(c2 >= 0, 1-0, 1/0) from conditional_t
 -- !query schema
 struct<(IF((c2 >= 0), (1 - 0), (1 / 0))):double>
 -- !query output
@@ -71,7 +71,7 @@ struct<(IF((NOT (1 = 1)), (1 / 0), 1)):double>
 -- !query
-SELECT coalesce(c2, 1/0) from t
+SELECT coalesce(c2, 1/0) from conditional_t
 -- !query schema
 struct
 -- !query output
@@ -98,7 +98,7 @@ struct
 -- !query
-SELECT case when c2 >= 0 then 1 else 1/0 end from t
+SELECT case when c2 >= 0 then 1 else 1/0 end from conditional_t
 -- !query schema
 struct<CASE WHEN (c2 >= 0) THEN 1 ELSE (1 / 0) END:double>
 -- !query output
@@ -125,7 +125,7 @@ struct<CASE WHEN (1 > 2) THEN (1 / 0) ELSE 1 END:double>
 -- !query
-DROP TABLE IF EXISTS t
+DROP TABLE conditional_t
 -- !query schema
 struct<>
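To illustrate the follow-up's point, a hypothetical PySpark fragment (assuming an active `SparkSession` named `spark`; not taken from the test suite): giving shared-catalog tests a distinctive table name avoids collisions with other suites that create a generic `t`.

```
# Hypothetical test fragment: a suite-specific name instead of `t`.
spark.sql(
    "CREATE TABLE conditional_t USING PARQUET AS "
    "SELECT c1, c2 FROM VALUES (1d, 0), (2d, 1) AS t(c1, c2)"
)
assert spark.sql("SELECT count(*) FROM conditional_t").first()[0] == 2
spark.sql("DROP TABLE conditional_t")
```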
[spark] branch master updated (8894e785eda -> 4f672db5719)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 8894e785eda [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
     add 4f672db5719 [SPARK-39040][SQL][FOLLOWUP] Use a unique table name in conditional-functions.sql

No new revisions were added by this update.

Summary of changes:
 .../sql-tests/inputs/ansi/conditional-functions.sql      | 16
 .../sql-tests/results/ansi/conditional-functions.sql.out | 16
 2 files changed, 16 insertions(+), 16 deletions(-)
[spark] branch master updated: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8894e785eda [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
8894e785eda is described below

commit 8894e785edae42a642351ad91e539324c39da8e4
Author: Max Gekk
AuthorDate: Wed Jun 1 20:16:17 2022 +0300

    [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

    ### What changes were proposed in this pull request?
    In the PR, I propose to catch asserts/illegal state exceptions on each phase of query execution: ANALYSIS, OPTIMIZATION, PLANNING, and convert them to a SparkException with the `INTERNAL_ERROR` error class.

    ### Why are the changes needed?
    To improve user experience with Spark SQL and unify the representation of user-facing errors.

    ### Does this PR introduce _any_ user-facing change?
    No. The changes might affect users in corner cases only.

    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
    ```

    Closes #36704 from MaxGekk/wrapby-INTERNAL_ERROR-every-phase.

    Authored-by: Max Gekk
    Signed-off-by: Max Gekk
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 11 +---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 14 +++---
 .../spark/sql/execution/QueryExecution.scala       | 31 +-
 .../sql/execution/streaming/StreamExecution.scala  |  4 ++-
 .../streaming/MicroBatchExecutionSuite.scala       |  6 +++--
 .../sql/streaming/continuous/ContinuousSuite.scala |  7 ++---
 6 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 2396f31b954..0a32b1b54d0 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._

+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
       })
     )
   }
@@ -764,12 +766,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {

     testStream(df)(
       StartStream(checkpointLocation = metadataPath.getAbsolutePath),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         Seq(
           s"maximum supported log version is v1, but encountered v9",
           "produced by a newer version of Spark and cannot be read by this version"
         ).foreach { message =>
-          assert(e.toString.contains(message))
+          assert(e.getCause.toString.contains(message))
         }
       }))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f00ebf51d6d..0a45cf92c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal

 import org.apache.commons.lang3.StringUtils

-import org.apache.spark.{SparkException, SparkThrowable, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -3920,19 +3920,11 @@ class Dataset[T] private[sql](
    * the internal error exception.
    */
  private def withAction[U](name: String, qe:
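For intuition, a language-agnostic sketch of the wrapping pattern (the actual change is in Spark's Scala `QueryExecution`; the names below are illustrative stand-ins, not Spark's API):

```
class SparkInternalError(Exception):
    """Stand-in for SparkException with error class INTERNAL_ERROR."""

def run_phase(phase_name, body):
    # Run one query-execution phase (analysis, optimization, planning).
    # Assertion/illegal-state failures are rethrown as a uniform internal
    # error, keeping the original exception as the cause for tests to check.
    try:
        return body()
    except (AssertionError, RuntimeError) as exc:
        raise SparkInternalError(f"[INTERNAL_ERROR] in phase {phase_name}") from exc
```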
[spark] branch branch-3.3 updated: [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 000270a4ead [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated
000270a4ead is described below

commit 000270a4ead61bb9d7333d05c55b02a2ec477a04
Author: Cheng Pan
AuthorDate: Wed Jun 1 09:49:45 2022 -0700

    [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated

    After reading the code changes in #35657, I guess the original intention of changing the return type of `V2ExpressionUtils.toCatalyst` from `Expression` to `Option[Expression]` is that, for reading, Spark can ignore unrecognized distribution and ordering, but for writing, it should always be strict. Specifically, `V2ExpressionUtils.toCatalystOrdering` should fail if a V2Expression cannot be translated, instead of returning an empty Seq.

    `V2ExpressionUtils.toCatalystOrdering` is used by `DistributionAndOrderingUtils`; the current behavior will break the semantics of `RequiresDistributionAndOrdering#requiredOrdering` in some cases (see UT).

    No.

    New UT.

    Closes #36697 from pan3793/SPARK-39313.

    Authored-by: Cheng Pan
    Signed-off-by: Chao Sun
---
 .../catalyst/expressions/V2ExpressionUtils.scala   | 23 +++---
 .../expressions/V2ExpressionUtilsSuite.scala       | 40 ++
 .../sql/connector/catalog/InMemoryTable.scala      | 11 ++-
 .../v2/DistributionAndOrderingUtils.scala          |  5 +-
 .../datasources/v2/V2ScanPartitioning.scala        |  4 +-
 .../connector/KeyGroupedPartitioningSuite.scala    | 92 +-
 6 files changed, 85 insertions(+), 90 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 596d5d8b565..c252ea5ccfe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.functions._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, Expression => V2Expression, FieldReference, IdentityTransform, NamedReference, NamedTransform, NullOrdering => V2NullOrdering, SortDirection => V2SortDirection, SortOrder => V2SortOrder, SortValue, Transform}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.Utils.sequenceToOption

 /**
  * A utility class that converts public connector expressions into Catalyst expressions.
@@ -54,19 +53,25 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
    * Converts the array of input V2 [[V2SortOrder]] into their counterparts in catalyst.
    */
   def toCatalystOrdering(ordering: Array[V2SortOrder], query: LogicalPlan): Seq[SortOrder] = {
-    sequenceToOption(ordering.map(toCatalyst(_, query))).asInstanceOf[Option[Seq[SortOrder]]]
-      .getOrElse(Seq.empty)
+    ordering.map(toCatalyst(_, query).asInstanceOf[SortOrder])
   }

   def toCatalyst(
+      expr: V2Expression,
+      query: LogicalPlan,
+      funCatalogOpt: Option[FunctionCatalog] = None): Expression =
+    toCatalystOpt(expr, query, funCatalogOpt)
+      .getOrElse(throw new AnalysisException(s"$expr is not currently supported"))
+
+  def toCatalystOpt(
       expr: V2Expression,
       query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = {
     expr match {
       case t: Transform =>
-        toCatalystTransform(t, query, funCatalogOpt)
+        toCatalystTransformOpt(t, query, funCatalogOpt)
       case SortValue(child, direction, nullOrdering) =>
-        toCatalyst(child, query, funCatalogOpt).map { catalystChild =>
+        toCatalystOpt(child, query, funCatalogOpt).map { catalystChild =>
           SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
         }
       case ref: FieldReference =>
@@ -76,7 +81,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
     }
   }

-  def toCatalystTransform(
+  def toCatalystTransformOpt(
       trans: Transform,
       query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Option[Expression] = trans match {
@@ -89,7 +94,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
     // look up the V2 function.
     val numBucketsRef = AttributeReference("numBuckets", IntegerType, nullable = false)()
     funCatalogOpt.flatMap { catalog =>
-      loadV2Function(catalog, "bucket", Seq(numBucketsRef) ++ resolvedRefs).map { bound =>
+
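The design split above, rendered as a hypothetical Python analogue (Spark's implementation is the Scala diff shown; the names and registry here are illustrative): the `*Opt` variant returns `None` so read paths can ignore untranslatable expressions, while the strict variant raises, which is what write-side required orderings need.

```
# Hypothetical registry mapping connector expression types to translators.
TRANSLATIONS = {}

def to_catalyst_opt(expr):
    # Lenient: read paths may simply drop unsupported expressions.
    translate = TRANSLATIONS.get(type(expr))
    return translate(expr) if translate is not None else None

def to_catalyst(expr):
    # Strict: write paths must not silently lose a required ordering.
    result = to_catalyst_opt(expr)
    if result is None:
        raise ValueError(f"{expr} is not currently supported")
    return result
```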
[spark] branch master updated (5a3ba9b0b30 -> ef0b87a5a95)
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 5a3ba9b0b30 [SPARK-39267][SQL] Clean up dsl unnecessary symbol
     add ef0b87a5a95 [SPARK-39313][SQL] `toCatalystOrdering` should fail if V2Expression can not be translated

No new revisions were added by this update.

Summary of changes:
 .../catalyst/expressions/V2ExpressionUtils.scala   | 23 +++---
 .../expressions/V2ExpressionUtilsSuite.scala}      | 35
 .../sql/connector/catalog/InMemoryTable.scala      | 11 ++-
 .../v2/DistributionAndOrderingUtils.scala          |  5 +-
 .../datasources/v2/V2ScanPartitioning.scala        |  4 +-
 .../connector/KeyGroupedPartitioningSuite.scala    | 92 +-
 6 files changed, 61 insertions(+), 109 deletions(-)
 copy sql/{hive/src/test/scala/org/apache/spark/sql/hive/test/TestHiveSingleton.scala => catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtilsSuite.scala} (50%)
[spark] branch master updated (f4cb5f6f66e -> 5a3ba9b0b30)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from f4cb5f6f66e [SPARK-39350][SQL] DESC NAMESPACE EXTENDED should redact properties
     add 5a3ba9b0b30 [SPARK-39267][SQL] Clean up dsl unnecessary symbol

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/dsl/package.scala    |   7 +-
 .../analysis/PullOutNondeterministicSuite.scala    |   2 +-
 .../sql/catalyst/analysis/ResolveHintsSuite.scala  |   4 +-
 .../analysis/ResolvedUuidExpressionsSuite.scala    |   6 +-
 .../optimizer/AggregateOptimizeSuite.scala         |  32 +--
 .../catalyst/optimizer/CollapseProjectSuite.scala  |  66 ++---
 .../catalyst/optimizer/CollapseWindowSuite.scala   |  30 +-
 .../catalyst/optimizer/ColumnPruningSuite.scala    |  20 +-
 .../catalyst/optimizer/ConstantFoldingSuite.scala  | 130 -
 .../optimizer/DecimalAggregatesSuite.scala         |   8 +-
 .../optimizer/EliminateAggregateFilterSuite.scala  |  20 +-
 .../optimizer/EliminateDistinctSuite.scala         |   8 +-
 .../catalyst/optimizer/EliminateSortsSuite.scala   |   8 +-
 .../optimizer/FilterPushdownOnePassSuite.scala     |  52 ++--
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 302 ++---
 .../optimizer/FoldablePropagationSuite.scala       |  88 +++---
 .../InferFiltersFromConstraintsSuite.scala         | 104 +++
 .../catalyst/optimizer/JoinOptimizationSuite.scala |  12 +-
 .../optimizer/LeftSemiAntiJoinPushDownSuite.scala  |  22 +-
 .../catalyst/optimizer/LimitPushdownSuite.scala    |   8 +-
 .../optimizer/NestedColumnAliasingSuite.scala      |   4 +-
 .../optimizer/OptimizeLimitZeroSuite.scala         |   4 +-
 .../OptimizerStructuralIntegrityCheckerSuite.scala |   4 +-
 .../optimizer/OuterJoinEliminationSuite.scala      |  52 ++--
 .../optimizer/PropagateEmptyRelationSuite.scala    |  18 +-
 .../sql/catalyst/optimizer/PruneFiltersSuite.scala |  22 +-
 .../optimizer/RemoveRedundantAggregatesSuite.scala |  22 +-
 .../RemoveRedundantAliasAndProjectSuite.scala      |  24 +-
 .../ReplaceNullWithFalseInPredicateSuite.scala     |   4 +-
 .../optimizer/RewriteDistinctAggregatesSuite.scala |   4 +-
 .../sql/catalyst/optimizer/SetOperationSuite.scala |  28 +-
 .../SimplifyStringCaseConversionSuite.scala        |  16 +-
 .../catalyst/optimizer/TransposeWindowSuite.scala  |  40 +--
 .../sql/catalyst/optimizer/complexTypesSuite.scala |  12 +-
 .../joinReorder/StarJoinReorderSuite.scala         |   4 +-
 .../catalyst/parser/ExpressionParserSuite.scala    |   4 +-
 .../plans/ConstraintPropagationSuite.scala         |  28 +-
 .../plans/logical/DistinctKeyVisitorSuite.scala    |   4 +-
 .../BasicStatsEstimationSuite.scala                |   2 +-
 .../catalyst/util/PhysicalAggregationSuite.scala   |   6 +-
 40 files changed, 617 insertions(+), 614 deletions(-)
[spark] branch master updated (cb0e4198996 -> f4cb5f6f66e)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from cb0e4198996 [SPARK-39338][SQL] Remove dynamic pruning subquery if pruningKey's references is empty
     add f4cb5f6f66e [SPARK-39350][SQL] DESC NAMESPACE EXTENDED should redact properties

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/execution/command/ddl.scala |  2 +-
 .../execution/datasources/v2/DescribeNamespaceExec.scala   |  2 +-
 .../sql/execution/command/v2/DescribeNamespaceSuite.scala  | 14 ++
 3 files changed, 12 insertions(+), 6 deletions(-)