[spark] branch master updated (5aeae892562 -> 4f8cc999b15)

2022-09-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5aeae892562 [SPARK-40471][BUILD] Upgrade RoaringBitmap to 0.9.32
 add 4f8cc999b15 [SPARK-40436][BUILD] Upgrade Scala to 2.12.17

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 6 +++---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++---
 pom.xml   | 6 +++---
 project/SparkBuild.scala  | 4 ++--
 4 files changed, 11 insertions(+), 11 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40471][BUILD] Upgrade RoaringBitmap to 0.9.32

2022-09-16 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5aeae892562 [SPARK-40471][BUILD] Upgrade RoaringBitmap to 0.9.32
5aeae892562 is described below

commit 5aeae892562f484cc0f4ff301c93291ed052e92a
Author: yangjie01 
AuthorDate: Fri Sep 16 22:26:10 2022 -0700

[SPARK-40471][BUILD] Upgrade RoaringBitmap to 0.9.32

### What changes were proposed in this pull request?
This pr aims upgrade RoaringBitmap 0.9.32

### Why are the changes needed?
This is a bug fix version:

- https://github.com/RoaringBitmap/RoaringBitmap/issues/575
- https://github.com/RoaringBitmap/RoaringBitmap/pull/578

other changes as follows:

- https://github.com/RoaringBitmap/RoaringBitmap/compare/0.9.31...0.9.32

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #37914 from LuciferYang/SPARK-40471.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt | 10 +-
 core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt | 10 +-
 core/benchmarks/MapStatusesConvertBenchmark-results.txt   |  8 
 dev/deps/spark-deps-hadoop-2-hive-2.3 |  4 ++--
 dev/deps/spark-deps-hadoop-3-hive-2.3 |  4 ++--
 pom.xml   |  2 +-
 6 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt 
b/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
index 96fa24175c5..adac80834e4 100644
--- a/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
+++ b/core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
@@ -2,12 +2,12 @@
 MapStatuses Convert Benchmark
 

 
-OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
-Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
+OpenJDK 64-Bit Server VM 11.0.16+8-LTS on Linux 5.15.0-1019-azure
+Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz
 MapStatuses Convert:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-Num Maps: 5 Fetch partitions:500   1324   1333 
  7  0.0  1324283680.0   1.0X
-Num Maps: 5 Fetch partitions:1000  2650   2670 
 32  0.0  2650318387.0   0.5X
-Num Maps: 5 Fetch partitions:1500  4018   4059 
 53  0.0  4017921009.0   0.3X
+Num Maps: 5 Fetch partitions:500   1269   1276 
  8  0.0  1268666001.0   1.0X
+Num Maps: 5 Fetch partitions:1000  2672   2695 
 39  0.0  2671542753.0   0.5X
+Num Maps: 5 Fetch partitions:1500  4034   4069 
 50  0.0  4033696987.0   0.3X
 
 
diff --git a/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt 
b/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
index 0ba8d756dfc..9911ae3326f 100644
--- a/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
+++ b/core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
@@ -2,12 +2,12 @@
 MapStatuses Convert Benchmark
 

 
-OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
-Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
+OpenJDK 64-Bit Server VM 17.0.4+8-LTS on Linux 5.15.0-1019-azure
+Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
 MapStatuses Convert:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
 

-Num Maps: 5 Fetch partitions:500   1092   1104 
 22  0.0  1091691925.0   1.0X
-Num Maps: 5 Fetch partitions:1000  2172   2192 
 29  0.0  2171702137.0   0.5X
-Num Maps: 5 Fetch partitions:1500  3268   3291 
 27  0.0  3267904436.0   0.3X
+Num Maps: 5 Fetch partitions:500   1228   1238 
 17  0.0  1228191051.0   1.0X
+Num Maps: 5 Fetch partitions:1000  2380   2393 
 16  0.0  2379601524.0   0.5X
+Num Maps: 5 Fetch partitions:1500  3803   

[spark] branch master updated: [SPARK-40447][PS] Implement `kendall` correlation in `DataFrame.corr`

2022-09-16 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5b2bd1c9c0c [SPARK-40447][PS] Implement `kendall` correlation in 
`DataFrame.corr`
5b2bd1c9c0c is described below

commit 5b2bd1c9c0cb109f8a801dfcfb6ba1305bf864c6
Author: Ruifeng Zheng 
AuthorDate: Sat Sep 17 07:30:31 2022 +0800

[SPARK-40447][PS] Implement `kendall` correlation in `DataFrame.corr`

### What changes were proposed in this pull request?
Implement `kendall` correlation in `DataFrame.corr`

### Why are the changes needed?
for API coverage

### Does this PR introduce _any_ user-facing change?
yes, new correlation option:
```
In [1]: import pyspark.pandas as ps

In [2]: df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)], 
columns=['dogs', 'cats'])

In [3]: df.corr('kendall')

  dogs  cats
dogs  1.00 -0.912871
cats -0.912871  1.00

In [4]: df.to_pandas().corr('kendall')
/Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: 
PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's 
memory. It should only be used if the resulting pandas DataFrame is expected to 
be small.
  warnings.warn(message, PandasAPIOnSparkAdviceWarning)
Out[4]:
  dogs  cats
dogs  1.00 -0.912871
cats -0.912871  1.00
```

### How was this patch tested?
added UT

Closes #37913 from zhengruifeng/ps_df_kendall.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/pandas/frame.py| 260 +-
 python/pyspark/pandas/tests/test_stats.py |  32 ++--
 2 files changed, 204 insertions(+), 88 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 4149868dde9..d7b26cacda3 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -1424,9 +1424,10 @@ class DataFrame(Frame, Generic[T]):
 
 Parameters
 --
-method : {'pearson', 'spearman'}
+method : {'pearson', 'spearman', 'kendall'}
 * pearson : standard correlation coefficient
 * spearman : Spearman rank correlation
+* kendall : Kendall Tau correlation coefficient
 min_periods : int, optional
 Minimum number of observations required per pair of columns
 to have a valid result.
@@ -1435,12 +1436,21 @@ class DataFrame(Frame, Generic[T]):
 
 Returns
 ---
-y : DataFrame
+DataFrame
 
 See Also
 
+DataFrame.corrwith
 Series.corr
 
+Notes
+-
+1. Pearson, Kendall and Spearman correlation are currently computed 
using pairwise
+   complete observations.
+
+2. The complexity of Spearman correlation is O(#row * #row), if the 
dataset is too
+   large, sampling ahead of correlation computation is recommended.
+
 Examples
 
 >>> df = ps.DataFrame([(.2, .3), (.0, .6), (.6, .0), (.2, .1)],
@@ -1455,16 +1465,13 @@ class DataFrame(Frame, Generic[T]):
 dogs  1.00 -0.948683
 cats -0.948683  1.00
 
-Notes
--
-There are behavior differences between pandas-on-Spark and pandas.
-
-* the `method` argument only accepts 'pearson', 'spearman'
+>>> df.corr('kendall')
+  dogs  cats
+dogs  1.00 -0.912871
+cats -0.912871  1.00
 """
 if method not in ["pearson", "spearman", "kendall"]:
 raise ValueError(f"Invalid method {method}")
-if method == "kendall":
-raise NotImplementedError("method doesn't support kendall for now")
 if min_periods is not None and not isinstance(min_periods, int):
 raise TypeError(f"Invalid min_periods type 
{type(min_periods).__name__}")
 
@@ -1537,87 +1544,196 @@ class DataFrame(Frame, Generic[T]):
 .otherwise(F.col(f"{tmp_tuple_col_name}.{tmp_value_2_col_name}"))
 .alias(tmp_value_2_col_name),
 )
+not_null_cond = (
+F.col(tmp_value_1_col_name).isNotNull() & 
F.col(tmp_value_2_col_name).isNotNull()
+)
 
-# convert values to avg ranks for spearman correlation
-if method == "spearman":
-tmp_row_number_col_name = verify_temp_column_name(sdf, 
"__tmp_row_number_col__")
-tmp_dense_rank_col_name = verify_temp_column_name(sdf, 
"__tmp_dense_rank_col__")
-window = Window.partitionBy(tmp_index_1_col_name, 
tmp_index_2_col_name)
-
-# tmp_value_1_col_name: value -> avg rank
-# for example:
-# values:   3, 4, 5, 7, 7, 7, 9, 

[spark] branch master updated (4e0fea2393b -> 4e5ece2be34)

2022-09-16 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4e0fea2393b [SPARK-40169][SQL] Don't pushdown Parquet filters with no 
reference to data schema
 add 4e5ece2be34 [SPARK-40445][PS] Refactor `Resampler` for consistency and 
simplicity

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/groupby.py  | 22 ++---
 python/pyspark/pandas/resample.py | 67 ++-
 python/pyspark/pandas/series.py   |  2 +-
 python/pyspark/pandas/window.py   |  2 +-
 4 files changed, 43 insertions(+), 50 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.2 updated: [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema

2022-09-16 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new e90a57e1aa5 [SPARK-40169][SQL] Don't pushdown Parquet filters with no 
reference to data schema
e90a57e1aa5 is described below

commit e90a57e1aa5afdb5e9f04f2ddcb34916c2939b2e
Author: Chao Sun 
AuthorDate: Fri Sep 16 10:46:36 2022 -0700

[SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data 
schema

### What changes were proposed in this pull request?

Currently in Parquet V1 read path, Spark will pushdown data filters even if 
they have no reference in the Parquet read schema. This can cause correctness 
issues as described in 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).

The root cause, it seems, is because in the V1 path, we first use 
`AttributeReference` equality to filter out data columns without partition 
columns, and then use `AttributeSet` equality to filter out filters with only 
references to data columns.
There's inconsistency in the two steps, when case sensitive check is false.

Take the following scenario as example:
- data column: `[COL, a]`
- partition column: `[col]`
- filter: `col > 10`

With `AttributeReference` equality, `COL` is not considered equal to `col` 
(because their names are different), and thus the filtered out data column set 
is still `[COL, a]`. However, when calculating filters with only reference to 
data columns, `COL` is **considered equal** to `col`. Consequently, the filter 
`col > 10`, when checking with `[COL, a]`, is considered to have reference to 
data columns, and thus will be pushed down to Parquet as data filter.

On the Parquet side, since `col` doesn't exist in the file schema (it only 
has `COL`), when column index enabled, it will incorrectly return wrong number 
of rows. See [PARQUET-2170](https://issues.apache.org/jira/browse/PARQUET-2170) 
for more detail.

In general, where data columns overlap with partition columns and case 
sensitivity is false, partition filters will not be filter out before we 
calculate filters with only reference to data columns, which is incorrect.

### Why are the changes needed?

This fixes the correctness bug described in 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

There are existing test cases for this issue from 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833). This also 
modified them to test the scenarios when case sensitivity is on or off.

Closes #37881 from sunchao/SPARK-40169.

Authored-by: Chao Sun 
Signed-off-by: Chao Sun 
---
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala|  5 ---
 .../datasources/parquet/ParquetQuerySuite.scala| 38 ++
 3 files changed, 25 insertions(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 1bfde7515dc..50a6519604f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -183,7 +183,7 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 
   // Partition keys are not available in the statistics of the files.
   // `dataColumns` might have partition columns, we need to filter them 
out.
-  val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionColumns.contains)
+  val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionSet.contains)
   val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
 if (f.references.intersect(partitionSet).nonEmpty) {
   extractPredicatesWithinOutputSet(f, 
AttributeSet(dataColumnsWithoutPartitionCols))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 6b3922d11a4..e5d33b84bf0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -228,11 +228,6 @@ class ParquetFileFormat
   SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
   sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
-// See PARQUET-2170.
-// Disable column index 

[spark] branch branch-3.3 updated: [SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data schema

2022-09-16 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new ba6d17288c3 [SPARK-40169][SQL] Don't pushdown Parquet filters with no 
reference to data schema
ba6d17288c3 is described below

commit ba6d17288c3287e8dc1f7cb95db0233a45732dc0
Author: Chao Sun 
AuthorDate: Fri Sep 16 10:46:36 2022 -0700

[SPARK-40169][SQL] Don't pushdown Parquet filters with no reference to data 
schema

### What changes were proposed in this pull request?

Currently in Parquet V1 read path, Spark will pushdown data filters even if 
they have no reference in the Parquet read schema. This can cause correctness 
issues as described in 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).

The root cause, it seems, is because in the V1 path, we first use 
`AttributeReference` equality to filter out data columns without partition 
columns, and then use `AttributeSet` equality to filter out filters with only 
references to data columns.
There's inconsistency in the two steps, when case sensitive check is false.

Take the following scenario as example:
- data column: `[COL, a]`
- partition column: `[col]`
- filter: `col > 10`

With `AttributeReference` equality, `COL` is not considered equal to `col` 
(because their names are different), and thus the filtered out data column set 
is still `[COL, a]`. However, when calculating filters with only reference to 
data columns, `COL` is **considered equal** to `col`. Consequently, the filter 
`col > 10`, when checking with `[COL, a]`, is considered to have reference to 
data columns, and thus will be pushed down to Parquet as data filter.

On the Parquet side, since `col` doesn't exist in the file schema (it only 
has `COL`), when column index enabled, it will incorrectly return wrong number 
of rows. See [PARQUET-2170](https://issues.apache.org/jira/browse/PARQUET-2170) 
for more detail.

In general, where data columns overlap with partition columns and case 
sensitivity is false, partition filters will not be filter out before we 
calculate filters with only reference to data columns, which is incorrect.

### Why are the changes needed?

This fixes the correctness bug described in 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833).

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

There are existing test cases for this issue from 
[SPARK-39833](https://issues.apache.org/jira/browse/SPARK-39833). This also 
modified them to test the scenarios when case sensitivity is on or off.

Closes #37881 from sunchao/SPARK-40169.

Authored-by: Chao Sun 
Signed-off-by: Chao Sun 
---
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala|  5 ---
 .../datasources/parquet/ParquetQuerySuite.scala| 38 ++
 3 files changed, 25 insertions(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 9356e46a691..37a0444 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -184,7 +184,7 @@ object FileSourceStrategy extends Strategy with 
PredicateHelper with Logging {
 
   // Partition keys are not available in the statistics of the files.
   // `dataColumns` might have partition columns, we need to filter them 
out.
-  val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionColumns.contains)
+  val dataColumnsWithoutPartitionCols = 
dataColumns.filterNot(partitionSet.contains)
   val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
 if (f.references.intersect(partitionSet).nonEmpty) {
   extractPredicatesWithinOutputSet(f, 
AttributeSet(dataColumnsWithoutPartitionCols))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 2fa0854c983..9765e7c7801 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -230,11 +230,6 @@ class ParquetFileFormat
   SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
   sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
-// See PARQUET-2170.
-// Disable column index 

[spark] branch master updated (d71b180295e -> 4e0fea2393b)

2022-09-16 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d71b180295e [SPARK-40398][CORE][SQL] Use Loop instead of Arrays.stream 
api
 add 4e0fea2393b [SPARK-40169][SQL] Don't pushdown Parquet filters with no 
reference to data schema

No new revisions were added by this update.

Summary of changes:
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../datasources/parquet/ParquetFileFormat.scala|  5 ---
 .../datasources/parquet/ParquetQuerySuite.scala| 38 ++
 3 files changed, 25 insertions(+), 20 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40398][CORE][SQL] Use Loop instead of Arrays.stream api

2022-09-16 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d71b180295e [SPARK-40398][CORE][SQL] Use Loop instead of Arrays.stream 
api
d71b180295e is described below

commit d71b180295ea89b39047cff8397c5b3c2fe0bd20
Author: yangjie01 
AuthorDate: Fri Sep 16 08:29:31 2022 -0500

[SPARK-40398][CORE][SQL] Use Loop instead of Arrays.stream api

### What changes were proposed in this pull request?
This PR replaces `Arrays.stream` api with loop where performance 
improvement can be obtained.

### Why are the changes needed?
Minor performance improvement.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass Github actions

Closes #37843 from LuciferYang/ExpressionArrayToStrings.

Authored-by: yangjie01 
Signed-off-by: Sean Owen 
---
 .../network/shuffle/OneForOneBlockFetcher.java | 24 -
 .../sql/connector/expressions/Expression.java  | 20 +--
 .../sql/connector/metric/CustomAvgMetric.java  |  7 ++-
 .../sql/connector/metric/CustomSumMetric.java  |  8 +--
 .../sql/connector/util/V2ExpressionSQLBuilder.java | 62 +-
 .../datasources/v2/V2PredicateSuite.scala  |  4 +-
 6 files changed, 87 insertions(+), 38 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
index a788b508e7b..b93db3f570b 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
@@ -113,10 +113,30 @@ public class OneForOneBlockFetcher {
* @return whether the array contains only shuffle block IDs
*/
   private boolean areShuffleBlocksOrChunks(String[] blockIds) {
-if (Arrays.stream(blockIds).anyMatch(blockId -> 
!blockId.startsWith(SHUFFLE_BLOCK_PREFIX))) {
+if (isAnyBlockNotStartWithShuffleBlockPrefix(blockIds)) {
   // It comes here because there is a blockId which doesn't have 
"shuffle_" prefix so we
   // check if all the block ids are shuffle chunk Ids.
-  return Arrays.stream(blockIds).allMatch(blockId -> 
blockId.startsWith(SHUFFLE_CHUNK_PREFIX));
+  return isAllBlocksStartWithShuffleChunkPrefix(blockIds);
+}
+return true;
+  }
+
+  // SPARK-40398: Replace `Arrays.stream().anyMatch()` with this method due to 
perf gain.
+  private static boolean isAnyBlockNotStartWithShuffleBlockPrefix(String[] 
blockIds) {
+for (String blockId : blockIds) {
+  if (!blockId.startsWith(SHUFFLE_BLOCK_PREFIX)) {
+return true;
+  }
+}
+return false;
+  }
+
+  // SPARK-40398: Replace `Arrays.stream().allMatch()` with this method due to 
perf gain.
+  private static boolean isAllBlocksStartWithShuffleChunkPrefix(String[] 
blockIds) {
+for (String blockId : blockIds) {
+  if (!blockId.startsWith(SHUFFLE_CHUNK_PREFIX)) {
+return false;
+  }
 }
 return true;
   }
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expression.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expression.java
index 76dfe73f666..25953ec32e4 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expression.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/expressions/Expression.java
@@ -17,7 +17,9 @@
 
 package org.apache.spark.sql.connector.expressions;
 
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.spark.annotation.Evolving;
 
@@ -30,6 +32,13 @@ import org.apache.spark.annotation.Evolving;
 public interface Expression {
   Expression[] EMPTY_EXPRESSION = new Expression[0];
 
+  /**
+   * `EMPTY_EXPRESSION` is only used as an input when the
+   * default `references` method builds the result array to avoid
+   * repeatedly allocating an empty array.
+   */
+  NamedReference[] EMPTY_NAMED_REFERENCE = new NamedReference[0];
+
   /**
* Format the expression as a human readable SQL-like string.
*/
@@ -44,7 +53,12 @@ public interface Expression {
* List of fields or columns that are referenced by this expression.
*/
   default NamedReference[] references() {
-return Arrays.stream(children()).map(e -> e.references())
-  .flatMap(Arrays::stream).distinct().toArray(NamedReference[]::new);
+// SPARK-40398: Replace `Arrays.stream()...distinct()`
+// to this for perf gain, the result order is not important.
+Set set = new HashSet<>();
+for (Expression e : children()) {
+  Collections.addAll(set, 

[spark] branch branch-3.2 updated: [SPARK-40470][SQL] Handle GetArrayStructFields and GetMapValue in "arrays_zip" function

2022-09-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 1b84e44fce7 [SPARK-40470][SQL] Handle GetArrayStructFields and 
GetMapValue in "arrays_zip" function
1b84e44fce7 is described below

commit 1b84e44fce7da84382c4874fe3875d55c6647ddf
Author: Ivan Sadikov 
AuthorDate: Fri Sep 16 22:05:03 2022 +0900

[SPARK-40470][SQL] Handle GetArrayStructFields and GetMapValue in 
"arrays_zip" function

### What changes were proposed in this pull request?

This is a follow-up for https://github.com/apache/spark/pull/37833.

The PR fixes column names in `arrays_zip` function for the cases when 
`GetArrayStructFields` and `GetMapValue` expressions are used (see unit tests 
for more details).

Before the patch, the column names would be indexes or an AnalysisException 
would be thrown in the case of `GetArrayStructFields` example.

### Why are the changes needed?

Fixes an inconsistency issue in Spark 3.2 and onwards where the fields 
would be labeled as indexes instead of column names.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added unit tests that reproduce the issue and confirmed that the patch 
fixes them.

Closes #37911 from sadikovi/SPARK-40470.

Authored-by: Ivan Sadikov 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 9b0f979141ba2c4124d96bc5da69ea5cac51df0d)
Signed-off-by: Hyukjin Kwon 
---
 .../expressions/collectionOperations.scala |  4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 45 ++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 6f66450844f..9919b31ca12 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -191,7 +191,9 @@ case class ArraysZip(children: Seq[Expression], names: 
Seq[Expression])
 case (u: UnresolvedAttribute, _) => Literal(u.nameParts.last)
 case (e: NamedExpression, _) if e.resolved => Literal(e.name)
 case (e: NamedExpression, _) => NamePlaceholder
-case (e: GetStructField, _) => Literal(e.extractFieldName)
+case (g: GetStructField, _) => Literal(g.extractFieldName)
+case (g: GetArrayStructFields, _) => Literal(g.field.name)
+case (g: GetMapValue, _) => Literal(g.key)
 case (_, idx) => Literal(idx.toString)
   })
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 820d760bd72..8e43de53c34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -641,6 +641,51 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
 assert(fieldNames.toSeq === Seq("arr_1", "arr_2", "arr_3"))
   }
 
+  test("SPARK-40470: array_zip should return field names in 
GetArrayStructFields") {
+val df = spark.read.json(Seq(
+  """
+  {
+"arr": [
+  {
+"obj": {
+  "nested": {
+"field1": [1],
+"field2": [2]
+  }
+}
+  }
+]
+  }
+  """).toDS())
+
+val res = df
+  .selectExpr("arrays_zip(arr.obj.nested.field1, arr.obj.nested.field2) as 
arr")
+  .select(col("arr.field1"), col("arr.field2"))
+
+val fieldNames = res.schema.fieldNames
+assert(fieldNames.toSeq === Seq("field1", "field2"))
+
+checkAnswer(res, Row(Seq(Seq(1)), Seq(Seq(2))) :: Nil)
+  }
+
+  test("SPARK-40470: arrays_zip should return field names in GetMapValue") {
+val df = spark.sql("""
+  select
+map(
+  'arr_1', array(1, 2),
+  'arr_2', array(3, 4)
+) as map_obj
+  """)
+
+val res = df.selectExpr("arrays_zip(map_obj.arr_1, map_obj.arr_2) as arr")
+
+val fieldNames = res.schema.head.dataType.asInstanceOf[ArrayType]
+  .elementType.asInstanceOf[StructType].fieldNames
+assert(fieldNames.toSeq === Seq("arr_1", "arr_2"))
+
+checkAnswer(res, Row(Seq(Row(1, 3), Row(2, 4
+  }
+
   def testSizeOfMap(sizeOfNull: Any): Unit = {
 val df = Seq(
   (Map[Int, Int](1 -> 1, 2 -> 2), "x"),


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For 

[spark] branch branch-3.3 updated: [SPARK-40470][SQL] Handle GetArrayStructFields and GetMapValue in "arrays_zip" function

2022-09-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new b9a514ea051 [SPARK-40470][SQL] Handle GetArrayStructFields and 
GetMapValue in "arrays_zip" function
b9a514ea051 is described below

commit b9a514ea0519e2da21efe2201c7f888be2640458
Author: Ivan Sadikov 
AuthorDate: Fri Sep 16 22:05:03 2022 +0900

[SPARK-40470][SQL] Handle GetArrayStructFields and GetMapValue in 
"arrays_zip" function

### What changes were proposed in this pull request?

This is a follow-up for https://github.com/apache/spark/pull/37833.

The PR fixes column names in `arrays_zip` function for the cases when 
`GetArrayStructFields` and `GetMapValue` expressions are used (see unit tests 
for more details).

Before the patch, the column names would be indexes or an AnalysisException 
would be thrown in the case of `GetArrayStructFields` example.

### Why are the changes needed?

Fixes an inconsistency issue in Spark 3.2 and onwards where the fields 
would be labeled as indexes instead of column names.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added unit tests that reproduce the issue and confirmed that the patch 
fixes them.

Closes #37911 from sadikovi/SPARK-40470.

Authored-by: Ivan Sadikov 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 9b0f979141ba2c4124d96bc5da69ea5cac51df0d)
Signed-off-by: Hyukjin Kwon 
---
 .../expressions/collectionOperations.scala |  4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 45 ++
 2 files changed, 48 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 05a273763b9..c4bf65bb8ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -267,7 +267,9 @@ case class ArraysZip(children: Seq[Expression], names: 
Seq[Expression])
 case (u: UnresolvedAttribute, _) => Literal(u.nameParts.last)
 case (e: NamedExpression, _) if e.resolved => Literal(e.name)
 case (e: NamedExpression, _) => NamePlaceholder
-case (e: GetStructField, _) => Literal(e.extractFieldName)
+case (g: GetStructField, _) => Literal(g.extractFieldName)
+case (g: GetArrayStructFields, _) => Literal(g.field.name)
+case (g: GetMapValue, _) => Literal(g.key)
 case (_, idx) => Literal(idx.toString)
   })
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index a9c17045812..697cce9b50d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -740,6 +740,51 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
 assert(fieldNames.toSeq === Seq("arr_1", "arr_2", "arr_3"))
   }
 
+  test("SPARK-40470: array_zip should return field names in 
GetArrayStructFields") {
+val df = spark.read.json(Seq(
+  """
+  {
+"arr": [
+  {
+"obj": {
+  "nested": {
+"field1": [1],
+"field2": [2]
+  }
+}
+  }
+]
+  }
+  """).toDS())
+
+val res = df
+  .selectExpr("arrays_zip(arr.obj.nested.field1, arr.obj.nested.field2) as 
arr")
+  .select(col("arr.field1"), col("arr.field2"))
+
+val fieldNames = res.schema.fieldNames
+assert(fieldNames.toSeq === Seq("field1", "field2"))
+
+checkAnswer(res, Row(Seq(Seq(1)), Seq(Seq(2))) :: Nil)
+  }
+
+  test("SPARK-40470: arrays_zip should return field names in GetMapValue") {
+val df = spark.sql("""
+  select
+map(
+  'arr_1', array(1, 2),
+  'arr_2', array(3, 4)
+) as map_obj
+  """)
+
+val res = df.selectExpr("arrays_zip(map_obj.arr_1, map_obj.arr_2) as arr")
+
+val fieldNames = res.schema.head.dataType.asInstanceOf[ArrayType]
+  .elementType.asInstanceOf[StructType].fieldNames
+assert(fieldNames.toSeq === Seq("arr_1", "arr_2"))
+
+checkAnswer(res, Row(Seq(Row(1, 3), Row(2, 4
+  }
+
   def testSizeOfMap(sizeOfNull: Any): Unit = {
 val df = Seq(
   (Map[Int, Int](1 -> 1, 2 -> 2), "x"),


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For 

[spark] branch master updated (50a95a456c8 -> 9b0f979141b)

2022-09-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 50a95a456c8 [SPARK-40467][SS] Split FlatMapGroupsWithState down to 
multiple test suites
 add 9b0f979141b [SPARK-40470][SQL] Handle GetArrayStructFields and 
GetMapValue in "arrays_zip" function

No new revisions were added by this update.

Summary of changes:
 .../expressions/collectionOperations.scala |  4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 45 ++
 2 files changed, 48 insertions(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (9c188ce6f3e -> 50a95a456c8)

2022-09-16 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 9c188ce6f3e [SPARK-40463][INFRA] Update gpg's keyserver
 add 50a95a456c8 [SPARK-40467][SS] Split FlatMapGroupsWithState down to 
multiple test suites

No new revisions were added by this update.

Summary of changes:
 .../streaming/FlatMapGroupsWithStateSuite.scala| 758 +
 ...atMapGroupsWithStateWithInitialStateSuite.scala | 365 ++
 .../spark/sql/streaming/GroupStateSuite.scala  | 458 +
 3 files changed, 825 insertions(+), 756 deletions(-)
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateWithInitialStateSuite.scala
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (1684412a208 -> 9c188ce6f3e)

2022-09-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 1684412a208 Revert "Update Dockerfile"
 add 9c188ce6f3e [SPARK-40463][INFRA] Update gpg's keyserver

No new revisions were added by this update.

Summary of changes:
 dev/create-release/spark-rm/Dockerfile | 2 +-
 dev/infra/Dockerfile   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (cd8c8746c34 -> 1684412a208)

2022-09-16 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from cd8c8746c34 [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip 
SparkFunctionsTests.test_repeat
 add 1684412a208 Revert "Update Dockerfile"

No new revisions were added by this update.

Summary of changes:
 dev/infra/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat

2022-09-16 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cd8c8746c34 [SPARK-40196][PYTHON][PS][FOLLOWUP] Skip 
SparkFunctionsTests.test_repeat
cd8c8746c34 is described below

commit cd8c8746c344fc48ba008830a0b816be05d98ca4
Author: Yikun Jiang 
AuthorDate: Fri Sep 16 15:07:49 2022 +0800

[SPARK-40196][PYTHON][PS][FOLLOWUP] Skip SparkFunctionsTests.test_repeat

### What changes were proposed in this pull request?
Mark `SparkFunctionsTests.test_repeat` as placeholder.

### Why are the changes needed?
```
  test_repeat 
(pyspark.pandas.tests.test_spark_functions.SparkFunctionsTests) ... FAIL 
(0.052s)

==
FAIL [0.052s]: test_repeat 
(pyspark.pandas.tests.test_spark_functions.SparkFunctionsTests)
--
Traceback (most recent call last):
  File 
"/__w/spark/spark/python/pyspark/pandas/tests/test_spark_functions.py", line 
28, in test_repeat
self.assertTrue(spark_column_equals(SF.repeat(F.lit(1), 2), 
F.repeat(F.lit(1), 2)))
AssertionError: False is not true

--
Ran 1 test in 8.471s
```

According to 
https://github.com/apache/spark/pull/37888#discussion_r971408190 we'd better 
skip it first.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
CI passed

Closes #37912 from Yikun/37888.

Authored-by: Yikun Jiang 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/pandas/tests/test_spark_functions.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/pandas/tests/test_spark_functions.py 
b/python/pyspark/pandas/tests/test_spark_functions.py
index 4b95a8eb7d5..c18dc30240c 100644
--- a/python/pyspark/pandas/tests/test_spark_functions.py
+++ b/python/pyspark/pandas/tests/test_spark_functions.py
@@ -25,7 +25,8 @@ from pyspark.testing.pandasutils import PandasOnSparkTestCase
 
 class SparkFunctionsTests(PandasOnSparkTestCase):
 def test_repeat(self):
-self.assertTrue(spark_column_equals(SF.repeat(F.lit(1), 2), 
F.repeat(F.lit(1), 2)))
+# TODO: Placeholder
+pass
 
 
 if __name__ == "__main__":


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (32fbd7e8325 -> 694cac63da3)

2022-09-16 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 32fbd7e8325 [SPARK-40196][PS][FOLLOWUP] SF.lit` -> `F.lit` in 
`window.quantile`
 add 694cac63da3 Update Dockerfile

No new revisions were added by this update.

Summary of changes:
 dev/infra/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org