[spark] branch branch-3.3 updated: [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 30c6802574e [SPARK-38889][SQL] Compile boolean column filters to use 
the bit type for MSSQL data source
30c6802574e is described below

commit 30c6802574e5993e6f0f10d4c189c6e8325bcc5c
Author: allisonwang-db 
AuthorDate: Thu Apr 14 13:11:00 2022 +0900

[SPARK-38889][SQL] Compile boolean column filters to use the bit type for 
MSSQL data source

### What changes were proposed in this pull request?
This PR compiles the boolean data type to the bit data type for pushed
column filters while querying the MSSQL data source. Microsoft SQL Server does
not support the boolean type, so the JDBC dialect should use the bit data type
instead.
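
A minimal, hedged Scala sketch of the behavior this enables (the JDBC URL,
database name, and the `bits` table with BIT column `c` mirror the new
integration test and are assumptions, not part of the patch):

```scala
// Sketch: a boolean filter pushed to SQL Server is now compiled to the BIT
// literal 1 instead of the boolean literal true, which SQL Server rejects.
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("mssql-bit-pushdown").getOrCreate()
// Hypothetical connection string; adjust to your SQL Server instance.
val jdbcUrl = "jdbc:sqlserver://localhost:1433;databaseName=test"

val df = spark.read.jdbc(jdbcUrl, "bits", new Properties)
// The pushed predicate is emitted as "c" = 1 by the MsSqlServer dialect.
df.where(col("c") === true).show()
```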

### Why are the changes needed?

To fix a bug that was exposed by boolean column filter pushdown to the SQL
Server data source.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new integration test.

Closes #36182 from allisonwang-db/spark-38889-mssql-predicate-pushdown.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 320f88d54440e05228a90ef5663991e28ae07c95)
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala| 17 +
 .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala  | 10 ++
 2 files changed, 27 insertions(+)

diff --git 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 5992253a958..e293f9a8f7b 100644
--- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.tags.DockerTest
 
@@ -140,6 +141,14 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 
1)))',
 |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 
-5, -5 -1, -1 -1)))')
   """.stripMargin).executeUpdate()
+conn.prepareStatement(
+  """
+|CREATE TABLE bits(a INT, b INT, c BIT)
+|""".stripMargin).executeUpdate()
+conn.prepareStatement(
+  """
+|INSERT INTO bits VALUES (1, 2, 1)
+  """.stripMargin).executeUpdate()
   }
 
   test("Basic test") {
@@ -357,4 +366,12 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 
2, 0,
 0, 0, 0, 1, 0, 0, 0, 3))
   }
+
+  test("SPARK-38889: MsSqlServerDialect should handle boolean filter push 
down") {
+val df = spark.read.jdbc(jdbcUrl, "bits", new Properties)
+val rows = df.collect()
+assert(rows.length == 1)
+val filtered = df.where(col("c") === 0).collect()
+assert(filtered.length == 0)
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 8d2fbec55f9..a42129dbe8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -40,6 +40,16 @@ private object MsSqlServerDialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
 url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
+  // Microsoft SQL Server does not have the boolean type.
+  // Compile the boolean value to the bit data type instead.
+  // scalastyle:off line.size.limit
+  // See 
https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver15
+  // scalastyle:on line.size.limit
+  override def compileValue(value: Any): Any = value match {
+case booleanValue: Boolean => if (booleanValue) 1 else 0
+case other => super.compileValue(other)
+  }
+
   // scalastyle:off line.size.limit
   // See 
https://docs.microsoft.com/en-us/sql/t-sql/functions/aggregate-functions-transact-sql?view=sql-server-ver15
   // scalastyle:on line.size.limit


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

[spark] branch master updated: [SPARK-38889][SQL] Compile boolean column filters to use the bit type for MSSQL data source

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 320f88d5444 [SPARK-38889][SQL] Compile boolean column filters to use 
the bit type for MSSQL data source
320f88d5444 is described below

commit 320f88d54440e05228a90ef5663991e28ae07c95
Author: allisonwang-db 
AuthorDate: Thu Apr 14 13:11:00 2022 +0900

[SPARK-38889][SQL] Compile boolean column filters to use the bit type for 
MSSQL data source

### What changes were proposed in this pull request?
This PR compiles the boolean data type to the bit data type for pushed
column filters while querying the MSSQL data source. Microsoft SQL Server does
not support the boolean type, so the JDBC dialect should use the bit data type
instead.

### Why are the changes needed?

To fix a bug that was exposed by boolean column filter pushdown to the SQL
Server data source.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new integration test.

Closes #36182 from allisonwang-db/spark-38889-mssql-predicate-pushdown.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 .../spark/sql/jdbc/MsSqlServerIntegrationSuite.scala| 17 +
 .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala  | 10 ++
 2 files changed, 27 insertions(+)

diff --git 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
index 5992253a958..e293f9a8f7b 100644
--- 
a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
+++ 
b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MsSqlServerIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.sql.{Connection, Date, Timestamp}
 import java.util.Properties
 
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.tags.DockerTest
 
@@ -140,6 +141,14 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 |'MULTIPOLYGON(((2 2, 2 -2, -2 -2, -2 2, 2 2)),((1 1, 3 1, 3 3, 1 3, 1 
1)))',
 |'GEOMETRYCOLLECTION(LINESTRING(1 1, 3 5),POLYGON((-1 -1, -1 -5, -5 
-5, -5 -1, -1 -1)))')
   """.stripMargin).executeUpdate()
+conn.prepareStatement(
+  """
+|CREATE TABLE bits(a INT, b INT, c BIT)
+|""".stripMargin).executeUpdate()
+conn.prepareStatement(
+  """
+|INSERT INTO bits VALUES (1, 2, 1)
+  """.stripMargin).executeUpdate()
   }
 
   test("Basic test") {
@@ -357,4 +366,12 @@ class MsSqlServerIntegrationSuite extends 
DockerJDBCIntegrationSuite {
 0, 3, 0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 0, 
2, 0,
 0, 0, 0, 1, 0, 0, 0, 3))
   }
+
+  test("SPARK-38889: MsSqlServerDialect should handle boolean filter push 
down") {
+val df = spark.read.jdbc(jdbcUrl, "bits", new Properties)
+val rows = df.collect()
+assert(rows.length == 1)
+val filtered = df.where(col("c") === 0).collect()
+assert(filtered.length == 0)
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index 8d2fbec55f9..a42129dbe8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -40,6 +40,16 @@ private object MsSqlServerDialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
 url.toLowerCase(Locale.ROOT).startsWith("jdbc:sqlserver")
 
+  // Microsoft SQL Server does not have the boolean type.
+  // Compile the boolean value to the bit data type instead.
+  // scalastyle:off line.size.limit
+  // See 
https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver15
+  // scalastyle:on line.size.limit
+  override def compileValue(value: Any): Any = value match {
+case booleanValue: Boolean => if (booleanValue) 1 else 0
+case other => super.compileValue(other)
+  }
+
   // scalastyle:off line.size.limit
   // See 
https://docs.microsoft.com/en-us/sql/t-sql/functions/aggregate-functions-transact-sql?view=sql-server-ver15
   // scalastyle:on line.size.limit


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-38857][PYTHON] series name should be preserved in series.mode()

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d763eb63b6 [SPARK-38857][PYTHON] series name should be preserved in 
series.mode()
5d763eb63b6 is described below

commit 5d763eb63b67d4fee5972559ddfe0ff3e0e8e210
Author: Yikun Jiang 
AuthorDate: Thu Apr 14 10:27:19 2022 +0900

[SPARK-38857][PYTHON] series name should be preserved in series.mode()

### What changes were proposed in this pull request?

The series name is now preserved in `series.mode()`.

### Why are the changes needed?

The series name should be preserved in `series.mode()` to follow pandas 1.4.x
behavior.

### Does this PR introduce _any_ user-facing change?
Yes, if a series has a name set, it is now preserved in `series.mode()`.

### How was this patch tested?

Unit tests covering pandas versions both before and after 1.4.x.

Closes #36159 from Yikun/SPARK-38857.

Authored-by: Yikun Jiang 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/series.py| 8 ++--
 python/pyspark/pandas/tests/test_series.py | 7 ++-
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index da1d41c2abe..f4638fe22de 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -4523,6 +4523,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 
 Always returns Series even if only one value is returned.
 
+.. versionchanged:: 3.4.0
+   Series name is preserved to follow pandas 1.4+ behavior.
+
 Parameters
 --
 dropna : bool, default True
@@ -4597,8 +4600,9 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 F.col(SPARK_DEFAULT_INDEX_NAME).alias(SPARK_DEFAULT_SERIES_NAME)
 )
 internal = InternalFrame(spark_frame=sdf, index_spark_columns=None, 
column_labels=[None])
-
-return first_series(DataFrame(internal))
+ser_mode = first_series(DataFrame(internal))
+ser_mode.name = self.name
+return ser_mode
 
 def keys(self) -> "ps.Index":
 """
diff --git a/python/pyspark/pandas/tests/test_series.py 
b/python/pyspark/pandas/tests/test_series.py
index 76d35c51196..68fed26324d 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -2121,7 +2121,12 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
 
 pser.name = "x"
 psser = ps.from_pandas(pser)
-self.assert_eq(psser.mode(), pser.mode())
+if LooseVersion(pd.__version__) < LooseVersion("1.4"):
+# Due to pandas bug: 
https://github.com/pandas-dev/pandas/issues/46737
+psser.name = None
+self.assert_eq(psser.mode(), pser.mode())
+else:
+self.assert_eq(psser.mode(), pser.mode())
 self.assert_eq(
 psser.mode(dropna=False).sort_values().reset_index(drop=True),
 pser.mode(dropna=False).sort_values().reset_index(drop=True),


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-38797][SQL] Runtime Filter supports pruning side has window

2022-04-13 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 78700d939c4 [SPARK-38797][SQL] Runtime Filter supports pruning side 
has window
78700d939c4 is described below

commit 78700d939c42404ce6bd420094e13a258875949b
Author: Yuming Wang 
AuthorDate: Thu Apr 14 08:39:15 2022 +0800

[SPARK-38797][SQL] Runtime Filter supports pruning side has window

### What changes were proposed in this pull request?

1. Makes row-level runtime filtering support a pruning side that has a window.
For example:
```sql
SELECT *
FROM   (SELECT *,
   Row_number() OVER ( partition BY c1 ORDER BY f1) rn
FROM   bf1) bf1
   JOIN bf2
 ON bf1.c1 = bf2.c2
WHERE  bf2.a2 = 62
```

After this PR:
```
== Optimized Logical Plan ==
Join Inner, (c1#45922 = c2#45928), Statistics(sizeInBytes=12.3 MiB)
:- Window [row_number() windowspecdefinition(c1#45922, f1#45925 ASC 
NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS rn#45976], [c1#45922], [f1#45925 ASC NULLS FIRST], 
Statistics(sizeInBytes=3.7 KiB)
:  +- Filter (isnotnull(c1#45922) AND 
might_contain(scalar-subquery#45993 [], xxhash64(c1#45922, 42))), 
Statistics(sizeInBytes=3.3 KiB)
: :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45928, 42), 
100, 8388608, 0, 0) AS bloomFilter#45992], Statistics(sizeInBytes=108.0 B, 
rowCount=1)
: : +- Project [c2#45928], Statistics(sizeInBytes=1278.0 B)
: :+- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND 
isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB)
: :   +- Relation 
default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, 
Statistics(sizeInBytes=3.3 KiB)
: +- Relation 
default.bf1[a1#45920,b1#45921,c1#45922,d1#45923,e1#45924,f1#45925] parquet, 
Statistics(sizeInBytes=3.3 KiB)
+- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND 
isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB)
   +- Relation 
default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, 
Statistics(sizeInBytes=3.3 KiB)
```

2. Make sure injected filters can be pushed through a shuffle if the current
join is a broadcast join.

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #36080 from wangyum/SPARK-38797.

Lead-authored-by: Yuming Wang 
Co-authored-by: Yuming Wang 
Signed-off-by: Yuming Wang 
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   |  5 +++--
 .../spark/sql/InjectRuntimeFilterSuite.scala   | 26 ++
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 134292ae30d..01c1786e05a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -141,6 +141,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
 plan.exists {
   case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right, 
hint)
   case _: Aggregate => true
+  case _: Window => true
   case _ => false
 }
   }
@@ -172,8 +173,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
 
   /**
* Check that:
-   * - The filterApplicationSideJoinExp can be pushed down through joins and 
aggregates (ie the
-   *   expression references originate from a single leaf node)
+   * - The filterApplicationSideJoinExp can be pushed down through joins, 
aggregates and windows
+   *   (ie the expression references originate from a single leaf node)
* - The filter creation side has a selective predicate
* - The current join is a shuffle join or a broadcast join that has a 
shuffle below it
* - The max filterApplicationSide scan size is greater than a configurable 
threshold
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 726fa341b5c..6065f232109 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -539,4 +539,30 @@ class InjectRuntimeFilterSuite 

[spark] branch master updated: [SPARK-38890][PYTHON] Implement `ignore_index` of `DataFrame.sort_index`

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e549c6fd22a [SPARK-38890][PYTHON] Implement `ignore_index` of 
`DataFrame.sort_index`
e549c6fd22a is described below

commit e549c6fd22ac0d5a6df0d817212637c532b9a681
Author: Xinrong Meng 
AuthorDate: Thu Apr 14 09:34:13 2022 +0900

[SPARK-38890][PYTHON] Implement `ignore_index` of `DataFrame.sort_index`

### What changes were proposed in this pull request?
Implement `ignore_index` of `DataFrame.sort_index`.

### Why are the changes needed?
To reach parity with pandas API.

### Does this PR introduce _any_ user-facing change?
Yes. `ignore_index` of `DataFrame.sort_index` is supported as below:
```py
>>> df = ps.DataFrame({'A': [2, 1, np.nan]}, index=['b', 'a', np.nan])
>>> df
   A
b2.0
a1.0
NaN  NaN
>>> df.sort_index(ignore_index=True)
 A
0  1.0
1  2.0
2  NaN
```

### How was this patch tested?
Unit tests.

Closes #36184 from xinrong-databricks/frame.sort_index.ignore_index.

Authored-by: Xinrong Meng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/frame.py| 22 +-
 python/pyspark/pandas/tests/test_dataframe.py | 12 
 2 files changed, 33 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 36e992fef93..a78aaa66f08 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -7014,6 +7014,7 @@ defaultdict(, {'col..., 'col...})]
 inplace: bool = False,
 kind: str = None,
 na_position: str = "last",
+ignore_index: bool = False,
 ) -> Optional["DataFrame"]:
 """
 Sort object by labels (along an axis)
@@ -7033,6 +7034,10 @@ defaultdict(, {'col..., 'col...})]
 na_position : {‘first’, ‘last’}, default ‘last’
 first puts NaNs at the beginning, last puts NaNs at the end. Not 
implemented for
 MultiIndex.
+ignore_index : bool, default False
+If True, the resulting axis will be labeled 0, 1, …, n - 1.
+
+.. versionadded:: 3.4.0
 
 Returns
 ---
@@ -7060,6 +7065,12 @@ defaultdict(, {'col..., 'col...})]
 a1.0
 b2.0
 
+>>> df.sort_index(ignore_index=True)
+ A
+0  1.0
+1  2.0
+2  NaN
+
 >>> df.sort_index(inplace=True)
 >>> df
A
@@ -7091,6 +7102,13 @@ defaultdict(, {'col..., 'col...})]
 b 0  1  2
 a 1  2  1
 b 1  0  3
+
+>>> df.sort_index(ignore_index=True)
+   A  B
+0  3  0
+1  2  1
+2  1  2
+3  0  3
 """
 inplace = validate_bool_kwarg(inplace, "inplace")
 axis = validate_axis(axis)
@@ -7112,10 +7130,12 @@ defaultdict(, {'col..., 'col...})]
 
 psdf = self._sort(by=by, ascending=ascending, na_position=na_position)
 if inplace:
+if ignore_index:
+psdf.reset_index(drop=True, inplace=inplace)
 self._update_internal_frame(psdf._internal)
 return None
 else:
-return psdf
+return psdf.reset_index(drop=True) if ignore_index else psdf
 
 def swaplevel(
 self, i: Union[int, Name] = -2, j: Union[int, Name] = -1, axis: Axis = 0
diff --git a/python/pyspark/pandas/tests/test_dataframe.py 
b/python/pyspark/pandas/tests/test_dataframe.py
index fa32b38d3c9..b99a9a2e807 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -1678,6 +1678,8 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
 
 # Assert default behavior without parameters
 self.assert_eq(psdf.sort_index(), pdf.sort_index())
+# Assert ignoring index
+self.assert_eq(psdf.sort_index(ignore_index=True), 
pdf.sort_index(ignore_index=True))
 # Assert sorting descending
 self.assert_eq(psdf.sort_index(ascending=False), 
pdf.sort_index(ascending=False))
 # Assert sorting NA indices first
@@ -1694,6 +1696,14 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
 self.assertEqual(psdf.sort_index(inplace=True), 
pdf.sort_index(inplace=True))
 self.assert_eq(psdf, pdf)
 self.assert_eq(psserA, pserA)
+pserA = pdf.A
+psserA = psdf.A
+self.assertEqual(
+psdf.sort_index(inplace=True, ascending=False, ignore_index=True),
+pdf.sort_index(inplace=True, ascending=False, ignore_index=True),
+)
+self.assert_eq(psdf, pdf)
+self.assert_eq(psserA, pserA)
 
 # Assert multi-indices
 pdf = pd.DataFrame(
@@ 

[spark] branch master updated (c0c1f35cd92 -> 0085bfc8aca)

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c0c1f35cd92 [SPARK-37014][PYTHON] Inline type hints for 
python/pyspark/streaming/context.py
 add 0085bfc8aca [SPARK-38107][SQL][FOLLOWUP] Refine the error-class name 
and message for grouped agg pandas UDF

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json|  6 +++---
 python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py |  4 +++-
 .../apache/spark/sql/errors/QueryCompilationErrors.scala|  7 ---
 .../org/apache/spark/sql/execution/SparkStrategies.scala|  7 +--
 .../spark/sql/errors/QueryCompilationErrorsSuite.scala  | 13 -
 5 files changed, 23 insertions(+), 14 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.3 updated: [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py

2022-04-13 Thread zero323
This is an automated email from the ASF dual-hosted git repository.

zero323 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new baaa3bbecd9 [SPARK-37014][PYTHON] Inline type hints for 
python/pyspark/streaming/context.py
baaa3bbecd9 is described below

commit baaa3bbecd9f63aa0a71cf76de4b53d3c1dcf7a4
Author: dch nguyen 
AuthorDate: Thu Apr 14 02:03:24 2022 +0200

[SPARK-37014][PYTHON] Inline type hints for 
python/pyspark/streaming/context.py

### What changes were proposed in this pull request?
Inline type hints for python/pyspark/streaming/context.py, migrated from the
stub file python/pyspark/streaming/context.pyi.

### Why are the changes needed?
Currently, there is a type hint stub file python/pyspark/streaming/context.pyi
that shows the expected types for functions, but we can also take advantage of
static type checking within the functions by inlining the type hints.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test.

Closes #34293 from dchvn/SPARK-37014.

Authored-by: dch nguyen 
Signed-off-by: zero323 
(cherry picked from commit c0c1f35cd9279bc1a7a50119be72a297162a9b55)
Signed-off-by: zero323 
---
 python/pyspark/streaming/context.py  | 123 ---
 python/pyspark/streaming/context.pyi |  71 
 python/pyspark/streaming/kinesis.py  |   9 +--
 3 files changed, 91 insertions(+), 112 deletions(-)

diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index cc9875d6575..52e5efed063 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -14,18 +14,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from typing import Any, Callable, List, Optional, TypeVar
 
-from py4j.java_gateway import java_import, is_instance_of
+from py4j.java_gateway import java_import, is_instance_of, JavaObject
 
 from pyspark import RDD, SparkConf
 from pyspark.serializers import NoOpSerializer, UTF8Deserializer, 
CloudPickleSerializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 from pyspark.streaming.util import TransformFunction, 
TransformFunctionSerializer
 
 __all__ = ["StreamingContext"]
 
+T = TypeVar("T")
+
 
 class StreamingContext:
 """
@@ -51,27 +55,35 @@ class StreamingContext:
 # Reference to a currently active StreamingContext
 _activeContext = None
 
-def __init__(self, sparkContext, batchDuration=None, jssc=None):
-
+def __init__(
+self,
+sparkContext: SparkContext,
+batchDuration: Optional[int] = None,
+jssc: Optional[JavaObject] = None,
+):
 self._sc = sparkContext
 self._jvm = self._sc._jvm
 self._jssc = jssc or self._initialize_context(self._sc, batchDuration)
 
-def _initialize_context(self, sc, duration):
+def _initialize_context(self, sc: SparkContext, duration: Optional[int]) 
-> JavaObject:
 self._ensure_initialized()
+assert self._jvm is not None and duration is not None
 return self._jvm.JavaStreamingContext(sc._jsc, 
self._jduration(duration))
 
-def _jduration(self, seconds):
+def _jduration(self, seconds: int) -> JavaObject:
 """
 Create Duration object given number of seconds
 """
+assert self._jvm is not None
 return self._jvm.Duration(int(seconds * 1000))
 
 @classmethod
-def _ensure_initialized(cls):
+def _ensure_initialized(cls) -> None:
 SparkContext._ensure_initialized()
 gw = SparkContext._gateway
 
+assert gw is not None
+
 java_import(gw.jvm, "org.apache.spark.streaming.*")
 java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
 java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
@@ -83,11 +95,15 @@ class StreamingContext:
 # register serializer for TransformFunction
 # it happens before creating SparkContext when loading from 
checkpointing
 cls._transformerSerializer = TransformFunctionSerializer(
-SparkContext._active_spark_context, CloudPickleSerializer(), gw
+SparkContext._active_spark_context,
+CloudPickleSerializer(),
+gw,
 )
 
 @classmethod
-def getOrCreate(cls, checkpointPath, setupFunc):
+def getOrCreate(
+cls, checkpointPath: str, setupFunc: Callable[[], "StreamingContext"]
+) -> "StreamingContext":
 """
 Either recreate a StreamingContext from checkpoint data or create a 
new StreamingContext.
 If checkpoint data exists in the 

[spark] branch master updated: [SPARK-37014][PYTHON] Inline type hints for python/pyspark/streaming/context.py

2022-04-13 Thread zero323
This is an automated email from the ASF dual-hosted git repository.

zero323 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c0c1f35cd92 [SPARK-37014][PYTHON] Inline type hints for 
python/pyspark/streaming/context.py
c0c1f35cd92 is described below

commit c0c1f35cd9279bc1a7a50119be72a297162a9b55
Author: dch nguyen 
AuthorDate: Thu Apr 14 02:03:24 2022 +0200

[SPARK-37014][PYTHON] Inline type hints for 
python/pyspark/streaming/context.py

### What changes were proposed in this pull request?
Inline type hints for python/pyspark/streaming/context.py, migrated from the
stub file python/pyspark/streaming/context.pyi.

### Why are the changes needed?
Currently, there is a type hint stub file python/pyspark/streaming/context.pyi
that shows the expected types for functions, but we can also take advantage of
static type checking within the functions by inlining the type hints.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test.

Closes #34293 from dchvn/SPARK-37014.

Authored-by: dch nguyen 
Signed-off-by: zero323 
---
 python/pyspark/streaming/context.py  | 123 ---
 python/pyspark/streaming/context.pyi |  71 
 python/pyspark/streaming/kinesis.py  |   9 +--
 3 files changed, 91 insertions(+), 112 deletions(-)

diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index cc9875d6575..52e5efed063 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -14,18 +14,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+from typing import Any, Callable, List, Optional, TypeVar
 
-from py4j.java_gateway import java_import, is_instance_of
+from py4j.java_gateway import java_import, is_instance_of, JavaObject
 
 from pyspark import RDD, SparkConf
 from pyspark.serializers import NoOpSerializer, UTF8Deserializer, 
CloudPickleSerializer
 from pyspark.context import SparkContext
 from pyspark.storagelevel import StorageLevel
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 from pyspark.streaming.util import TransformFunction, 
TransformFunctionSerializer
 
 __all__ = ["StreamingContext"]
 
+T = TypeVar("T")
+
 
 class StreamingContext:
 """
@@ -51,27 +55,35 @@ class StreamingContext:
 # Reference to a currently active StreamingContext
 _activeContext = None
 
-def __init__(self, sparkContext, batchDuration=None, jssc=None):
-
+def __init__(
+self,
+sparkContext: SparkContext,
+batchDuration: Optional[int] = None,
+jssc: Optional[JavaObject] = None,
+):
 self._sc = sparkContext
 self._jvm = self._sc._jvm
 self._jssc = jssc or self._initialize_context(self._sc, batchDuration)
 
-def _initialize_context(self, sc, duration):
+def _initialize_context(self, sc: SparkContext, duration: Optional[int]) 
-> JavaObject:
 self._ensure_initialized()
+assert self._jvm is not None and duration is not None
 return self._jvm.JavaStreamingContext(sc._jsc, 
self._jduration(duration))
 
-def _jduration(self, seconds):
+def _jduration(self, seconds: int) -> JavaObject:
 """
 Create Duration object given number of seconds
 """
+assert self._jvm is not None
 return self._jvm.Duration(int(seconds * 1000))
 
 @classmethod
-def _ensure_initialized(cls):
+def _ensure_initialized(cls) -> None:
 SparkContext._ensure_initialized()
 gw = SparkContext._gateway
 
+assert gw is not None
+
 java_import(gw.jvm, "org.apache.spark.streaming.*")
 java_import(gw.jvm, "org.apache.spark.streaming.api.java.*")
 java_import(gw.jvm, "org.apache.spark.streaming.api.python.*")
@@ -83,11 +95,15 @@ class StreamingContext:
 # register serializer for TransformFunction
 # it happens before creating SparkContext when loading from 
checkpointing
 cls._transformerSerializer = TransformFunctionSerializer(
-SparkContext._active_spark_context, CloudPickleSerializer(), gw
+SparkContext._active_spark_context,
+CloudPickleSerializer(),
+gw,
 )
 
 @classmethod
-def getOrCreate(cls, checkpointPath, setupFunc):
+def getOrCreate(
+cls, checkpointPath: str, setupFunc: Callable[[], "StreamingContext"]
+) -> "StreamingContext":
 """
 Either recreate a StreamingContext from checkpoint data or create a 
new StreamingContext.
 If checkpoint data exists in the provided `checkpointPath`, then 
StreamingContext will be
@@ -104,6 +120,8 @@ class StreamingContext:
 

[spark] branch master updated (43c63337f98 -> e0c9604f0c4)

2022-04-13 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 43c63337f98 [SPARK-34659][UI] Forbid using keyword "proxy" or 
"history" in reverse proxy URL
 add e0c9604f0c4 [SPARK-38835][CORE][TESTS] Refactor 
`FsHistoryProviderSuite` to add UTs for RocksDB

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/history/FsHistoryProviderSuite.scala | 19 ---
 1 file changed, 16 insertions(+), 3 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-34659][UI] Forbid using keyword "proxy" or "history" in reverse proxy URL

2022-04-13 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 43c63337f98 [SPARK-34659][UI] Forbid using keyword "proxy" or 
"history" in reverse proxy URL
43c63337f98 is described below

commit 43c63337f98097b046b70fcfb4fae44a3295b513
Author: Gengliang Wang 
AuthorDate: Wed Apr 13 10:50:52 2022 -0700

[SPARK-34659][UI] Forbid using keyword "proxy" or "history" in reverse 
proxy URL

### What changes were proposed in this pull request?

When the reverse proxy URL contains "proxy" or "history", the application
ID in the UI is wrongly parsed.
For example, if we set spark.ui.reverseProxyURL to "/test/proxy/prefix" or
"/test/history/prefix", the application ID is parsed as "prefix" and the
related API calls fail on the stages/executors pages:
```
.../api/v1/applications/prefix/allexecutors
```
instead of
```
.../api/v1/applications/app-20220413142241-/allexecutors
```

There is more context in https://github.com/apache/spark/pull/31774
We could fix this entirely as in https://github.com/apache/spark/pull/36174,
but that would be risky and complicated.

### Why are the changes needed?

Prevent users from setting these keywords in the reverse proxy URL and getting
wrong UI results.
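
A hedged Scala sketch of the new validation, mirroring the unit test added in
this patch (config key names follow the documented Spark UI keys
`spark.ui.reverseProxy` and `spark.ui.reverseProxyUrl`; the URL and app name
below are assumptions for illustration):

```scala
// Sketch: a reverse proxy URL whose path contains the reserved segments
// "proxy" or "history" is now rejected when the SparkContext starts.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("reverse-proxy-url-check")
  .set("spark.ui.reverseProxy", "true")
  .set("spark.ui.reverseProxyUrl", "http://proxyhost:8080/path/proxy/spark")

try {
  new SparkContext(conf)
} catch {
  case e: IllegalArgumentException =>
    // "Cannot use the keyword 'proxy' or 'history' in reverse proxy URL ..."
    println(e.getMessage)
}
```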

### Does this PR introduce _any_ user-facing change?

No
### How was this patch tested?

A new unit test.
Also doc preview:
https://user-images.githubusercontent.com/1097932/163126641-da315012-aae5-45a5-a048-340a5dd6e91e.png

Closes #36176 from gengliangwang/forbidURLPrefix.

Authored-by: Gengliang Wang 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala   |  5 ++---
 core/src/main/scala/org/apache/spark/internal/config/UI.scala |  5 +
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala  | 11 +++
 docs/configuration.md |  4 +++-
 4 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7257371256d..c6cb5cb5e19 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -592,9 +592,8 @@ class SparkContext(config: SparkConf) extends Logging {
   _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
 }
 if (_conf.get(UI_REVERSE_PROXY)) {
-  val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
-"/proxy/" + _applicationId
-  System.setProperty("spark.ui.proxyBase", proxyUrl)
+  val proxyUrl = 
_conf.get(UI_REVERSE_PROXY_URL).getOrElse("").stripSuffix("/")
+  System.setProperty("spark.ui.proxyBase", proxyUrl + "/proxy/" + 
_applicationId)
 }
 _ui.foreach(_.setAppId(_applicationId))
 _env.blockManager.initialize(_applicationId)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala 
b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index 1790e97b35a..464034b8fcd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -79,6 +79,11 @@ private[spark] object UI {
   "reach your proxy.")
 .version("2.1.0")
 .stringConf
+.checkValue ({ s =>
+  val words = s.split("/")
+  !words.contains("proxy") && !words.contains("history") },
+  "Cannot use the keyword 'proxy' or 'history' in reverse proxy URL. Spark 
UI relies on both " +
+"keywords for getting REST API endpoints from URIs.")
 .createOptional
 
   val UI_KILL_ENABLED = ConfigBuilder("spark.ui.killEnabled")
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 8671180b3ca..c64a4371911 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1343,6 +1343,17 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
   }
 
+  test("SPARK-34659: check invalid UI_REVERSE_PROXY_URL") {
+val reverseProxyUrl = "http://proxyhost:8080/path/proxy/spark"
+val conf = new SparkConf().setAppName("testAppAttemptId")
+  .setMaster("pushbasedshuffleclustermanager")
+conf.set(UI_REVERSE_PROXY, true)
+conf.set(UI_REVERSE_PROXY_URL, reverseProxyUrl)
+val msg = intercept[java.lang.IllegalArgumentException] {
+  new SparkContext(conf)
+}.getMessage
+assert(msg.contains("Cannot use the keyword 'proxy' or 'history' in 
reverse proxy URL"))
+  }
 }
 
 object 

[spark] branch master updated: [SPARK-38678][TESTS][FOLLOWUP] Enable RocksDB tests in `AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon on MacOS

2022-04-13 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7ec3a5730bd [SPARK-38678][TESTS][FOLLOWUP] Enable RocksDB tests in 
`AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon 
on MacOS
7ec3a5730bd is described below

commit 7ec3a5730bd864089c22e19e823d87b107688378
Author: yangjie01 
AuthorDate: Wed Apr 13 10:45:22 2022 -0700

[SPARK-38678][TESTS][FOLLOWUP] Enable RocksDB tests in 
`AppStatusStoreSuite` and `StreamingQueryStatusListenerSuite` on Apple Silicon 
on MacOS

### What changes were proposed in this pull request?
This pr aims to enable RocksDB tests in `AppStatusStoreSuite` and 
`StreamingQueryStatusListenerSuite` on Apple Silicon on MacOS, it's a followup 
of SPARK-38678.

### Why are the changes needed?
Enable more RocksDB-related tests on Apple Silicon on MacOS.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass GA
- Manual test on Apple Silicon environment:

```
build/sbt "core/testOnly *AppStatusStoreSuite*" "sql/testOnly 
*StreamingQueryStatusListenerSuite*"
```

All tests passed.

Closes #36139 from LuciferYang/SPARK-38678-FOLLOWUP.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/status/AppStatusStoreSuite.scala  | 26 +-
 .../ui/StreamingQueryStatusListenerSuite.scala |  1 -
 2 files changed, 16 insertions(+), 11 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 70852164b89..b05f2b799d2 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -90,10 +90,6 @@ class AppStatusStoreSuite extends SparkFunSuite {
 if (live) {
   return AppStatusStore.createLiveStore(conf)
 }
-// LevelDB doesn't support Apple Silicon yet
-if (Utils.isMacOnAppleSilicon && disk) {
-  return null
-}
 
 val store: KVStore = if (disk) {
   conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString)
@@ -106,12 +102,22 @@ class AppStatusStoreSuite extends SparkFunSuite {
 new AppStatusStore(store)
   }
 
-  Seq(
-"disk leveldb" -> createAppStore(disk = true, 
HybridStoreDiskBackend.LEVELDB, live = false),
-"disk rocksdb" -> createAppStore(disk = true, 
HybridStoreDiskBackend.ROCKSDB, live = false),
-"in memory" -> createAppStore(disk = false, live = false),
-"in memory live" -> createAppStore(disk = false, live = true)
-  ).foreach { case (hint, appStore) =>
+  private val cases = {
+val baseCases = Seq(
+  "disk rocksdb" -> createAppStore(disk = true, 
HybridStoreDiskBackend.ROCKSDB, live = false),
+  "in memory" -> createAppStore(disk = false, live = false),
+  "in memory live" -> createAppStore(disk = false, live = true)
+)
+if (Utils.isMacOnAppleSilicon) {
+  baseCases
+} else {
+  Seq(
+"disk leveldb" -> createAppStore(disk = true, 
HybridStoreDiskBackend.LEVELDB, live = false)
+  ) ++ baseCases
+}
+  }
+
+  cases.foreach { case (hint, appStore) =>
 test(s"SPARK-26260: summary should contain only successful tasks' metrics 
(store = $hint)") {
   assume(appStore != null)
   val store = appStore.store
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 1d1b51354f8..1a51b58f4f6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -236,7 +236,6 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
   }
 
   test("SPARK-38056: test writing StreamingQueryData to a RocksDB store") {
-assume(!Utils.isMacOnAppleSilicon)
 val conf = new SparkConf()
   .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
 val testDir = Utils.createTempDir()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-38745][SQL][TESTS] Move the tests for `NON_PARTITION_COLUMN` to `QueryCompilationErrorsDSv2Suite`

2022-04-13 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 98fbbcdac5d [SPARK-38745][SQL][TESTS] Move the tests for 
`NON_PARTITION_COLUMN` to `QueryCompilationErrorsDSv2Suite`
98fbbcdac5d is described below

commit 98fbbcdac5defebec81626dd1dbd5522a2fd910b
Author: Max Gekk 
AuthorDate: Wed Apr 13 17:39:40 2022 +0300

[SPARK-38745][SQL][TESTS] Move the tests for `NON_PARTITION_COLUMN` to 
`QueryCompilationErrorsDSv2Suite`

### What changes were proposed in this pull request?
Move test for the error class `NON_PARTITION_COLUMN` from 
`InsertIntoSQLOnlyTests` to `QueryCompilationErrorsDSv2Suite`.

### Why are the changes needed?
To improve code maintenance: all tests for error classes are placed in
Query.*ErrorsSuite. Also, the exceptions are raised from
[QueryCompilationErrors](https://github.com/apache/spark/blob/bf75b495e18ed87d0c118bfd5f1ceb52d720cad9/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala#L100-L104),
so the tests should be in `QueryCompilationErrorsDSv2Suite` for consistency.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the moved tests:
```
$ build/sbt "test:testOnly *QueryCompilationErrorsDSv2Suite"
```

Closes #36175 from MaxGekk/move-tests-for-NON_PARTITION_COLUMN.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../spark/sql/connector/InsertIntoTests.scala  | 36 +---
 .../errors/QueryCompilationErrorsDSv2Suite.scala   | 49 +++---
 2 files changed, 45 insertions(+), 40 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index fc98cfd5138..7493966790c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -198,7 +198,7 @@ trait InsertIntoSQLOnlyTests
   /** Whether to include the SQL specific tests in this trait within the 
extending test suite. */
   protected val includeSQLOnlyTests: Boolean
 
-  private def withTableAndData(tableName: String)(testFn: String => Unit): 
Unit = {
+  protected def withTableAndData(tableName: String)(testFn: String => Unit): 
Unit = {
 withTable(tableName) {
   val viewName = "tmp_view"
   val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, 
"c"))).toDF("id", "data")
@@ -248,40 +248,6 @@ trait InsertIntoSQLOnlyTests
   }
 }
 
-test("InsertInto: static PARTITION clause fails with non-partition 
column") {
-  val t1 = s"${catalogAndNamespace}tbl"
-  withTableAndData(t1) { view =>
-sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (data)")
-
-val exc = intercept[AnalysisException] {
-  sql(s"INSERT INTO TABLE $t1 PARTITION (id=1) SELECT data FROM $view")
-}
-
-verifyTable(t1, spark.emptyDataFrame)
-assert(exc.getMessage.contains(
-  "PARTITION clause cannot contain a non-partition column name"))
-assert(exc.getMessage.contains("id"))
-assert(exc.getErrorClass == "NON_PARTITION_COLUMN")
-  }
-}
-
-test("InsertInto: dynamic PARTITION clause fails with non-partition 
column") {
-  val t1 = s"${catalogAndNamespace}tbl"
-  withTableAndData(t1) { view =>
-sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format 
PARTITIONED BY (id)")
-
-val exc = intercept[AnalysisException] {
-  sql(s"INSERT INTO TABLE $t1 PARTITION (data) SELECT * FROM $view")
-}
-
-verifyTable(t1, spark.emptyDataFrame)
-assert(exc.getMessage.contains(
-  "PARTITION clause cannot contain a non-partition column name"))
-assert(exc.getMessage.contains("data"))
-assert(exc.getErrorClass == "NON_PARTITION_COLUMN")
-  }
-}
-
 test("InsertInto: overwrite - dynamic clause - static mode") {
   withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
 val t1 = s"${catalogAndNamespace}tbl"
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala
index bfea3f535dd..042f130d7f5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsDSv2Suite.scala
@@ -17,18 +17,27 @@
 
 package org.apache.spark.sql.errors
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
-import 

[spark] branch master updated (ee74bd0d4e3 -> 30fc0ba2307)

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ee74bd0d4e3 [SPARK-38832][SQL] Remove unnecessary distinct in 
aggregate expression by distinctKeys
 add 30fc0ba2307 [SPARK-38844][PYTHON][SQL] Implement linear interpolate

No new revisions were added by this update.

Summary of changes:
 .../docs/source/reference/pyspark.pandas/frame.rst |   1 +
 .../source/reference/pyspark.pandas/series.rst |   1 +
 python/pyspark/pandas/frame.py |  17 +++
 python/pyspark/pandas/generic.py   |  84 ++
 python/pyspark/pandas/missing/frame.py |   1 -
 python/pyspark/pandas/missing/series.py|   1 -
 python/pyspark/pandas/series.py|  57 ++
 .../pyspark/pandas/tests/test_generic_functions.py | 124 +
 .../catalyst/expressions/windowExpressions.scala   |  74 
 .../spark/sql/api/python/PythonSQLUtils.scala  |   6 +-
 10 files changed, 363 insertions(+), 3 deletions(-)
 create mode 100644 python/pyspark/pandas/tests/test_generic_functions.py


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys

2022-04-13 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ee74bd0d4e3 [SPARK-38832][SQL] Remove unnecessary distinct in 
aggregate expression by distinctKeys
ee74bd0d4e3 is described below

commit ee74bd0d4e3d97b33aa57fe523ab4f5537125f68
Author: ulysses-you 
AuthorDate: Wed Apr 13 18:10:33 2022 +0800

[SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by 
distinctKeys

### What changes were proposed in this pull request?

Make `EliminateDistinct` support eliminating distinct by the child's distinct keys.

### Why are the changes needed?

We can remove the distinct in an aggregate expression if the distinct
semantics are guaranteed by the child.

For example:
```sql
SELECT count(distinct c) FROM (
  SELECT c FROM t GROUP BY c
)
```
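
A hedged sketch of observing the rule's effect (assumes an existing
SparkSession `spark` and a table `t` with an INT column `c`; both are
assumptions for illustration, not part of the patch):

```scala
// Sketch: the DISTINCT inside count(DISTINCT c) can be dropped because the
// inner GROUP BY c already guarantees c is distinct per row.
val optimized = spark.sql(
  """SELECT count(DISTINCT c) FROM (
    |  SELECT c FROM t GROUP BY c
    |) sub""".stripMargin)

// After this rule, the optimized plan shows count(c) with isDistinct = false.
optimized.explain(true)
```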

### Does this PR introduce _any_ user-facing change?

improve performance

### How was this patch tested?

add test in `EliminateDistinctSuite`

Closes #36117 from ulysses-you/remove-distinct.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 25 --
 .../plans/logical/LogicalPlanDistinctKeys.scala|  8 ++-
 .../optimizer/EliminateDistinctSuite.scala | 18 
 3 files changed, 44 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 66c2ad84cce..bb788336c6d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -146,7 +146,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
 PushDownPredicates) :: Nil
 }
 
-val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
+val batches = (
 // Technically some of the rules in Finish Analysis are not optimizer 
rules and belong more
 // in the analyzer, because they are needed for correctness (e.g. 
ComputeCurrentTime).
 // However, because we also use the analyzer to canonicalized queries (for 
view definition),
@@ -166,6 +166,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
 
//
 // Optimizer rules start here
 
//
+Batch("Eliminate Distinct", Once, EliminateDistinct) ::
 // - Do the first call of CombineUnions before starting the major 
Optimizer rules,
 //   since it can reduce the number of iteration and the other rules could 
add/move
 //   extra operators between two adjacent Union operators.
@@ -411,14 +412,26 @@ abstract class Optimizer(catalogManager: CatalogManager)
 }
 
 /**
- * Remove useless DISTINCT for MAX and MIN.
+ * Remove useless DISTINCT:
+ *   1. For some aggregate expression, e.g.: MAX and MIN.
+ *   2. If the distinct semantics is guaranteed by child.
+ *
  * This rule should be applied before RewriteDistinctAggregates.
  */
 object EliminateDistinct extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformAllExpressionsWithPruning(
-_.containsPattern(AGGREGATE_EXPRESSION)) {
-case ae: AggregateExpression if ae.isDistinct && 
isDuplicateAgnostic(ae.aggregateFunction) =>
-  ae.copy(isDistinct = false)
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformWithPruning(
+_.containsPattern(AGGREGATE)) {
+case agg: Aggregate =>
+  
agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
+case ae: AggregateExpression if ae.isDistinct &&
+  isDuplicateAgnostic(ae.aggregateFunction) =>
+  ae.copy(isDistinct = false)
+
+case ae: AggregateExpression if ae.isDistinct &&
+  agg.child.distinctKeys.exists(
+
_.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable 
=>
+  ae.copy(isDistinct = false)
+  }
   }
 
   def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
index 1843c2da478..2ffa5a0e594 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
@@ -29,6 

[spark] branch master updated: [SPARK-38774][PYTHON] Implement Series.autocorr

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new eb699ec138d [SPARK-38774][PYTHON] Implement Series.autocorr
eb699ec138d is described below

commit eb699ec138d4a49ecc204f530eeefa513b42f4ad
Author: Ruifeng Zheng 
AuthorDate: Wed Apr 13 18:09:06 2022 +0900

[SPARK-38774][PYTHON] Implement Series.autocorr

### What changes were proposed in this pull request?
Implement Series.autocorr

### Why are the changes needed?
For API coverage.

### Does this PR introduce _any_ user-facing change?
Yes, Series now supports the `autocorr` function:

```
In [86]: s = pd.Series([.2, .0, .6, .2, np.nan, .5, .6])

In [87]: s.autocorr()
Out[87]: -0.14121975762272054
```

### How was this patch tested?
added doctest

Closes #36048 from zhengruifeng/pandas_series_autocorr.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../source/reference/pyspark.pandas/series.rst |  1 +
 python/pyspark/pandas/missing/series.py|  1 -
 python/pyspark/pandas/series.py| 76 ++
 python/pyspark/pandas/tests/test_series.py | 17 +
 4 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/python/docs/source/reference/pyspark.pandas/series.rst 
b/python/docs/source/reference/pyspark.pandas/series.rst
index b6a0d1e52d5..0f897ce2e14 100644
--- a/python/docs/source/reference/pyspark.pandas/series.rst
+++ b/python/docs/source/reference/pyspark.pandas/series.rst
@@ -134,6 +134,7 @@ Computations / Descriptive Stats
Series.abs
Series.all
Series.any
+   Series.autocorr
Series.between
Series.clip
Series.corr
diff --git a/python/pyspark/pandas/missing/series.py 
b/python/pyspark/pandas/missing/series.py
index 9bb191f1c81..07094b64bbb 100644
--- a/python/pyspark/pandas/missing/series.py
+++ b/python/pyspark/pandas/missing/series.py
@@ -33,7 +33,6 @@ class MissingPandasLikeSeries:
 
 # Functions
 asfreq = _unsupported_function("asfreq")
-autocorr = _unsupported_function("autocorr")
 combine = _unsupported_function("combine")
 convert_dtypes = _unsupported_function("convert_dtypes")
 infer_objects = _unsupported_function("infer_objects")
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index ced81b12e8c..d6cc4a8627d 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -3045,6 +3045,82 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
 DataFrame(internal.with_new_sdf(sdf, index_fields=([None] * 
internal.index_level)))
 )
 
+def autocorr(self, periods: int = 1) -> float:
+"""
+Compute the lag-N autocorrelation.
+
+This method computes the Pearson correlation between
+the Series and its shifted self.
+
+.. note:: the current implementation of rank uses Spark's Window 
without
+specifying partition specification. This leads to move all data 
into
+single partition in single machine and could cause serious
+performance degradation. Avoid this method against very large 
dataset.
+
+.. versionadded:: 3.4.0
+
+Parameters
+--
+periods : int, default 1
+Number of lags to apply before performing autocorrelation.
+
+Returns
+---
+float
+The Pearson correlation between self and self.shift(lag).
+
+See Also
+
+Series.corr : Compute the correlation between two Series.
+Series.shift : Shift index by desired number of periods.
+DataFrame.corr : Compute pairwise correlation of columns.
+
+Notes
+-
+If the Pearson correlation is not well defined return 'NaN'.
+
+Examples
+
+>>> s = ps.Series([.2, .0, .6, .2, np.nan, .5, .6])
+>>> s.autocorr()  # doctest: +ELLIPSIS
+-0.141219...
+>>> s.autocorr(0)  # doctest: +ELLIPSIS
+1.0...
+>>> s.autocorr(2)  # doctest: +ELLIPSIS
+0.970725...
+>>> s.autocorr(-3)  # doctest: +ELLIPSIS
+0.277350...
+>>> s.autocorr(5)  # doctest: +ELLIPSIS
+-1.00...
+>>> s.autocorr(6)  # doctest: +ELLIPSIS
+nan
+
+If the Pearson correlation is not well defined, then 'NaN' is returned.
+
+>>> s = ps.Series([1, 0, 0, 0])
+>>> s.autocorr()
+nan
+"""
+# This implementation is suboptimal because it moves all data to a single partition,
+# global sort should be used instead of window, but it should be a start
+if not isinstance(periods, int):
+raise TypeError("periods should be an int; however, got [%s]" % 

[spark] branch branch-3.3 updated: [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for Spark 3.3

2022-04-13 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 76f40eef8b9 [SPARK-38829][SQL][3.3] Remove TimestampNTZ type support 
in Parquet for Spark 3.3
76f40eef8b9 is described below

commit 76f40eef8b97e23f4a16e471366ae410a3e6cc20
Author: Ivan Sadikov 
AuthorDate: Wed Apr 13 17:06:03 2022 +0800

[SPARK-38829][SQL][3.3] Remove TimestampNTZ type support in Parquet for 
Spark 3.3

### What changes were proposed in this pull request?

This is a follow-up for https://github.com/apache/spark/pull/36094.
I added `Utils.isTesting` whenever we perform schema conversion or row 
conversion for TimestampNTZType.

I verified that the tests, e.g. ParquetIOSuite, fail with unsupported data 
type when running in non-testing mode:
```
[info]   Cause: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 1 in stage 40.0 failed 1 times, most recent failure: Lost task 
1.0 in stage 40.0 (TID 66) (ip-10-110-16-208.us-west-2.compute.internal 
executor driver): org.apache.spark.sql.AnalysisException: Unsupported data type 
timestamp_ntz
[info]  at 
org.apache.spark.sql.errors.QueryCompilationErrors$.cannotConvertDataTypeToParquetTypeError(QueryCompilationErrors.scala:1304)
[info]  at 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:707)
[info]  at 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:479)
[info]  at 
org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.$anonfun$convert$1(ParquetSchemaConverter.scala:471)
```
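
For illustration only, a hedged sketch of what this means for users who somehow construct a TimestampNTZ column in 3.3 (the type is not user-facing, so `make_ntz_df` below is a hypothetical placeholder for however such a DataFrame is obtained):

```python
# Hypothetical sketch: outside of Spark's testing mode, writing a DataFrame
# with a timestamp_ntz column to Parquet is expected to be rejected in 3.3.
df = make_ntz_df(spark)  # placeholder, not a real API
try:
    df.write.parquet("/tmp/ntz_out")
except Exception as e:
    # Expected: AnalysisException with "Unsupported data type timestamp_ntz"
    print(e)
```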

### Why are the changes needed?
We have to disable TimestampNTZType as other parts of the codebase do not 
yet support this type.

### Does this PR introduce _any_ user-facing change?

No, the TimestampNTZ type is not released yet.

### How was this patch tested?

I tested the changes manually by rerunning the test suites that verify 
TimestampNTZType in the non-testing mode.

Closes #36137 from sadikovi/SPARK-38829-parquet-ntz-off.

Authored-by: Ivan Sadikov 
Signed-off-by: Gengliang Wang 
---
 .../sql/execution/datasources/parquet/ParquetRowConverter.scala| 5 -
 .../sql/execution/datasources/parquet/ParquetSchemaConverter.scala | 7 +--
 .../sql/execution/datasources/parquet/ParquetWriteSupport.scala| 4 +++-
 3 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index a955dd6fc76..ffd90fd722b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -41,6 +41,7 @@ import 
org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
 
 /**
  * A [[ParentContainerUpdater]] is used by a Parquet converter to set 
converted values to some
@@ -487,7 +488,9 @@ private[parquet] class ParquetRowConverter(
 parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 &&
   
parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation]
 &&
   !parquetType.getLogicalTypeAnnotation
-.asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC
+.asInstanceOf[TimestampLogicalTypeAnnotation].isAdjustedToUTC &&
+  // SPARK-38829: Remove TimestampNTZ type support in Parquet for Spark 3.3
+  Utils.isTesting
 
   /**
* Parquet converter for strings. A dictionary is used to minimize string 
decoding cost.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 0e065f19a88..3419bf15f8e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * This converter class is used to convert Parquet [[MessageType]] to 

[spark] branch branch-3.3 updated: [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame without columns

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new c44020b961f [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return 
empty DataFrame without columns
c44020b961f is described below

commit c44020b961ffe44e30ee617af6ffb84effbd28fe
Author: Enrico Minack 
AuthorDate: Wed Apr 13 17:07:27 2022 +0900

[SPARK-38833][PYTHON][SQL] Allow applyInPandas to return empty DataFrame 
without columns

### What changes were proposed in this pull request?
Methods `wrap_cogrouped_map_pandas_udf` and `wrap_grouped_map_pandas_udf` in
`python/pyspark/worker.py` do not need to reject `pd.DataFrame`s with no columns
returned by the UDF when that DataFrame is empty (zero rows). This allows
returning empty DataFrames without the need to define columns. The DataFrame is
empty after all!

**The proposed behaviour is consistent with the current behaviour of 
`DataFrame.mapInPandas`.**

### Why are the changes needed?
Returning an empty DataFrame from the lambda given to `applyInPandas` 
should be as easy as this:

```python
return pd.DataFrame([])
```

However, PySpark requires the empty DataFrame to have the right _number_ of
columns. This seems redundant, as the schema is already defined in the
`applyInPandas` call. Returning a non-empty DataFrame does not require defining
columns.

Behaviour of `applyInPandas` should be consistent with `mapInPandas`.

Here is an example to reproduce:
```python
import pandas as pd

from pyspark.sql.functions import pandas_udf, ceil

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def mean_func(key, pdf):
    if key == (1,):
        return pd.DataFrame([])
    else:
        return pd.DataFrame([key + (pdf.v.mean(),)])

df.groupby("id").applyInPandas(mean_func, schema="id long, v double").show()
```
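
For comparison, a hedged sketch of the existing `mapInPandas` behaviour that `applyInPandas` is being aligned with (reusing `pd` and `df` from the example above):

```python
def drop_all(iterator):
    for pdf in iterator:
        # Empty DataFrame with no columns declared; mapInPandas already accepts this.
        yield pd.DataFrame([])

df.mapInPandas(drop_all, schema="id long, v double").show()
# Expected: an empty result with columns id and v.
```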

### Does this PR introduce _any_ user-facing change?
It changes the behaviour of the following calls to allow returning an empty
`pd.DataFrame` without defining columns. The PySpark DataFrame returned by
`applyInPandas` is unchanged:

- `df.groupby(…).applyInPandas(…)`
- `df.cogroup(…).applyInPandas(…)`

### How was this patch tested?
Tests are added that test `applyInPandas` and `mapInPandas` when returning

- empty DataFrame with no columns
- empty DataFrame with the wrong number of columns
- non-empty DataFrame with the wrong number of columns
- something other than `pd.DataFrame`

NOTE: It is not an error for `mapInPandas` to return DataFrames with more 
columns than specified in the `mapInPandas` schema.

Closes #36120 from EnricoMi/branch-empty-pd-dataframes.

Authored-by: Enrico Minack 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 556c74578eb2379fc6e0ec8d147674d0b10e5a2c)
Signed-off-by: Hyukjin Kwon 
---
 .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 97 ++
 .../pyspark/sql/tests/test_pandas_grouped_map.py   | 76 +
 python/pyspark/sql/tests/test_pandas_map.py| 71 ++--
 python/pyspark/worker.py   | 12 ++-
 4 files changed, 246 insertions(+), 10 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py 
b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
index 58022fa6e83..3f403d9c9d6 100644
--- a/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_cogrouped_map.py
@@ -20,6 +20,7 @@ from typing import cast
 
 from pyspark.sql.functions import array, explode, col, lit, udf, pandas_udf
 from pyspark.sql.types import DoubleType, StructType, StructField, Row
+from pyspark.sql.utils import PythonException
 from pyspark.testing.sqlutils import (
 ReusedSQLTestCase,
 have_pandas,
@@ -124,6 +125,102 @@ class CogroupedMapInPandasTests(ReusedSQLTestCase):
 
 assert_frame_equal(expected, result)
 
+def test_apply_in_pandas_not_returning_pandas_dataframe(self):
+left = self.data1
+right = self.data2
+
+def merge_pandas(lft, rgt):
+return lft.size + rgt.size
+
+with QuietTest(self.sc):
+with self.assertRaisesRegex(
+PythonException,
+"Return type of the user-defined function should be 
pandas.DataFrame, "
+"but is ",
+):
+(
+left.groupby("id")
+.cogroup(right.groupby("id"))
+.applyInPandas(merge_pandas, "id long, k int, v int, v2 
int")
+.collect()
+)
+
+def 

[spark] branch master updated (094a4ef6703 -> 556c74578eb)

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 094a4ef6703 [SPARK-38804][PYTHON][SS][DOCS][FOLLOW-UP] Document 
StreamingQueryManager.removeListener
 add 556c74578eb [SPARK-38833][PYTHON][SQL] Allow applyInPandas to return 
empty DataFrame without columns

No new revisions were added by this update.

Summary of changes:
 .../pyspark/sql/tests/test_pandas_cogrouped_map.py | 97 ++
 .../pyspark/sql/tests/test_pandas_grouped_map.py   | 76 +
 python/pyspark/sql/tests/test_pandas_map.py| 71 ++--
 python/pyspark/worker.py   | 12 ++-
 4 files changed, 246 insertions(+), 10 deletions(-)





[spark] branch master updated (bf75b495e18 -> 094a4ef6703)

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from bf75b495e18 [SPARK-38855][SQL] DS V2 supports push down math functions
 add 094a4ef6703 [SPARK-38804][PYTHON][SS][DOCS][FOLLOW-UP] Document 
StreamingQueryManager.removeListener

No new revisions were added by this update.

Summary of changes:
 python/docs/source/reference/pyspark.ss.rst | 1 +
 1 file changed, 1 insertion(+)





[spark] branch branch-3.2 updated: [SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due to blocking I/O

2022-04-13 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new d66350a2aad [SPARK-38677][PYSPARK][3.2] Python MonitorThread should 
detect deadlock due to blocking I/O
d66350a2aad is described below

commit d66350a2aadd7f1e612cc9cf54009ea6f531630e
Author: Ankur Dave 
AuthorDate: Wed Apr 13 16:58:18 2022 +0900

[SPARK-38677][PYSPARK][3.2] Python MonitorThread should detect deadlock due 
to blocking I/O

### What changes were proposed in this pull request?

This PR cherry-picks https://github.com/apache/spark/pull/36065 to 
branch-3.2.

---

When calling a Python UDF on a DataFrame with large rows, a deadlock can 
occur involving the following three threads:

1. The Scala task executor thread. During task execution, this is 
responsible for reading output produced by the Python process. However, in this 
case the task has finished early, and this thread is no longer reading output 
produced by the Python process. Instead, it is waiting for the Scala 
WriterThread to exit so that it can finish the task.
2. The Scala WriterThread. This is trying to send a large row to the Python 
process, and is waiting for the Python process to read that row.
3. The Python process. This is trying to send a large output to the Scala 
task executor thread, and is waiting for that thread to read that output, which 
will never happen.

We considered the following three solutions for the deadlock:

1. When the task completes, make the Scala task executor thread close the 
socket before waiting for the Scala WriterThread to exit. If the WriterThread 
is blocked on a large write, this would interrupt that write and allow the 
WriterThread to exit. However, it would prevent Python worker reuse.
2. Modify PythonWorkerFactory to use interruptible I/O. 
[java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer))
 supports interruptible blocking operations. The goal is that when the 
WriterThread is interrupted, it should exit even if it was blocked on a large 
write. However, this would be invasive.
3. Add a watchdog thread similar to the existing PythonRunner.MonitorThread 
to detect this deadlock and kill the Python worker. The MonitorThread currently 
kills the Python worker only if the task itself is interrupted. In this case, 
the task completes normally, so the MonitorThread does not take action. We want 
the new watchdog thread (WriterMonitorThread) to detect that the task is 
completed but the Python writer thread has not stopped, indicating a deadlock.

This PR implements Option 3.
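
As a rough, hedged illustration (not the actual regression test), the workload shape described above looks like this; the row sizes and counts needed to fill the socket buffers depend on the environment:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

spark = SparkSession.builder.getOrCreate()

# Large rows force the Scala WriterThread to block while writing to the Python worker.
df = spark.range(64).selectExpr("id", "repeat('x', 1048576) AS payload")
passthrough = udf(lambda s: s)

# take() finishes the task early while the writer and the Python worker may still
# be exchanging data; previously this could deadlock, now the WriterMonitorThread
# closes the socket once the task completes.
df.select(passthrough("payload")).take(1)
```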

### Why are the changes needed?
To fix a deadlock that can cause PySpark queries to hang.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a test that previously encountered the deadlock and timed out, and 
now succeeds.

Closes #36172 from HyukjinKwon/SPARK-38677-3.2.

Authored-by: Ankur Dave 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/api/python/PythonRunner.scala | 49 ++
 python/pyspark/tests/test_rdd.py   | 35 
 2 files changed, 84 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 66b23782cf9..fabff970f2b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -181,6 +181,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 }
 
 writerThread.start()
+new WriterMonitorThread(SparkEnv.get, worker, writerThread, 
context).start()
 if (reuseWorker) {
   val key = (worker, context.taskAttemptId)
   // SPARK-35009: avoid creating multiple monitor threads for the same 
python worker
@@ -643,6 +644,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   }
 }
   }
+
+  /**
+   * This thread monitors the WriterThread and kills it in case of deadlock.
+   *
+   * A deadlock can arise if the task completes while the writer thread is 
sending input to the
+   * Python process (e.g. due to the use of `take()`), and the Python process 
is still producing
+   * output. When the inputs are sufficiently large, this can result in a 
deadlock due to the use of
+   * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the 
socket.
+   */
+  class WriterMonitorThread(
+  env: SparkEnv, worker: Socket, writerThread: WriterThread, context: 
TaskContext)
+extends Thread(s"Writer Monitor for $pythonExec (writer thread id 
${writerThread.getId})") {
+
+

[spark] branch branch-3.3 updated: [SPARK-38855][SQL] DS V2 supports push down math functions

2022-04-13 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 96c8b4f47c2 [SPARK-38855][SQL] DS V2 supports push down math functions
96c8b4f47c2 is described below

commit 96c8b4f47c2d0df249efb02b248b5c230188
Author: Jiaan Geng 
AuthorDate: Wed Apr 13 14:41:47 2022 +0800

[SPARK-38855][SQL] DS V2 supports push down math functions

### What changes were proposed in this pull request?
Currently, Spark has some math functions from the ANSI standard. Please refer to
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388
These functions are shown below:
`LN`,
`EXP`,
`POWER`,
`SQRT`,
`FLOOR`,
`CEIL`,
`WIDTH_BUCKET`

Support for these functions in mainstream databases is shown below.

| Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| `LN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `EXP` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `POWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes |
| `SQRT` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `FLOOR` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `CEIL` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `WIDTH_BUCKET` | Yes | No | No | No | Yes | No | Yes | Yes | Yes | Yes | Yes | No | No | No | Yes | No | No | No | No | No | No | No |

DS V2 should support pushing down these math functions.
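
As a hedged sketch (the catalog setup mirrors what JDBCV2Suite does; table and column names are hypothetical), a filter built from these functions can now be compiled to SQL and pushed to the source:

```python
# Register an H2 database as a DS V2 JDBC catalog.
spark.conf.set("spark.sql.catalog.h2",
               "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
spark.conf.set("spark.sql.catalog.h2.driver", "org.h2.Driver")

df = spark.table("h2.test.employee")  # hypothetical table
df.filter("SQRT(salary) > 100 AND CEIL(bonus) = 1300").explain()
# With this change, the pushed filters in the plan should include the compiled
# SQRT/CEIL expressions instead of Spark evaluating them after the scan.
```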

### Why are the changes needed?
To let DS V2 push down these math functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests.

Closes #36140 from beliefer/SPARK-38855.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
(cherry picked from commit bf75b495e18ed87d0c118bfd5f1ceb52d720cad9)
Signed-off-by: Wenchen Fan 
---
 .../expressions/GeneralScalarExpression.java   | 54 ++
 .../sql/connector/util/V2ExpressionSQLBuilder.java |  7 +++
 .../spark/sql/errors/QueryCompilationErrors.scala  |  4 ++
 .../sql/catalyst/util/V2ExpressionBuilder.scala| 28 ++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala  | 26 +++
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 28 ++-
 6 files changed, 145 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index 8952761f9ef..58082d5ee09 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -94,6 +94,60 @@ import 
org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder;
  *Since version: 3.3.0
  *   
  *  
+ *  Name: ABS
+ *   
+ *SQL semantic: ABS(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: COALESCE
+ *   
+ *SQL semantic: COALESCE(expr1, expr2)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: LN
+ *   
+ *SQL semantic: LN(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: EXP
+ *   
+ *SQL semantic: EXP(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: POWER
+ *   
+ *SQL semantic: POWER(expr, number)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: SQRT
+ *   
+ *SQL semantic: SQRT(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: FLOOR
+ *   
+ *SQL semantic: FLOOR(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: CEIL
+ *   
+ *SQL semantic: CEIL(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: WIDTH_BUCKET
+ *   
+ *SQL semantic: WIDTH_BUCKET(expr)
+ *Since version: 3.3.0
+ *   
+ *  

[spark] branch master updated: [SPARK-38855][SQL] DS V2 supports push down math functions

2022-04-13 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bf75b495e18 [SPARK-38855][SQL] DS V2 supports push down math functions
bf75b495e18 is described below

commit bf75b495e18ed87d0c118bfd5f1ceb52d720cad9
Author: Jiaan Geng 
AuthorDate: Wed Apr 13 14:41:47 2022 +0800

[SPARK-38855][SQL] DS V2 supports push down math functions

### What changes were proposed in this pull request?
Currently, Spark has some math functions from the ANSI standard. Please refer to
https://github.com/apache/spark/blob/2f8613f22c0750c00cf1dcfb2f31c431d8dc1be7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L388
These functions are shown below:
`LN`,
`EXP`,
`POWER`,
`SQRT`,
`FLOOR`,
`CEIL`,
`WIDTH_BUCKET`

Support for these functions in mainstream databases is shown below.

| Function | PostgreSQL | ClickHouse | H2 | MySQL | Oracle | Redshift | Presto | Teradata | Snowflake | DB2 | Vertica | Exasol | SqlServer | Yellowbrick | Impala | Mariadb | Druid | Pig | SQLite | Influxdata | Singlestore | ElasticSearch |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| `LN` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `EXP` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `POWER` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | No | Yes | Yes | Yes | Yes |
| `SQRT` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `FLOOR` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `CEIL` | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes | Yes |
| `WIDTH_BUCKET` | Yes | No | No | No | Yes | No | Yes | Yes | Yes | Yes | Yes | No | No | No | Yes | No | No | No | No | No | No | No |

DS V2 should support pushing down these math functions.

### Why are the changes needed?
To let DS V2 push down these math functions.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New tests.

Closes #36140 from beliefer/SPARK-38855.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
---
 .../expressions/GeneralScalarExpression.java   | 54 ++
 .../sql/connector/util/V2ExpressionSQLBuilder.java |  7 +++
 .../spark/sql/errors/QueryCompilationErrors.scala  |  4 ++
 .../sql/catalyst/util/V2ExpressionBuilder.scala| 28 ++-
 .../org/apache/spark/sql/jdbc/H2Dialect.scala  | 26 +++
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala| 28 ++-
 6 files changed, 145 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
index 8952761f9ef..58082d5ee09 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/GeneralScalarExpression.java
@@ -94,6 +94,60 @@ import 
org.apache.spark.sql.connector.util.V2ExpressionSQLBuilder;
  *Since version: 3.3.0
  *   
  *  
+ *  Name: ABS
+ *   
+ *SQL semantic: ABS(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: COALESCE
+ *   
+ *SQL semantic: COALESCE(expr1, expr2)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: LN
+ *   
+ *SQL semantic: LN(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: EXP
+ *   
+ *SQL semantic: EXP(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: POWER
+ *   
+ *SQL semantic: POWER(expr, number)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: SQRT
+ *   
+ *SQL semantic: SQRT(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: FLOOR
+ *   
+ *SQL semantic: FLOOR(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: CEIL
+ *   
+ *SQL semantic: CEIL(expr)
+ *Since version: 3.3.0
+ *   
+ *  
+ *  Name: WIDTH_BUCKET
+ *   
+ *SQL semantic: WIDTH_BUCKET(expr)
+ *Since version: 3.3.0
+ *   
+ *  
  * 
  * Note: SQL semantic conforms ANSI standard, so some expressions are not 
supported when ANSI off,
  * 

[spark] branch master updated: [SPARK-38530][SQL] Fix a bug that GeneratorNestedColumnAliasing can be incorrectly applied to some expressions

2022-04-13 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 13edafab9f4 [SPARK-38530][SQL] Fix a bug that 
GeneratorNestedColumnAliasing can be incorrectly applied to some expressions
13edafab9f4 is described below

commit 13edafab9f45cc80aee41e2f82475367d88357ec
Author: minyyy 
AuthorDate: Wed Apr 13 14:01:27 2022 +0800

[SPARK-38530][SQL] Fix a bug that GeneratorNestedColumnAliasing can be 
incorrectly applied to some expressions

### What changes were proposed in this pull request?

This PR makes GeneratorNestedColumnAliasing apply only to
GetStructField*(_: AttributeReference), where GetStructField* means a chain of
nested GetStructField expressions. The current way of collecting expressions is
top-down and only checks two levels, which is wrong. The rule is simple: if we
see any expression other than GetStructField, we are done. When an expression E
is pushed down through an Explode, E, which used to apply to the element x, now
has to apply to array(x). So only expressions that can operate on both x and
array(x) can be pushed. GetStructField is special because we have
GetArrayStructFields: when GetStructField is pushed down, it becomes
GetArrayStructFields. No other expression is applicable.
We also do not need to check whether the child type is Array(Array()) or whether
the rewritten expression has the pattern GetArrayStructFields(GetArrayStructFields()):
1. When the child input type is Array(Array()), the ExtractValue expressions we
get always start from an innermost GetArrayStructFields, which does not align
with GetStructField*(x).
2. When we see GetArrayStructFields(GetArrayStructFields()) in the rewritten
generator, we must have seen a GetArrayStructFields in the expressions before
pushdown.
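
A hedged DataFrame-level illustration of the only pattern the rule may still rewrite (names and data are made up): a chain of struct-field accesses on the generator output, which becomes GetArrayStructFields when pushed below the Explode.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [([(1, "x"), (2, "y")],)],
    "items array<struct<a:int,b:string>>")

# item.a is GetStructField on the exploded attribute, so it can be aliased and
# pushed through the generator; any other expression on `item` must stay above it.
df.select(F.explode("items").alias("item")).select("item.a").explain()
```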

### Why are the changes needed?
It fixes some correctness issues. See the above section for more details.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests.

Closes #35866 from minyyy/gnca_wrong_expr.

Lead-authored-by: minyyy 
Co-authored-by: minyyy <98760575+min...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../catalyst/optimizer/NestedColumnAliasing.scala  | 50 ++
 .../optimizer/NestedColumnAliasingSuite.scala  | 40 -
 2 files changed, 69 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 9cf2925cdd2..45f84c21b7d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -240,12 +240,14 @@ object NestedColumnAliasing {
*/
   def getAttributeToExtractValues(
   exprList: Seq[Expression],
-  exclusiveAttrs: Seq[Attribute]): Map[Attribute, Seq[ExtractValue]] = {
+  exclusiveAttrs: Seq[Attribute],
+  extractor: (Expression) => Seq[Expression] = 
collectRootReferenceAndExtractValue)
+: Map[Attribute, Seq[ExtractValue]] = {
 
 val nestedFieldReferences = new mutable.ArrayBuffer[ExtractValue]()
 val otherRootReferences = new mutable.ArrayBuffer[AttributeReference]()
 exprList.foreach { e =>
-  collectRootReferenceAndExtractValue(e).foreach {
+  extractor(e).foreach {
 // we can not alias the attr from lambda variable whose expr id is not 
available
 case ev: ExtractValue if 
!ev.exists(_.isInstanceOf[NamedLambdaVariable]) =>
   if (ev.references.size == 1) {
@@ -350,23 +352,44 @@ object GeneratorNestedColumnAliasing {
 return None
   }
   val generatorOutputSet = AttributeSet(g.qualifiedGeneratorOutput)
-  val (attrToExtractValuesOnGenerator, attrToExtractValuesNotOnGenerator) =
+  var (attrToExtractValuesOnGenerator, attrToExtractValuesNotOnGenerator) =
 attrToExtractValues.partition { case (attr, _) =>
   attr.references.subsetOf(generatorOutputSet) }
 
   val pushedThrough = NestedColumnAliasing.rewritePlanWithAliases(
 plan, attrToExtractValuesNotOnGenerator)
 
-  // If the generator output is `ArrayType`, we cannot push through the 
extractor.
-  // It is because we don't allow field extractor on two-level array,
-  // i.e., attr.field when attr is a ArrayType(ArrayType(...)).
-  // Similarily, we also cannot push through if the child of generator is 
`MapType`.
+  // We cannot push through if the child of generator is `MapType`.
   g.generator.children.head.dataType match {
 case _: MapType =>