[spark] branch branch-3.3 updated: [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 30c6802574e [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source 30c6802574e is described below commit 30c6802574e5993e6f0f10d4c189c6e8325bcc5c Author: allisonwang-db AuthorDate: Thu Apr 14 13:11:00 2022 +0900 [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source ### What changes were proposed in this pull request? This PR compiles the boolean data type to the bit data type for pushed column filters while querying the MSSQL data soruce. Microsoft SQL Server does not support the boolean type, so the JDBC dialect should use the bit data type instead. ### Why are the changes needed? To fix a bug that was exposed by the boolean column filter pushdown to SQL server data source. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new integration test. Closes #36182 from allisonwang-db/spark-38889-mssql-predicate-pushdown. Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon (cherry picked from commit 320f88d54440e05228a90ef5663991e28ae07c95) Signed-off-by: Hyukjin Kwon --- .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala| 17 + .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 10 ++ 2 files changed, 27 insertions(+) diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala index 5992253a958..e293f9a8f7b 100644 --- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala +++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala @@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp} import java.util.Properties import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.tags.DockerTest @@ -140,6 +141,14 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 1)))', |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 -5, -5 -1, -1 -1)))') """.stripMargin).executeUpdate() +conn.prepareStatement( + """ +|CREATE TABLE bits(a INT, b INT, c BIT) +|""".stripMargin).executeUpdate() +conn.prepareStatement( + """ +|INSERT INTO bits VALUES (1, 2, 1) + """.stripMargin).executeUpdate() } test("Basic test") { @@ -357,4 +366,12 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, 0, 3)) } + + test("SPARK-38889: MsSqlServerDialect should handle boolean filter push down") { +val df = spark.read.jdbc(jdbcUrl, "bits", new Properties) +val rows = df.collect() +assert(rows.length == 1) +val filtered = df.where(col("c") === 0).collect() +assert(filtered.length == 0) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 8d2fbec55f9..a42129dbe8d 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -40,6 +40,16 @@ private object MsSqlServerDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver") + // Microsoft SQL Server does not have the boolean type. + // Compile the boolean value to the bit data type instead. + // scalastyle:off line.size.limit + // See https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver15 + // scalastyle:on line.size.limit + override def compileValue(value: Any): Any = value match { +case booleanValue: Boolean => if (booleanValue) 1 else 0 +case other => super.compileValue(other) + } + // scalastyle:off line.size.limit // See https://docs.microsoft.com/en-us/sql/t-sql/functions/aggregate-functions-transact-sql?view=sql-server-ver15 // scalastyle:on line.size.limit - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail:
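For illustration, a minimal PySpark sketch of the user-facing path this commit fixes (not part of the patch): reading the `bits` table from the integration test over JDBC and filtering on the BIT column. The `jdbc_url`, user, and password are placeholders; with the dialect change, the pushed-down boolean literal is compiled to `0`/`1` (e.g. `WHERE "c" = 0`) instead of `false`, which SQL Server rejects.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Placeholder connection details -- replace with a reachable SQL Server instance.
jdbc_url = "jdbc:sqlserver://host:1433;databaseName=test"
props = {"user": "sa", "password": "..."}

# Mirrors the integration-test table: CREATE TABLE bits(a INT, b INT, c BIT)
df = spark.read.jdbc(jdbc_url, "bits", properties=props)

# The filter below is pushed to SQL Server. MsSqlServerDialect.compileValue now
# renders the Boolean literal as 0, so the generated predicate is valid T-SQL.
filtered = df.where(col("c") == False)
print(filtered.count())
```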
[spark] branch master updated: [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 320f88d5444 [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source 320f88d5444 is described below commit 320f88d54440e05228a90ef5663991e28ae07c95 Author: allisonwang-db AuthorDate: Thu Apr 14 13:11:00 2022 +0900 [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source ### What changes were proposed in this pull request? This PR compiles the boolean data type to the bit data type for pushed column filters while querying the MSSQL data soruce. Microsoft SQL Server does not support the boolean type, so the JDBC dialect should use the bit data type instead. ### Why are the changes needed? To fix a bug that was exposed by the boolean column filter pushdown to SQL server data source. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new integration test. Closes #36182 from allisonwang-db/spark-38889-mssql-predicate-pushdown. Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon --- .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala| 17 + .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 10 ++ 2 files changed, 27 insertions(+) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala index 5992253a958..e293f9a8f7b 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala @@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp} import java.util.Properties import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._ +import org.apache.spark.sql.functions.col import org.apache.spark.sql.internal.SQLConf import org.apache.spark.tags.DockerTest @@ -140,6 +141,14 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 1)))', |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 -5, -5 -1, -1 -1)))') """.stripMargin).executeUpdate() +conn.prepareStatement( + """ +|CREATE TABLE bits(a INT, b INT, c BIT) +|""".stripMargin).executeUpdate() +conn.prepareStatement( + """ +|INSERT INTO bits VALUES (1, 2, 1) + """.stripMargin).executeUpdate() } test("Basic test") { @@ -357,4 +366,12 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite { 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, 0, 3)) } + + test("SPARK-38889: MsSqlServerDialect should handle boolean filter push down") { +val df = spark.read.jdbc(jdbcUrl, "bits", new Properties) +val rows = df.collect() +assert(rows.length == 1) +val filtered = df.where(col("c") === 0).collect() +assert(filtered.length == 0) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 8d2fbec55f9..a42129dbe8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -40,6 +40,16 @@ private object MsSqlServerDialect extends JdbcDialect { override def canHandle(url: String): Boolean = url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver") + // Microsoft SQL Server does not have the boolean type. + // Compile the boolean value to the bit data type instead. + // scalastyle:off line.size.limit + // See https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver15 + // scalastyle:on line.size.limit + override def compileValue(value: Any): Any = value match { +case booleanValue: Boolean => if (booleanValue) 1 else 0 +case other => super.compileValue(other) + } + // scalastyle:off line.size.limit // See https://docs.microsoft.com/en-us/sql/t-sql/functions/aggregate-functions-transact-sql?view=sql-server-ver15 // scalastyle:on line.size.limit - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-38857][PYTHON] series name should be preserved in series.mode()
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5d763eb63b6 [SPARK-38857][PYTHON] series name should be preserved in series.mode() 5d763eb63b6 is described below commit 5d763eb63b67d4fee5972559ddfe0ff3e0e8e210 Author: Yikun Jiang AuthorDate: Thu Apr 14 10:27:19 2022 +0900 [SPARK-38857][PYTHON] series name should be preserved in series.mode() ### What changes were proposed in this pull request? series name is preserved in `series.mode`. ### Why are the changes needed? series name should be preserved in series.mode() to follow pandas 1.4.x behavior. ### Does this PR introduce _any_ user-facing change? Yes, if series set name, it will be preserved in series.mode() ### How was this patch tested? UT test both in before and after 1.4.x Closes #36159 from Yikun/SPARK-38857. Authored-by: Yikun Jiang Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/series.py| 8 ++-- python/pyspark/pandas/tests/test_series.py | 7 ++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index da1d41c2abe..f4638fe22de 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -4523,6 +4523,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]): Always returns Series even if only one value is returned. +.. versionchanged:: 3.4.0 + Series name is preserved to follow pandas 1.4+ behavior. + Parameters -- dropna : bool, default True @@ -4597,8 +4600,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]): F.col(SPARK_DEFAULT_INDEX_NAME).alias(SPARK_DEFAULT_SERIES_NAME) ) internal = InternalFrame(spark_frame=sdf, index_spark_columns=None, column_labels=[None]) - -return first_series(DataFrame(internal)) +ser_mode = first_series(DataFrame(internal)) +ser_mode.name = self.name +return ser_mode def keys(self) -> "ps.Index": """ diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py index 76d35c51196..68fed26324d 100644 --- a/python/pyspark/pandas/tests/test_series.py +++ b/python/pyspark/pandas/tests/test_series.py @@ -2121,7 +2121,12 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils): pser.name = "x" psser = ps.from_pandas(pser) -self.assert_eq(psser.mode(), pser.mode()) +if LooseVersion(pd.__version__) < LooseVersion("1.4"): +# Due to pandas bug: https://github.com/pandas-dev/pandas/issues/46737 +psser.name = None +self.assert_eq(psser.mode(), pser.mode()) +else: +self.assert_eq(psser.mode(), pser.mode()) self.assert_eq( psser.mode(dropna=False).sort_values().reset_index(drop=True), pser.mode(dropna=False).sort_values().reset_index(drop=True), - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
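A minimal sketch of the behavior change described above, assuming pandas-on-Spark 3.4+ where this fix lands: the name set on a Series is now kept by `mode()`, matching pandas 1.4+.

```python
import pyspark.pandas as ps

ser = ps.Series([1, 1, 2, 3], name="x")
mode = ser.mode()

print(mode.name)      # with this change: 'x' (previously: None)
print(mode.tolist())  # [1]
```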
[spark] branch master updated: [SPARK-38797][SQL] Runtime Filter supports pruning side has window
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 78700d939c4 [SPARK-38797][SQL] Runtime Filter supports pruning side has window 78700d939c4 is described below commit 78700d939c42404ce6bd420094e13a258875949b Author: Yuming Wang AuthorDate: Thu Apr 14 08:39:15 2022 +0800 [SPARK-38797][SQL] Runtime Filter supports pruning side has window ### What changes were proposed in this pull request? 1. Makes row-level runtime filtering support pruning side has window. For example: ```sql SELECT * FROM (SELECT *, Row_number() OVER ( partition BY c1 ORDER BY f1) rn FROM bf1) bf1 JOIN bf2 ON bf1.c1 = bf2.c2 WHERE bf2.a2 = 62 ``` After this PR: ``` == Optimized Logical Plan == Join Inner, (c1#45922 = c2#45928), Statistics(sizeInBytes=12.3 MiB) :- Window [row_number() windowspecdefinition(c1#45922, f1#45925 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#45976], [c1#45922], [f1#45925 ASC NULLS FIRST], Statistics(sizeInBytes=3.7 KiB) : +- Filter (isnotnull(c1#45922) AND might_contain(scalar-subquery#45993 [], xxhash64(c1#45922, 42))), Statistics(sizeInBytes=3.3 KiB) : : +- Aggregate [bloom_filter_agg(xxhash64(c2#45928, 42), 100, 8388608, 0, 0) AS bloomFilter#45992], Statistics(sizeInBytes=108.0 B, rowCount=1) : : +- Project [c2#45928], Statistics(sizeInBytes=1278.0 B) : :+- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB) : : +- Relation default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, Statistics(sizeInBytes=3.3 KiB) : +- Relation default.bf1[a1#45920,b1#45921,c1#45922,d1#45923,e1#45924,f1#45925] parquet, Statistics(sizeInBytes=3.3 KiB) +- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB) +- Relation default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, Statistics(sizeInBytes=3.3 KiB) ``` 2. Make sure injected filters could push through Shuffle if current join is a broadcast join. ### Why are the changes needed? Improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #36080 from wangyum/SPARK-38797. 
Lead-authored-by: Yuming Wang Co-authored-by: Yuming Wang Signed-off-by: Yuming Wang --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 5 +++-- .../spark/sql/InjectRuntimeFilterSuite.scala | 26 ++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 134292ae30d..01c1786e05a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -141,6 +141,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J plan.exists { case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right, hint) case _: Aggregate => true + case _: Window => true case _ => false } } @@ -172,8 +173,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J /** * Check that: - * - The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the - * expression references originate from a single leaf node) + * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows + * (ie the expression references originate from a single leaf node) * - The filter creation side has a selective predicate * - The current join is a shuffle join or a broadcast join that has a shuffle below it * - The max filterApplicationSide scan size is greater than a configurable threshold diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala index 726fa341b5c..6065f232109 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala @@ -539,4 +539,30 @@ class InjectRuntimeFilterSuite
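A hedged sketch of how to observe the new behavior from PySpark, assuming a running `spark` session (e.g. the pyspark shell), the `bf1`/`bf2` tables from the commit message, and the runtime bloom-filter feature flag named below (verify the flag against your Spark version):

```python
# Enable row-level runtime (bloom) filters.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")

query = spark.sql("""
    SELECT *
    FROM (SELECT *, row_number() OVER (PARTITION BY c1 ORDER BY f1) rn FROM bf1) bf1
    JOIN bf2 ON bf1.c1 = bf2.c2
    WHERE bf2.a2 = 62
""")

# After SPARK-38797 the optimized logical plan injects a might_contain(...) filter
# below the Window on the pruning side, as shown in the plan in the PR description.
query.explain("extended")
```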
[spark] branch master updated: [SPARK-38890][PYTHON] Implement `ignore_index` of `DataFrame.sort_index`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e549c6fd22a [SPARK-38890][PYTHON] Implement `ignore_index` of `DataFrame.sort_index` e549c6fd22a is described below commit e549c6fd22ac0d5a6df0d817212637c532b9a681 Author: Xinrong Meng AuthorDate: Thu Apr 14 09:34:13 2022 +0900 [SPARK-38890][PYTHON] Implement `ignore_index` of `DataFrame.sort_index` ### What changes were proposed in this pull request? Implement `ignore_index` of `DataFrame.sort_index`. ### Why are the changes needed? To reach parity with pandas API. ### Does this PR introduce _any_ user-facing change? Yes. `ignore_index` of `DataFrame.sort_index` is supported as below: ```py >>> df = ps.DataFrame({'A': [2, 1, np.nan]}, index=['b', 'a', np.nan]) >>> df A b2.0 a1.0 NaN NaN >>> df.sort_index(ignore_index=True) A 0 1.0 1 2.0 2 NaN ``` ### How was this patch tested? Unit tests. Closes #36184 from xinrong-databricks/frame.sort_index.ignore_index. Authored-by: Xinrong Meng Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/frame.py| 22 +- python/pyspark/pandas/tests/test_dataframe.py | 12 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 36e992fef93..a78aaa66f08 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -7014,6 +7014,7 @@ defaultdict(, {'col..., 'col...})] inplace: bool = False, kind: str = None, na_position: str = "last", +ignore_index: bool = False, ) -> Optional["DataFrame"]: """ Sort object by labels (along an axis) @@ -7033,6 +7034,10 @@ defaultdict(, {'col..., 'col...})] na_position : {‘first’, ‘last’}, default ‘last’ first puts NaNs at the beginning, last puts NaNs at the end. Not implemented for MultiIndex. +ignore_index : bool, default False +If True, the resulting axis will be labeled 0, 1, …, n - 1. + +.. 
versionadded:: 3.4.0 Returns --- @@ -7060,6 +7065,12 @@ defaultdict(, {'col..., 'col...})] a1.0 b2.0 +>>> df.sort_index(ignore_index=True) + A +0 1.0 +1 2.0 +2 NaN + >>> df.sort_index(inplace=True) >>> df A @@ -7091,6 +7102,13 @@ defaultdict(, {'col..., 'col...})] b 0 1 2 a 1 2 1 b 1 0 3 + +>>> df.sort_index(ignore_index=True) + A B +0 3 0 +1 2 1 +2 1 2 +3 0 3 """ inplace = validate_bool_kwarg(inplace, "inplace") axis = validate_axis(axis) @@ -7112,10 +7130,12 @@ defaultdict(, {'col..., 'col...})] psdf = self._sort(by=by, ascending=ascending, na_position=na_position) if inplace: +if ignore_index: +psdf.reset_index(drop=True, inplace=inplace) self._update_internal_frame(psdf._internal) return None else: -return psdf +return psdf.reset_index(drop=True) if ignore_index else psdf def swaplevel( self, i: Union[int, Name] = -2, j: Union[int, Name] = -1, axis: Axis = 0 diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py index fa32b38d3c9..b99a9a2e807 100644 --- a/python/pyspark/pandas/tests/test_dataframe.py +++ b/python/pyspark/pandas/tests/test_dataframe.py @@ -1678,6 +1678,8 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils): # Assert default behavior without parameters self.assert_eq(psdf.sort_index(), pdf.sort_index()) +# Assert ignoring index +self.assert_eq(psdf.sort_index(ignore_index=True), pdf.sort_index(ignore_index=True)) # Assert sorting descending self.assert_eq(psdf.sort_index(ascending=False), pdf.sort_index(ascending=False)) # Assert sorting NA indices first @@ -1694,6 +1696,14 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils): self.assertEqual(psdf.sort_index(inplace=True), pdf.sort_index(inplace=True)) self.assert_eq(psdf, pdf) self.assert_eq(psserA, pserA) +pserA = pdf.A +psserA = psdf.A +self.assertEqual( +psdf.sort_index(inplace=True, ascending=False, ignore_index=True), +pdf.sort_index(inplace=True, ascending=False, ignore_index=True), +) +self.assert_eq(psdf, pdf) +self.assert_eq(psserA, pserA) # Assert multi-indices pdf = pd.DataFrame( @@
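The new unit test above also covers combining `inplace=True` with `ignore_index=True`, which the doctests do not show. A small sketch of that combination, assuming pandas-on-Spark 3.4+ where `ignore_index` is available:

```python
import numpy as np
import pyspark.pandas as ps

psdf = ps.DataFrame({"A": [2, 1, np.nan]}, index=["b", "a", np.nan])

# Sorts the frame in place and replaces its index with 0..n-1.
psdf.sort_index(inplace=True, ignore_index=True)
print(psdf)
#      A
# 0  1.0
# 1  2.0
# 2  NaN
```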
[spark] branch master updated (c0c1f35cd92 -> 0085bfc8aca)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from c0c1f35cd92 [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py add 0085bfc8aca [SPARK-38107][SQL][FOLLOWUP] Refine the error-class name and message for grouped agg pandas UDF No new revisions were added by this update. Summary of changes: core/src/main/resources/error/error-classes.json| 6 +++--- python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py | 4 +++- .../apache/spark/sql/errors/QueryCompilationErrors.scala| 7 --- .../org/apache/spark/sql/execution/SparkStrategies.scala| 7 +-- .../spark/sql/errors/QueryCompilationErrorsSuite.scala | 13 - 5 files changed, 23 insertions(+), 14 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.3 updated: [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py
This is an automated email from the ASF dual-hosted git repository. zero323 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new baaa3bbecd9 [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py baaa3bbecd9 is described below commit baaa3bbecd9f63aa0a71cf76de4b53d3c1dcf7a4 Author: dch nguyen AuthorDate: Thu Apr 14 02:03:24 2022 +0200 [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py ### What changes were proposed in this pull request? Inline type hints for python/pyspark/streaming/context.py from Inline type hints for python/pyspark/streaming/context.pyi. ### Why are the changes needed? Currently, there is type hint stub files python/pyspark/streaming/context.pyi to show the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test. Closes #34293 from dchvn/SPARK-37014. Authored-by: dch nguyen Signed-off-by: zero323 (cherry picked from commit c0c1f35cd9279bc1a7a50119be72a297162a9b55) Signed-off-by: zero323 --- python/pyspark/streaming/context.py | 123 --- python/pyspark/streaming/context.pyi | 71 python/pyspark/streaming/kinesis.py | 9 +-- 3 files changed, 91 insertions(+), 112 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index cc9875d6575..52e5efed063 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -14,18 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from typing import Any, Callable, List, Optional, TypeVar -from py4j.java_gateway import java_import, is_instance_of +from py4j.java_gateway import java_import, is_instance_of, JavaObject from pyspark import RDD, SparkConf from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer from pyspark.context import SparkContext from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream +from pyspark.streaming.listener import StreamingListener from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer __all__ = ["StreamingContext"] +T = TypeVar("T") + class StreamingContext: """ @@ -51,27 +55,35 @@ class StreamingContext: # Reference to a currently active StreamingContext _activeContext = None -def __init__(self, sparkContext, batchDuration=None, jssc=None): - +def __init__( +self, +sparkContext: SparkContext, +batchDuration: Optional[int] = None, +jssc: Optional[JavaObject] = None, +): self._sc = sparkContext self._jvm = self._sc._jvm self._jssc = jssc or self._initialize_context(self._sc, batchDuration) -def _initialize_context(self, sc, duration): +def _initialize_context(self, sc: SparkContext, duration: Optional[int]) -> JavaObject: self._ensure_initialized() +assert self._jvm is not None and duration is not None return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration)) -def _jduration(self, seconds): +def _jduration(self, seconds: int) -> JavaObject: """ Create Duration object given number of seconds """ +assert self._jvm is not None return self._jvm.Duration(int(seconds * 1000)) @classmethod -def _ensure_initialized(cls): +def _ensure_initialized(cls) -> None: SparkContext._ensure_initialized() gw = SparkContext._gateway +assert gw is not None + java_import(gw.jvm, "org.apache.spark.streaming.*") java_import(gw.jvm, "org.apache.spark.streaming.api.java.*") java_import(gw.jvm, "org.apache.spark.streaming.api.python.*") @@ -83,11 +95,15 @@ class StreamingContext: # register serializer for TransformFunction # it happens before creating SparkContext when loading from checkpointing cls._transformerSerializer = TransformFunctionSerializer( -SparkContext._active_spark_context, CloudPickleSerializer(), gw +SparkContext._active_spark_context, +CloudPickleSerializer(), +gw, ) @classmethod -def getOrCreate(cls, checkpointPath, setupFunc): +def getOrCreate( +cls, checkpointPath: str, setupFunc: Callable[[], "StreamingContext"] +) -> "StreamingContext": """ Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the
[spark] branch master updated: [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py
This is an automated email from the ASF dual-hosted git repository. zero323 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c0c1f35cd92 [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py c0c1f35cd92 is described below commit c0c1f35cd9279bc1a7a50119be72a297162a9b55 Author: dch nguyen AuthorDate: Thu Apr 14 02:03:24 2022 +0200 [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py ### What changes were proposed in this pull request? Inline type hints for python/pyspark/streaming/context.py from Inline type hints for python/pyspark/streaming/context.pyi. ### Why are the changes needed? Currently, there is type hint stub files python/pyspark/streaming/context.pyi to show the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing test. Closes #34293 from dchvn/SPARK-37014. Authored-by: dch nguyen Signed-off-by: zero323 --- python/pyspark/streaming/context.py | 123 --- python/pyspark/streaming/context.pyi | 71 python/pyspark/streaming/kinesis.py | 9 +-- 3 files changed, 91 insertions(+), 112 deletions(-) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index cc9875d6575..52e5efed063 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -14,18 +14,22 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from typing import Any, Callable, List, Optional, TypeVar -from py4j.java_gateway import java_import, is_instance_of +from py4j.java_gateway import java_import, is_instance_of, JavaObject from pyspark import RDD, SparkConf from pyspark.serializers import NoOpSerializer, UTF8Deserializer, CloudPickleSerializer from pyspark.context import SparkContext from pyspark.storagelevel import StorageLevel from pyspark.streaming.dstream import DStream +from pyspark.streaming.listener import StreamingListener from pyspark.streaming.util import TransformFunction, TransformFunctionSerializer __all__ = ["StreamingContext"] +T = TypeVar("T") + class StreamingContext: """ @@ -51,27 +55,35 @@ class StreamingContext: # Reference to a currently active StreamingContext _activeContext = None -def __init__(self, sparkContext, batchDuration=None, jssc=None): - +def __init__( +self, +sparkContext: SparkContext, +batchDuration: Optional[int] = None, +jssc: Optional[JavaObject] = None, +): self._sc = sparkContext self._jvm = self._sc._jvm self._jssc = jssc or self._initialize_context(self._sc, batchDuration) -def _initialize_context(self, sc, duration): +def _initialize_context(self, sc: SparkContext, duration: Optional[int]) -> JavaObject: self._ensure_initialized() +assert self._jvm is not None and duration is not None return self._jvm.JavaStreamingContext(sc._jsc, self._jduration(duration)) -def _jduration(self, seconds): +def _jduration(self, seconds: int) -> JavaObject: """ Create Duration object given number of seconds """ +assert self._jvm is not None return self._jvm.Duration(int(seconds * 1000)) @classmethod -def _ensure_initialized(cls): +def _ensure_initialized(cls) -> None: SparkContext._ensure_initialized() gw = SparkContext._gateway +assert gw is not None + java_import(gw.jvm, "org.apache.spark.streaming.*") java_import(gw.jvm, 
"org.apache.spark.streaming.api.java.*") java_import(gw.jvm, "org.apache.spark.streaming.api.python.*") @@ -83,11 +95,15 @@ class StreamingContext: # register serializer for TransformFunction # it happens before creating SparkContext when loading from checkpointing cls._transformerSerializer = TransformFunctionSerializer( -SparkContext._active_spark_context, CloudPickleSerializer(), gw +SparkContext._active_spark_context, +CloudPickleSerializer(), +gw, ) @classmethod -def getOrCreate(cls, checkpointPath, setupFunc): +def getOrCreate( +cls, checkpointPath: str, setupFunc: Callable[[], "StreamingContext"] +) -> "StreamingContext": """ Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be @@ -104,6 +120,8 @@ class StreamingContext:
[spark] branch master updated (43c63337f98 -> e0c9604f0c4)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 43c63337f98 [SPARK-34659][UI] Forbid using keyword "proxy" or "history" in reverse proxy URL add e0c9604f0c4 [SPARK-38835][CORE][TESTS] Refactor `FsHistoryProviderSuite` to add UTs for RocksDB No new revisions were added by this update. Summary of changes: .../spark/deploy/history/FsHistoryProviderSuite.scala | 19 --- 1 file changed, 16 insertions(+), 3 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-34659][UI] Forbid using keyword "proxy" or "history" in reverse proxy URL
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 43c63337f98 [SPARK-34659][UI] Forbid using keyword "proxy" or "history" in reverse proxy URL 43c63337f98 is described below commit 43c63337f98097b046b70fcfb4fae44a3295b513 Author: Gengliang Wang AuthorDate: Wed Apr 13 10:50:52 2022 -0700 [SPARK-34659][UI] Forbid using keyword "proxy" or "history" in reverse proxy URL ### What changes were proposed in this pull request? When the reverse proxy URL contains "proxy" or "history", the application ID in UI is wrongly parsed. For example, if we set spark.ui.reverseProxyURL as "/test/proxy/prefix" or "/test/history/prefix", the application ID is parsed as "prefix" and the related API calls will fail in stages/executors pages: ``` .../api/v1/applications/prefix/allexecutors ``` instead of ``` .../api/v1/applications/app-20220413142241-/allexecutors ``` There are more contexts in https://github.com/apache/spark/pull/31774 We can fix this entirely like https://github.com/apache/spark/pull/36174, but it is risky and complicated to do that. ### Why are the changes needed? Avoid users setting keywords in reverse proxy URL and getting wrong UI results. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A new unit test. Also doc preview: https://user-images.githubusercontent.com/1097932/163126641-da315012-aae5-45a5-a048-340a5dd6e91e.png;> Closes #36176 from gengliangwang/forbidURLPrefix. Authored-by: Gengliang Wang Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/SparkContext.scala | 5 ++--- core/src/main/scala/org/apache/spark/internal/config/UI.scala | 5 + core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 11 +++ docs/configuration.md | 4 +++- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7257371256d..c6cb5cb5e19 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -592,9 +592,8 @@ class SparkContext(config: SparkConf) extends Logging { _env.blockManager.blockStoreClient.setAppAttemptId(attemptId) } if (_conf.get(UI_REVERSE_PROXY)) { - val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") + -"/proxy/" + _applicationId - System.setProperty("spark.ui.proxyBase", proxyUrl) + val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL).getOrElse("").stripSuffix("/") + System.setProperty("spark.ui.proxyBase", proxyUrl + "/proxy/" + _applicationId) } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala index 1790e97b35a..464034b8fcd 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala @@ -79,6 +79,11 @@ private[spark] object UI { "reach your proxy.") .version("2.1.0") .stringConf +.checkValue ({ s => + val words = s.split("/") + !words.contains("proxy") && !words.contains("history") }, + "Cannot use the keyword 'proxy' or 'history' in reverse proxy URL. 
Spark UI relies on both " + +"keywords for getting REST API endpoints from URIs.") .createOptional val UI_KILL_ENABLED = ConfigBuilder("spark.ui.killEnabled") diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 8671180b3ca..c64a4371911 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -1343,6 +1343,17 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1")) } + test("SPARK-34659: check invalid UI_REVERSE_PROXY_URL") { +val reverseProxyUrl = "http://proxyhost:8080/path/proxy/spark; +val conf = new SparkConf().setAppName("testAppAttemptId") + .setMaster("pushbasedshuffleclustermanager") +conf.set(UI_REVERSE_PROXY, true) +conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl) +val msg = intercept[java.lang.IllegalArgumentException] { + new SparkContext(conf) +}.getMessage +assert(msg.contains("Cannot use the keyword 'proxy' or 'history' in reverse proxy URL")) + } } object
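A sketch of the new validation from the user's side: setting a reverse-proxy URL whose path contains `proxy` (or `history`) now fails at `SparkContext` creation. The exact exception type and message may surface slightly differently through Py4J, hence the broad catch.

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[1]")
        .setAppName("reverse-proxy-check")
        .set("spark.ui.reverseProxy", "true")
        # Rejected after SPARK-34659 because the path contains the segment 'proxy'.
        .set("spark.ui.reverseProxyUrl", "http://proxyhost:8080/path/proxy/spark"))

try:
    sc = SparkContext(conf=conf)
except Exception as e:
    # Expected: an IllegalArgumentException explaining that 'proxy'/'history'
    # cannot be used in the reverse proxy URL.
    print(e)
```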
[spark] branch master updated: [SPARK-38678][TESTS][FOLLOWUP] Enable RocksDB tests in `AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon on MacOS
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7ec3a5730bd [SPARK-38678][TESTS][FOLLOWUP] Enable RocksDB tests in `AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon on MacOS 7ec3a5730bd is described below commit 7ec3a5730bd864089c22e19e823d87b107688378 Author: yangjie01 AuthorDate: Wed Apr 13 10:45:22 2022 -0700 [SPARK-38678][TESTS][FOLLOWUP] Enable RocksDB tests in `AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon on MacOS ### What changes were proposed in this pull request? This pr aims to enable RocksDB tests in `AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon on MacOS, it's a followup of SPARK-38678. ### Why are the changes needed? Enable more RocksDB related test on Apple Silicon on MacOS ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GA - Manual test on Apple Silicon environment: ``` build/sbt "core/testOnly *AppStatusStoreSuite*" "sql/testOnly *StreamingQueryStatusListenerSuite*" ``` All tests passed. Closes #36139 from LuciferYang/SPARK-38678-FOLLOWUP. Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .../apache/spark/status/AppStatusStoreSuite.scala | 26 +- .../ui/StreamingQueryStatusListenerSuite.scala | 1 - 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala index 70852164b89..b05f2b799d2 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala @@ -90,10 +90,6 @@ class AppStatusStoreSuite extends SparkFunSuite { if (live) { return AppStatusStore.createLiveStore(conf) } -// LevelDB doesn't support Apple Silicon yet -if (Utils.isMacOnAppleSilicon && disk) { - return null -} val store: KVStore = if (disk) { conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString) @@ -106,12 +102,22 @@ class AppStatusStoreSuite extends SparkFunSuite { new AppStatusStore(store) } - Seq( -"disk leveldb" -> createAppStore(disk = true, HybridStoreDiskBackend.LEVELDB, live = false), -"disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false), -"in memory" -> createAppStore(disk = false, live = false), -"in memory live" -> createAppStore(disk = false, live = true) - ).foreach { case (hint, appStore) => + private val cases = { +val baseCases = Seq( + "disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false), + "in memory" -> createAppStore(disk = false, live = false), + "in memory live" -> createAppStore(disk = false, live = true) +) +if (Utils.isMacOnAppleSilicon) { + baseCases +} else { + Seq( +"disk leveldb" -> createAppStore(disk = true, HybridStoreDiskBackend.LEVELDB, live = false) + ) ++ baseCases +} + } + + cases.foreach { case (hint, appStore) => test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)") { assume(appStore != null) val store = appStore.store diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala index 1d1b51354f8..1a51b58f4f6 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala @@ -236,7 +236,6 @@ class StreamingQueryStatusListenerSuite extends StreamTest { } test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") { -assume(!Utils.isMacOnAppleSilicon) val conf = new SparkConf() .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString) val testDir = Utils.createTempDir() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-38745][SQL][TESTS] Move the tests for `NON_PARTITION_COLUMN` to `QueryCompilationErrorsDSv2Suite`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 98fbbcdac5d [SPARK-38745][SQL][TESTS] Move the tests for `NON_PARTITION_COLUMN` to `QueryCompilationErrorsDSv2Suite` 98fbbcdac5d is described below commit 98fbbcdac5defebec81626dd1dbd5522a2fd910b Author: Max Gekk AuthorDate: Wed Apr 13 17:39:40 2022 +0300 [SPARK-38745][SQL][TESTS] Move the tests for `NON_PARTITION_COLUMN` to `QueryCompilationErrorsDSv2Suite` ### What changes were proposed in this pull request? Move test for the error class `NON_PARTITION_COLUMN` from `InsertIntoSQLOnlyTests` to `QueryCompilationErrorsDSv2Suite`. ### Why are the changes needed? To improve code maintenance - all tests for error classes are placed to Query.*ErrorsSuite. Also exception are raised from [QueryCompilationErrors](https://github.com/apache/spark/blob/bf75b495e18ed87d0c118bfd5f1ceb52d720cad9/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala#L100-L104), so, tests should be in `QueryCompilationErrorsDSv2Suite` for consistency. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the moved tests: ``` $ build/sbt "test:testOnly *QueryCompilationErrorsDSv2Suite" ``` Closes #36175 from MaxGekk/move-tests-for-NON_PARTITION_COLUMN. Authored-by: Max Gekk Signed-off-by: Max Gekk --- .../spark/sql/connector/InsertIntoTests.scala | 36 +--- .../errors/QueryCompilationErrorsDSv2Suite.scala | 49 +++--- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index fc98cfd5138..7493966790c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -198,7 +198,7 @@ trait InsertIntoSQLOnlyTests /** Whether to include the SQL specific tests in this trait within the extending test suite. 
*/ protected val includeSQLOnlyTests: Boolean - private def withTableAndData(tableName: String)(testFn: String => Unit): Unit = { + protected def withTableAndData(tableName: String)(testFn: String => Unit): Unit = { withTable(tableName) { val viewName = "tmp_view" val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data") @@ -248,40 +248,6 @@ trait InsertIntoSQLOnlyTests } } -test("InsertInto: static PARTITION clause fails with non-partition column") { - val t1 = s"${catalogAndNamespace}tbl" - withTableAndData(t1) { view => -sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (data)") - -val exc = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $t1 PARTITION (id=1) SELECT data FROM $view") -} - -verifyTable(t1, spark.emptyDataFrame) -assert(exc.getMessage.contains( - "PARTITION clause cannot contain a non-partition column name")) -assert(exc.getMessage.contains("id")) -assert(exc.getErrorClass == "NON_PARTITION_COLUMN") - } -} - -test("InsertInto: dynamic PARTITION clause fails with non-partition column") { - val t1 = s"${catalogAndNamespace}tbl" - withTableAndData(t1) { view => -sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)") - -val exc = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $t1 PARTITION (data) SELECT * FROM $view") -} - -verifyTable(t1, spark.emptyDataFrame) -assert(exc.getMessage.contains( - "PARTITION clause cannot contain a non-partition column name")) -assert(exc.getMessage.contains("data")) -assert(exc.getErrorClass == "NON_PARTITION_COLUMN") - } -} - test("InsertInto: overwrite - dynamic clause - static mode") { withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) { val t1 = s"${catalogAndNamespace}tbl" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala index bfea3f535dd..042f130d7f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala @@ -17,18 +17,27 @@ package org.apache.spark.sql.errors -import org.apache.spark.sql.{AnalysisException, QueryTest} -import
[spark] branch master updated (ee74bd0d4e3 -> 30fc0ba2307)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from ee74bd0d4e3 [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys add 30fc0ba2307 [SPARK-38844][PYTHON][SQL] Implement linear interpolate No new revisions were added by this update. Summary of changes: .../docs/source/reference/pyspark.pandas/frame.rst | 1 + .../source/reference/pyspark.pandas/series.rst | 1 + python/pyspark/pandas/frame.py | 17 +++ python/pyspark/pandas/generic.py | 84 ++ python/pyspark/pandas/missing/frame.py | 1 - python/pyspark/pandas/missing/series.py| 1 - python/pyspark/pandas/series.py| 57 ++ .../pyspark/pandas/tests/test_generic_functions.py | 124 + .../catalyst/expressions/windowExpressions.scala | 74 .../spark/sql/api/python/PythonSQLUtils.scala | 6 +- 10 files changed, 363 insertions(+), 3 deletions(-) create mode 100644 python/pyspark/pandas/tests/test_generic_functions.py - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
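The summary above only lists the changed files; a hedged sketch of the new API, assuming it mirrors pandas' default linear interpolation (interior NaNs filled linearly between neighbouring valid values):

```python
import pyspark.pandas as ps

ser = ps.Series([1.0, None, 3.0, None, None, 9.0])
print(ser.interpolate().tolist())
# expected under pandas' linear semantics: [1.0, 2.0, 3.0, 5.0, 7.0, 9.0]
```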
[spark] branch master updated: [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ee74bd0d4e3 [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys ee74bd0d4e3 is described below commit ee74bd0d4e3d97b33aa57fe523ab4f5537125f68 Author: ulysses-you AuthorDate: Wed Apr 13 18:10:33 2022 +0800 [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys ### What changes were proposed in this pull request? Make `EliminateDistinct` support eliminate distinct by child distinct keys. ### Why are the changes needed? We can remove the distinct in aggregate expression if the distinct semantics is guaranteed by child. For example: ```sql SELECT count(distinct c) FROM ( SELECT c FROM t GROUP BY c ) ``` ### Does this PR introduce _any_ user-facing change? improve performance ### How was this patch tested? add test in `EliminateDistinctSuite` Closes #36117 from ulysses-you/remove-distinct. Authored-by: ulysses-you Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 25 -- .../plans/logical/LogicalPlanDistinctKeys.scala| 8 ++- .../optimizer/EliminateDistinctSuite.scala | 18 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 66c2ad84cce..bb788336c6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -146,7 +146,7 @@ abstract class Optimizer(catalogManager: CatalogManager) PushDownPredicates) :: Nil } -val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) :: +val batches = ( // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). // However, because we also use the analyzer to canonicalized queries (for view definition), @@ -166,6 +166,7 @@ abstract class Optimizer(catalogManager: CatalogManager) // // Optimizer rules start here // +Batch("Eliminate Distinct", Once, EliminateDistinct) :: // - Do the first call of CombineUnions before starting the major Optimizer rules, // since it can reduce the number of iteration and the other rules could add/move // extra operators between two adjacent Union operators. @@ -411,14 +412,26 @@ abstract class Optimizer(catalogManager: CatalogManager) } /** - * Remove useless DISTINCT for MAX and MIN. + * Remove useless DISTINCT: + * 1. For some aggregate expression, e.g.: MAX and MIN. + * 2. If the distinct semantics is guaranteed by child. + * * This rule should be applied before RewriteDistinctAggregates. 
*/ object EliminateDistinct extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning( -_.containsPattern(AGGREGATE_EXPRESSION)) { -case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) => - ae.copy(isDistinct = false) + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( +_.containsPattern(AGGREGATE)) { +case agg: Aggregate => + agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { +case ae: AggregateExpression if ae.isDistinct && + isDuplicateAgnostic(ae.aggregateFunction) => + ae.copy(isDistinct = false) + +case ae: AggregateExpression if ae.isDistinct && + agg.child.distinctKeys.exists( + _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable => + ae.copy(isDistinct = false) + } } def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala index 1843c2da478..2ffa5a0e594 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala @@ -29,6
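A small way to observe the rewrite from PySpark, assuming a running `spark` session: because the subquery groups by `c`, its output is already distinct on `c`, so the optimizer can drop the DISTINCT from `count(DISTINCT c)` and avoid the more expensive distinct-aggregate plan.

```python
# Assumes a running SparkSession bound to `spark` (e.g. the pyspark shell).
spark.range(10).selectExpr("id % 3 AS c").createOrReplaceTempView("t")

query = spark.sql("""
    SELECT count(DISTINCT c)
    FROM (SELECT c FROM t GROUP BY c)
""")

# With this change the optimized logical plan shows count(c) rather than
# count(distinct c); the result is unchanged.
query.explain("extended")
print(query.collect())  # [Row(count(DISTINCT c)=3)]
```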
[spark] branch master updated: [SPARK-38774][PYTHON] Implement Series.autocorr
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new eb699ec138d [SPARK-38774][PYTHON] Implement Series.autocorr eb699ec138d is described below commit eb699ec138d4a49ecc204f530eeefa513b42f4ad Author: Ruifeng Zheng AuthorDate: Wed Apr 13 18:09:06 2022 +0900 [SPARK-38774][PYTHON] Implement Series.autocorr ### What changes were proposed in this pull request? Implement Series.autocorr ### Why are the changes needed? for API coverage ### Does this PR introduce _any_ user-facing change? yes, Series now support function `autocorr` ``` In [86]: s = pd.Series([.2, .0, .6, .2, np.nan, .5, .6]) In [87]: s.autocorr() Out[87]: -0.14121975762272054 ``` ### How was this patch tested? added doctest Closes #36048 from zhengruifeng/pandas_series_autocorr. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- .../source/reference/pyspark.pandas/series.rst | 1 + python/pyspark/pandas/missing/series.py| 1 - python/pyspark/pandas/series.py| 76 ++ python/pyspark/pandas/tests/test_series.py | 17 + 4 files changed, 94 insertions(+), 1 deletion(-) diff --git a/python/docs/source/reference/pyspark.pandas/series.rst b/python/docs/source/reference/pyspark.pandas/series.rst index b6a0d1e52d5..0f897ce2e14 100644 --- a/python/docs/source/reference/pyspark.pandas/series.rst +++ b/python/docs/source/reference/pyspark.pandas/series.rst @@ -134,6 +134,7 @@ Computations / Descriptive Stats Series.abs Series.all Series.any + Series.autocorr Series.between Series.clip Series.corr diff --git a/python/pyspark/pandas/missing/series.py b/python/pyspark/pandas/missing/series.py index 9bb191f1c81..07094b64bbb 100644 --- a/python/pyspark/pandas/missing/series.py +++ b/python/pyspark/pandas/missing/series.py @@ -33,7 +33,6 @@ class MissingPandasLikeSeries: # Functions asfreq = _unsupported_function("asfreq") -autocorr = _unsupported_function("autocorr") combine = _unsupported_function("combine") convert_dtypes = _unsupported_function("convert_dtypes") infer_objects = _unsupported_function("infer_objects") diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index ced81b12e8c..d6cc4a8627d 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -3045,6 +3045,82 @@ class Series(Frame, IndexOpsMixin, Generic[T]): DataFrame(internal.with_new_sdf(sdf, index_fields=([None] * internal.index_level))) ) +def autocorr(self, periods: int = 1) -> float: +""" +Compute the lag-N autocorrelation. + +This method computes the Pearson correlation between +the Series and its shifted self. + +.. note:: the current implementation of rank uses Spark's Window without +specifying partition specification. This leads to move all data into +single partition in single machine and could cause serious +performance degradation. Avoid this method against very large dataset. + +.. versionadded:: 3.4.0 + +Parameters +-- +periods : int, default 1 +Number of lags to apply before performing autocorrelation. + +Returns +--- +float +The Pearson correlation between self and self.shift(lag). + +See Also + +Series.corr : Compute the correlation between two Series. +Series.shift : Shift index by desired number of periods. +DataFrame.corr : Compute pairwise correlation of columns. + +Notes +- +If the Pearson correlation is not well defined return 'NaN'. 
+ +Examples + +>>> s = ps.Series([.2, .0, .6, .2, np.nan, .5, .6]) +>>> s.autocorr() # doctest: +ELLIPSIS +-0.141219... +>>> s.autocorr(0) # doctest: +ELLIPSIS +1.0... +>>> s.autocorr(2) # doctest: +ELLIPSIS +0.970725... +>>> s.autocorr(-3) # doctest: +ELLIPSIS +0.277350... +>>> s.autocorr(5) # doctest: +ELLIPSIS +-1.00... +>>> s.autocorr(6) # doctest: +ELLIPSIS +nan + +If the Pearson correlation is not well defined, then 'NaN' is returned. + +>>> s = ps.Series([1, 0, 0, 0]) +>>> s.autocorr() +nan +""" +# This implementation is suboptimal because it moves all data to a single partition, +# global sort should be used instead of window, but it should be a start +if not isinstance(periods, int): +raise TypeError("periods should be an int; however, got [%s]" %
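As the new docstring notes, `autocorr(lag)` is simply the Pearson correlation between the series and its shifted self; the expected values in the doctest can be cross-checked with plain pandas:

```python
import numpy as np
import pandas as pd

s = pd.Series([.2, .0, .6, .2, np.nan, .5, .6])
print(s.autocorr(2))       # ~0.970725
print(s.corr(s.shift(2)))  # same value, computed the long way
```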
[spark] branch branch-3.3 updated: [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for Spark 3.3
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 76f40eef8b9 [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for Spark 3.3 76f40eef8b9 is described below commit 76f40eef8b97e23f4a16e471366ae410a3e6cc20 Author: Ivan Sadikov AuthorDate: Wed Apr 13 17:06:03 2022 +0800 [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for Spark 3.3 ### What changes were proposed in this pull request? This is a follow-up for https://github.com/apache/spark/pull/36094. I added `Utils.isTesting` whenever we perform schema conversion or row conversion for TimestampNTZType. I verified that the tests, e.g. ParquetIOSuite, fail with unsupported data type when running in non-testing mode: ``` [info] Cause: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 40.0 failed 1 times, most recent failure: Lost task 1.0 in stage 40.0 (TID 66) (ip-10-110-16-208.us-west-2.compute.internal executor driver): org.apache.spark.sql.AnalysisException: Unsupported data type timestamp_ntz [info] at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotConvertDataTypeToParquetTypeError(QueryCompilationErrors.scala:1304) [info] at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:707) [info] at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:479) [info] at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.$anonfun$convert$1(ParquetSchemaConverter.scala:471) ``` ### Why are the changes needed? We have to disable TimestampNTZType as other parts of the codebase do not yet support this type. ### Does this PR introduce _any_ user-facing change? No, the TimestampNTZ type is not released yet. ### How was this patch tested? I tested the changes manually by rerunning the test suites that verify TimestampNTZType in the non-testing mode. Closes #36137 from sadikovi/SPARK-38829-parquet-ntz-off. 
Authored-by: Ivan Sadikov Signed-off-by: Gengliang Wang --- .../sql/execution/datasources/parquet/ParquetRowConverter.scala| 5 - .../sql/execution/datasources/parquet/ParquetSchemaConverter.scala | 7 +-- .../sql/execution/datasources/parquet/ParquetWriteSupport.scala| 4 +++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index a955dd6fc76..ffd90fd722b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils /** * A [[ParentContainerUpdater]] is used by a Parquet converter to set converted values to some @@ -487,7 +488,9 @@ private[parquet] class ParquetRowConverter( parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 && parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] && !parquetType.getLogicalTypeAnnotation -.asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC +.asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC && + // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3 + Utils.isTesting /** * Parquet converter for strings. A dictionary is used to minimize string decoding cost. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 0e065f19a88..3419bf15f8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils /** * This converter class is used to convert Parquet [[MessageType]] to
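A hypothetical way to observe the new gate from the user side (this snippet is not part of the patch): on a Spark 3.3 build running outside testing mode, writing a TimestampNTZ column to Parquet is expected to fail with the `Unsupported data type timestamp_ntz` error quoted above. A sketch under those assumptions:

```python
# Hypothetical repro, not part of the patch: on Spark 3.3 outside testing
# mode, writing a timestamp_ntz column to Parquet should be rejected.
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, TimestampNTZType

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("ts", TimestampNTZType())])
df = spark.createDataFrame([(datetime.datetime(2022, 4, 13, 17, 6, 3),)], schema)

try:
    df.write.mode("overwrite").parquet("/tmp/ntz_demo")  # path is illustrative
except Exception as e:
    print(e)  # expected to mention: Unsupported data type timestamp_ntz
```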
[spark] branch branch-3.3 updated: [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new c44020b961f [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns c44020b961f is described below commit c44020b961ffe44e30ee617af6ffb84effbd28fe Author: Enrico Minack AuthorDate: Wed Apr 13 17:07:27 2022 +0900 [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns ### What changes were proposed in this pull request? Methods `wrap_cogrouped_map_pandas_udf` and `wrap_grouped_map_pandas_udf` in `python/pyspark/worker.py` do not need to reject `pd.DataFrame`s with no columns return by udf when that DataFrame is empty (zero rows). This allows to return empty DataFrames without the need to define columns. The DataFrame is empty after all! **The proposed behaviour is consistent with the current behaviour of `DataFrame.mapInPandas`.** ### Why are the changes needed? Returning an empty DataFrame from the lambda given to `applyInPandas` should be as easy as this: ```python return pd.DataFrame([]) ``` However, PySpark requires that empty DataFrame to have the right _number_ of columns. This seems redundant as the schema is already defined in the `applyInPandas` call. Returning a non-empty DataFrame does not require defining columns. Behaviour of `applyInPandas` should be consistent with `mapInPandas`. Here is an example to reproduce: ```python import pandas as pd from pyspark.sql.functions import pandas_udf, ceil df = spark.createDataFrame( [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v")) def mean_func(key, pdf): if key == (1,): return pd.DataFrame([]) else: return pd.DataFrame([key + (pdf.v.mean(),)]) df.groupby("id").applyInPandas(mean_func, schema="id long, v double").show() ``` ### Does this PR introduce _any_ user-facing change? It changes the behaviour of the following calls to allow returning empty `pd.DataFrame` without defining columns. The PySpark DataFrame returned by `applyInPandas` is unchanged: - `df.groupby(…).applyInPandas(…)` - `df.cogroup(…).applyInPandas(…)` ### How was this patch tested? Tests are added that test `applyInPandas` and `mapInPandas` when returning - empty DataFrame with no columns - empty DataFrame with the wrong number of columns - non-empty DataFrame with wrong number of columns - something other than `pd.DataFrame` NOTE: It is not an error for `mapInPandas` to return DataFrames with more columns than specified in the `mapInPandas` schema. Closes #36120 from EnricoMi/branch-empty-pd-dataframes. 
Authored-by: Enrico Minack Signed-off-by: Hyukjin Kwon (cherry picked from commit 556c74578eb2379fc6e0ec8d147674d0b10e5a2c) Signed-off-by: Hyukjin Kwon --- .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 97 ++ .../pyspark/sql/tests/test_pandas_grouped_map.py | 76 + python/pyspark/sql/tests/test_pandas_map.py| 71 ++-- python/pyspark/worker.py | 12 ++- 4 files changed, 246 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py index 58022fa6e83..3f403d9c9d6 100644 --- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py @@ -20,6 +20,7 @@ from typing import cast from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf from pyspark.sql.types import DoubleType, StructType, StructField, Row +from pyspark.sql.utils import PythonException from pyspark.testing.sqlutils import ( ReusedSQLTestCase, have_pandas, @@ -124,6 +125,102 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase): assert_frame_equal(expected, result) +def test_apply_in_pandas_not_returning_pandas_dataframe(self): +left = self.data1 +right = self.data2 + +def merge_pandas(lft, rgt): +return lft.size + rgt.size + +with QuietTest(self.sc): +with self.assertRaisesRegex( +PythonException, +"Return type of the user-defined function should be pandas.DataFrame, " +"but is ", +): +( +left.groupby("id") +.cogroup(right.groupby("id")) +.applyInPandas(merge_pandas, "id long, k int, v int, v2 int") +.collect() +) + +def
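For contrast, a minimal sketch of the `mapInPandas` behaviour that `applyInPandas` is being aligned with, assuming an active SparkSession; per the description above, an empty `pd.DataFrame()` without columns is already accepted there.

```python
# Sketch of the mapInPandas behaviour that applyInPandas now matches:
# an empty pd.DataFrame() may be yielded without declaring any columns.
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

def keep_id_two(iterator):
    for pdf in iterator:
        matched = pdf[pdf.id == 2]
        # An empty frame with no columns is accepted when nothing matches.
        yield matched if len(matched) else pd.DataFrame([])

df.mapInPandas(keep_id_two, schema="id long, v double").show()
```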
[spark] branch master updated (094a4ef6703 -> 556c74578eb)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 094a4ef6703 [SPARK-38804][PYTHON][SS][DOCS][FOLLOW-UP] Document StreamingQueryManager.removeListener add 556c74578eb [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns No new revisions were added by this update. Summary of changes: .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 97 ++ .../pyspark/sql/tests/test_pandas_grouped_map.py | 76 + python/pyspark/sql/tests/test_pandas_map.py| 71 ++-- python/pyspark/worker.py | 12 ++- 4 files changed, 246 insertions(+), 10 deletions(-)

[spark] branch master updated (bf75b495e18 -> 094a4ef6703)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from bf75b495e18 [SPARK-38855][SQL] DS V2 supports push down math functions add 094a4ef6703 [SPARK-38804][PYTHON][SS][DOCS][FOLLOW-UP] Document StreamingQueryManager.removeListener No new revisions were added by this update. Summary of changes: python/docs/source/reference/pyspark.ss.rst | 1 + 1 file changed, 1 insertion(+)
[spark] branch branch-3.2 updated: [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new d66350a2aad [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O d66350a2aad is described below commit d66350a2aadd7f1e612cc9cf54009ea6f531630e Author: Ankur Dave AuthorDate: Wed Apr 13 16:58:18 2022 +0900 [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O ### What changes were proposed in this pull request? This PR cherry-picks https://github.com/apache/spark/pull/36065 to branch-3.2. --- When calling a Python UDF on a DataFrame with large rows, a deadlock can occur involving the following three threads: 1. The Scala task executor thread. During task execution, this is responsible for reading output produced by the Python process. However, in this case the task has finished early, and this thread is no longer reading output produced by the Python process. Instead, it is waiting for the Scala WriterThread to exit so that it can finish the task. 2. The Scala WriterThread. This is trying to send a large row to the Python process, and is waiting for the Python process to read that row. 3. The Python process. This is trying to send a large output to the Scala task executor thread, and is waiting for that thread to read that output, which will never happen. We considered the following three solutions for the deadlock: 1. When the task completes, make the Scala task executor thread close the socket before waiting for the Scala WriterThread to exit. If the WriterThread is blocked on a large write, this would interrupt that write and allow the WriterThread to exit. However, it would prevent Python worker reuse. 2. Modify PythonWorkerFactory to use interruptible I/O. [java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer)) supports interruptible blocking operations. The goal is that when the WriterThread is interrupted, it should exit even if it was blocked on a large write. However, this would be invasive. 3. Add a watchdog thread similar to the existing PythonRunner.MonitorThread to detect this deadlock and kill the Python worker. The MonitorThread currently kills the Python worker only if the task itself is interrupted. In this case, the task completes normally, so the MonitorThread does not take action. We want the new watchdog thread (WriterMonitorThread) to detect that the task is completed but the Python writer thread has not stopped, indicating a deadlock. This PR implements Option 3. ### Why are the changes needed? To fix a deadlock that can cause PySpark queries to hang. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a test that previously encountered the deadlock and timed out, and now succeeds. Closes #36172 from HyukjinKwon/SPARK-38677-3.2. 
Authored-by: Ankur Dave Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/api/python/PythonRunner.scala | 49 ++ python/pyspark/tests/test_rdd.py | 35 2 files changed, 84 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 66b23782cf9..fabff970f2b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -181,6 +181,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( } writerThread.start() +new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start() if (reuseWorker) { val key = (worker, context.taskAttemptId) // SPARK-35009: avoid creating multiple monitor threads for the same python worker @@ -643,6 +644,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( } } } + + /** + * This thread monitors the WriterThread and kills it in case of deadlock. + * + * A deadlock can arise if the task completes while the writer thread is sending input to the + * Python process (e.g. due to the use of `take()`), and the Python process is still producing + * output. When the inputs are sufficiently large, this can result in a deadlock due to the use of + * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the socket. + */ + class WriterMonitorThread( + env: SparkEnv, worker: Socket, writerThread: WriterThread, context: TaskContext) +extends Thread(s"Writer Monitor for $pythonExec (writer thread id ${writerThread.getId})") { + +
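To make the failure mode concrete, a hypothetical sketch of the workload class described above: large rows crossing the Python worker boundary combined with an early-terminating action such as `take()`. The actual regression test lives in `python/pyspark/tests/test_rdd.py`; this is only an illustration.

```python
# Hypothetical illustration of the workload class from the description above:
# large rows are written to and read from the Python worker while take(1)
# finishes the task early. Before this fix such a pattern could deadlock;
# the new WriterMonitorThread closes the socket once the task completes.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

big_value = "x" * (1 << 20)  # ~1 MiB per row; the size is illustrative
rdd = sc.parallelize(range(100), 2).map(lambda i: (i, big_value))

print(rdd.map(lambda kv: kv[0]).take(1))
```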
[spark] branch branch-3.3 updated: [SPARK-38855][SQL] DS V2 supports push down math functions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 96c8b4f47c2 [SPARK-38855][SQL] DS V2 supports push down math functions 96c8b4f47c2 is described below commit 96c8b4f47c2d0df249efb02b248b5c230188 Author: Jiaan Geng AuthorDate: Wed Apr 13 14:41:47 2022 +0800 [SPARK-38855][SQL] DS V2 supports push down math functions ### What changes were proposed in this pull request? Currently, Spark have some math functions of ANSI standard. Please refer https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 These functions show below: `LN`, `EXP`, `POWER`, `SQRT`, `FLOOR`, `CEIL`, `WIDTH_BUCKET` The mainstream databases support these functions show below. | 函数 | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch | | | | | | | | | | | | | | | | | | | | | | | | | | `LN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `EXP` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `POWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | | `SQRT` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `FLOOR` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `CEIL` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `WIDTH_BUCKET` | Yes | No | No | No | Yes | No | Yes | Yes | Yes | Yes | Yes | No | No | No | Yes | No | No | No | No | No | No | No | DS V2 should supports push down these math functions. ### Why are the changes needed? DS V2 supports push down math functions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes #36140 from beliefer/SPARK-38855. 
Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan (cherry picked from commit bf75b495e18ed87d0c118bfd5f1ceb52d720cad9) Signed-off-by: Wenchen Fan --- .../expressions/GeneralScalarExpression.java | 54 ++ .../sql/connector/util/V2ExpressionSQLBuilder.java | 7 +++ .../spark/sql/errors/QueryCompilationErrors.scala | 4 ++ .../sql/catalyst/util/V2ExpressionBuilder.scala| 28 ++- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 26 +++ .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 28 ++- 6 files changed, 145 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index 8952761f9ef..58082d5ee09 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -94,6 +94,60 @@ import org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder; *Since version: 3.3.0 * * + * Name: ABS + * + *SQL semantic: ABS(expr) + *Since version: 3.3.0 + * + * + * Name: COALESCE + * + *SQL semantic: COALESCE(expr1, expr2) + *Since version: 3.3.0 + * + * + * Name: LN + * + *SQL semantic: LN(expr) + *Since version: 3.3.0 + * + * + * Name: EXP + * + *SQL semantic: EXP(expr) + *Since version: 3.3.0 + * + * + * Name: POWER + * + *SQL semantic: POWER(expr, number) + *Since version: 3.3.0 + * + * + * Name: SQRT + * + *SQL semantic: SQRT(expr) + *Since version: 3.3.0 + * + * + * Name: FLOOR + * + *SQL semantic: FLOOR(expr) + *Since version: 3.3.0 + * + * + * Name: CEIL + * + *SQL semantic: CEIL(expr) + *Since version: 3.3.0 + * + * + * Name: WIDTH_BUCKET + * + *SQL semantic: WIDTH_BUCKET(expr) + *Since version: 3.3.0 + * + *
[spark] branch master updated: [SPARK-38855][SQL] DS V2 supports push down math functions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new bf75b495e18 [SPARK-38855][SQL] DS V2 supports push down math functions bf75b495e18 is described below commit bf75b495e18ed87d0c118bfd5f1ceb52d720cad9 Author: Jiaan Geng AuthorDate: Wed Apr 13 14:41:47 2022 +0800 [SPARK-38855][SQL] DS V2 supports push down math functions ### What changes were proposed in this pull request? Currently, Spark have some math functions of ANSI standard. Please refer https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388 These functions show below: `LN`, `EXP`, `POWER`, `SQRT`, `FLOOR`, `CEIL`, `WIDTH_BUCKET` The mainstream databases support these functions show below. | 函数 | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch | | | | | | | | | | | | | | | | | | | | | | | | | | `LN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `EXP` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `POWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes | | `SQRT` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `FLOOR` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `CEIL` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | | `WIDTH_BUCKET` | Yes | No | No | No | Yes | No | Yes | Yes | Yes | Yes | Yes | No | No | No | Yes | No | No | No | No | No | No | No | DS V2 should supports push down these math functions. ### Why are the changes needed? DS V2 supports push down math functions ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New tests. Closes #36140 from beliefer/SPARK-38855. 
Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan --- .../expressions/GeneralScalarExpression.java | 54 ++ .../sql/connector/util/V2ExpressionSQLBuilder.java | 7 +++ .../spark/sql/errors/QueryCompilationErrors.scala | 4 ++ .../sql/catalyst/util/V2ExpressionBuilder.scala| 28 ++- .../org/apache/spark/sql/jdbc/H2Dialect.scala | 26 +++ .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 28 ++- 6 files changed, 145 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java index 8952761f9ef..58082d5ee09 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java @@ -94,6 +94,60 @@ import org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder; *Since version: 3.3.0 * * + * Name: ABS + * + *SQL semantic: ABS(expr) + *Since version: 3.3.0 + * + * + * Name: COALESCE + * + *SQL semantic: COALESCE(expr1, expr2) + *Since version: 3.3.0 + * + * + * Name: LN + * + *SQL semantic: LN(expr) + *Since version: 3.3.0 + * + * + * Name: EXP + * + *SQL semantic: EXP(expr) + *Since version: 3.3.0 + * + * + * Name: POWER + * + *SQL semantic: POWER(expr, number) + *Since version: 3.3.0 + * + * + * Name: SQRT + * + *SQL semantic: SQRT(expr) + *Since version: 3.3.0 + * + * + * Name: FLOOR + * + *SQL semantic: FLOOR(expr) + *Since version: 3.3.0 + * + * + * Name: CEIL + * + *SQL semantic: CEIL(expr) + *Since version: 3.3.0 + * + * + * Name: WIDTH_BUCKET + * + *SQL semantic: WIDTH_BUCKET(expr) + *Since version: 3.3.0 + * + * * * Note: SQL semantic conforms ANSI standard, so some expressions are not supported when ANSI off, *
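From the user's side, the effect of this change is that filters built from these functions can be compiled by the JDBC dialect and pushed to the remote database rather than evaluated in Spark. A hedged sketch against an H2-backed DS V2 catalog; the catalog name, URL, and table are illustrative and only mirror the kind of setup used in `JDBCV2Suite`.

```python
# Illustrative sketch only: with a DS V2 JDBC catalog configured (H2 here;
# the catalog name, URL, and table are made up), math expressions such as
# SQRT and CEIL in a WHERE clause may now be compiled by the dialect and
# pushed down to the remote database.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.catalog.h2",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb0")
    .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
    .getOrCreate()
)

df = spark.sql(
    "SELECT name FROM h2.test.employee WHERE SQRT(bonus) > 30 AND CEIL(dept) = 2"
)
# If the dialect supports compilation, the pushed expressions show up in the
# scan node of the physical plan (e.g. as pushed filters).
df.explain(True)
```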
[spark] branch master updated: [SPARK-38530][SQL] Fix a bug that GeneratorNestedColumnAliasing can be incorrectly applied to some expressions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 13edafab9f4 [SPARK-38530][SQL] Fix a bug that GeneratorNestedColumnAliasing can be incorrectly applied to some expressions 13edafab9f4 is described below commit 13edafab9f45cc80aee41e2f82475367d88357ec Author: minyyy AuthorDate: Wed Apr 13 14:01:27 2022 +0800 [SPARK-38530][SQL] Fix a bug that GeneratorNestedColumnAliasing can be incorrectly applied to some expressions ### What changes were proposed in this pull request? This PR makes GeneratorNestedColumnAliasing only be able to apply to GetStructField*(_: AttributeReference), here GetStructField* means nested GetStructField. The current way to collect expressions is a top-down way and it actually only checks 2 levels which is wrong. The rule is simple - If we see expressions other than GetStructField, we are done. When an expression E is pushed down into an Explode, the thing happens is: E(x) is now pushed down to apply to E(array(x)). So only expressions that can operate on both x and array(x) can be pushed. GetStructField is special since we have GetArrayStructFields and when GetStructField is pushed down, it becomes GetArrayStructFields. Any other expressions are not applicable. We also do not even need to check the child type is Array(Array()) or whether the rewritten expression has the pattern GetArrayStructFields(GetArrayStructFields()). 1. When the child input type is Array(Array()), the ExtractValues expressions we get will always start from an innermost GetArrayStructFields, it does not align with GetStructField*(x). 2. When we see GetArrayStructFields(GetArrayStructFields()) in the rewritten generator, we must have seen a GetArrayStructFields in the expressions before pushdown. ### Why are the changes needed? It fixes some correctness issues. See the above section for more details. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests. Closes #35866 from minyyy/gnca_wrong_expr. 
Lead-authored-by: minyyy Co-authored-by: minyyy <98760575+min...@users.noreply.github.com> Signed-off-by: Wenchen Fan --- .../catalyst/optimizer/NestedColumnAliasing.scala | 50 ++ .../optimizer/NestedColumnAliasingSuite.scala | 40 - 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 9cf2925cdd2..45f84c21b7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -240,12 +240,14 @@ object NestedColumnAliasing { */ def getAttributeToExtractValues( exprList: Seq[Expression], - exclusiveAttrs: Seq[Attribute]): Map[Attribute, Seq[ExtractValue]] = { + exclusiveAttrs: Seq[Attribute], + extractor: (Expression) => Seq[Expression] = collectRootReferenceAndExtractValue) +: Map[Attribute, Seq[ExtractValue]] = { val nestedFieldReferences = new mutable.ArrayBuffer[ExtractValue]() val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]() exprList.foreach { e => - collectRootReferenceAndExtractValue(e).foreach { + extractor(e).foreach { // we can not alias the attr from lambda variable whose expr id is not available case ev: ExtractValue if !ev.exists(_.isInstanceOf[NamedLambdaVariable]) => if (ev.references.size == 1) { @@ -350,23 +352,44 @@ object GeneratorNestedColumnAliasing { return None } val generatorOutputSet = AttributeSet(g.qualifiedGeneratorOutput) - val (attrToExtractValuesOnGenerator, attrToExtractValuesNotOnGenerator) = + var (attrToExtractValuesOnGenerator, attrToExtractValuesNotOnGenerator) = attrToExtractValues.partition { case (attr, _) => attr.references.subsetOf(generatorOutputSet) } val pushedThrough = NestedColumnAliasing.rewritePlanWithAliases( plan, attrToExtractValuesNotOnGenerator) - // If the generator output is `ArrayType`, we cannot push through the extractor. - // It is because we don't allow field extractor on two-level array, - // i.e., attr.field when attr is a ArrayType(ArrayType(...)). - // Similarily, we also cannot push through if the child of generator is `MapType`. + // We cannot push through if the child of generator is `MapType`. g.generator.children.head.dataType match { case _: MapType =>
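To illustrate the query shape the rule targets, a hedged sketch with hypothetical column names: only a plain chain of struct-field accesses on an attribute produced by the generator may be rewritten and pushed below the `Explode`; any other expression now stays above it.

```python
# Hedged sketch of the plan shape GeneratorNestedColumnAliasing targets;
# the table layout and column names are hypothetical. Selecting only
# item.b lets the optimizer prune items.a before the Explode, because the
# extraction is a plain GetStructField chain on the generator input.
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, [Row(a=1, b="x"), Row(a=2, b="y")])],
    "id int, items array<struct<a:int, b:string>>",
)

exploded = df.select(explode(col("items")).alias("item")).select("item.b")
exploded.explain(True)  # compare the analyzed and optimized plans to see the pruning
```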