[spark] branch branch-3.2 updated: [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState

2021-07-20 Thread tdas
This is an automated email from the ASF dual-hosted git repository.

tdas pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 0d60cb5  [SPARK-36132][SS][SQL] Support initial state for batch mode 
of flatMapGroupsWithState
0d60cb5 is described below

commit 0d60cb51c01c13b0febe2ff7601db7303bfff56d
Author: Rahul Mahadev 
AuthorDate: Wed Jul 21 01:48:58 2021 -0400

[SPARK-36132][SS][SQL] Support initial state for batch mode of 
flatMapGroupsWithState

### What changes were proposed in this pull request?
Adding support for accepting an initial state with flatMapGroupsWithState 
in batch mode.

### Why are the changes needed?
SPARK-35897 added support for accepting an initial state for streaming 
queries using flatMapGroupsWithState. The code flow is separate for batch and 
streaming, so batch support required a separate PR.

### Does this PR introduce _any_ user-facing change?

Yes. As discussed above, flatMapGroupsWithState in batch mode can now accept an 
initialState; previously this would throw an UnsupportedOperationException.
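
As an illustration of the user-facing change, here is a minimal sketch of a 
batch query that passes an initial state, assuming Spark 3.2 or later on the 
classpath. The object name, the sample data, and the counting logic are 
hypothetical and are not taken from this patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical example object; not part of the patch.
object BatchInitialStateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("batch-initial-state-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical batch input: words grouped by themselves.
    val words = Seq("a", "b", "a").toDS().groupByKey(w => w)

    // Hypothetical initial state: counts carried over from an earlier run,
    // keyed the same way as the input.
    val initialState = Seq(("a", 10L), ("c", 5L)).toDS()
      .groupByKey(_._1)
      .mapValues(_._2)

    // The overload taking `initialState` is the one this change enables
    // for batch Datasets.
    val counts = words.flatMapGroupsWithState[Long, (String, Long)](
      OutputMode.Update(), GroupStateTimeout.NoTimeout(), initialState) {
      (key: String, values: Iterator[String], state: GroupState[Long]) =>
        // Start from the initial state when the key has one, else from zero.
        val total = state.getOption.getOrElse(0L) + values.size
        state.update(total)
        Iterator((key, total))
    }

    counts.show()
    spark.stop()
  }
}
```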

### How was this patch tested?

Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the 
tests in `JavaDatasetSuite`.

Closes #6 from rahulsmahadev/flatMapGroupsWithStateBatch.

Authored-by: Rahul Mahadev 
Signed-off-by: Tathagata Das 
(cherry picked from commit efcce23b913ce0de961ac261050e3d6dbf261f6e)
Signed-off-by: Tathagata Das 
---
 .../analysis/UnsupportedOperationChecker.scala |  6 --
 .../spark/sql/execution/SparkStrategies.scala  | 11 +++-
 .../streaming/FlatMapGroupsWithStateExec.scala | 71 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java | 18 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala| 52 
 5 files changed, 130 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 13c7f75..321725d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -37,12 +37,6 @@ object UnsupportedOperationChecker extends Logging {
   case p if p.isStreaming =>
 throwError("Queries with streaming sources must be executed with 
writeStream.start()")(p)
 
-  case f: FlatMapGroupsWithState =>
-if (f.hasInitialState) {
-  throwError("Initial state is not supported in 
[flatMap|map]GroupsWithState" +
-" operation on a batch DataFrame/Dataset")(f)
-}
-
   case _ =>
 }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6d10fa8..7624b15 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -690,9 +690,14 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
 execution.MapGroupsExec(f, key, value, grouping, data, objAttr, 
planLater(child)) :: Nil
   case logical.FlatMapGroupsWithState(
-  f, key, value, grouping, data, output, _, _, _, timeout, _, _, _, _, 
_, child) =>
-execution.MapGroupsExec(
-  f, key, value, grouping, data, output, timeout, planLater(child)) :: 
Nil
+  f, keyDeserializer, valueDeserializer, grouping, data, output, 
stateEncoder, outputMode,
+  isFlatMapGroupsWithState, timeout, hasInitialState, 
initialStateGroupAttrs,
+  initialStateDataAttrs, initialStateDeserializer, initialState, 
child) =>
+FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries(
+  f, keyDeserializer, valueDeserializer, initialStateDeserializer, 
grouping,
+  initialStateGroupAttrs, data, initialStateDataAttrs, output, timeout,
+  hasInitialState, planLater(initialState), planLater(child)
+) :: Nil
   case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, 
oAttr, left, right) =>
 execution.CoGroupExec(
   f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 03694d4..a00a622 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 

[spark] branch master updated: [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState

2021-07-20 Thread tdas

tdas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new efcce23  [SPARK-36132][SS][SQL] Support initial state for batch mode 
of flatMapGroupsWithState
efcce23 is described below

commit efcce23b913ce0de961ac261050e3d6dbf261f6e
Author: Rahul Mahadev 
AuthorDate: Wed Jul 21 01:48:58 2021 -0400

[SPARK-36132][SS][SQL] Support initial state for batch mode of 
flatMapGroupsWithState

### What changes were proposed in this pull request?
Adding support for accepting an initial state with flatMapGroupsWithState 
in batch mode.

### Why are the changes needed?
SPARK-35897 added support for accepting an initial state for streaming 
queries using flatMapGroupsWithState. The code flow is separate for batch and 
streaming, so batch support required a separate PR.

### Does this PR introduce _any_ user-facing change?

Yes. As discussed above, flatMapGroupsWithState in batch mode can now accept an 
initialState; previously this would throw an UnsupportedOperationException.

### How was this patch tested?

Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the 
tests in `JavaDatasetSuite`.

Closes #6 from rahulsmahadev/flatMapGroupsWithStateBatch.

Authored-by: Rahul Mahadev 
Signed-off-by: Tathagata Das 
---
 .../analysis/UnsupportedOperationChecker.scala |  6 --
 .../spark/sql/execution/SparkStrategies.scala  | 11 +++-
 .../streaming/FlatMapGroupsWithStateExec.scala | 71 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java | 18 +-
 .../streaming/FlatMapGroupsWithStateSuite.scala| 52 
 5 files changed, 130 insertions(+), 28 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 13c7f75..321725d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -37,12 +37,6 @@ object UnsupportedOperationChecker extends Logging {
   case p if p.isStreaming =>
 throwError("Queries with streaming sources must be executed with 
writeStream.start()")(p)
 
-  case f: FlatMapGroupsWithState =>
-if (f.hasInitialState) {
-  throwError("Initial state is not supported in 
[flatMap|map]GroupsWithState" +
-" operation on a batch DataFrame/Dataset")(f)
-}
-
   case _ =>
 }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6d10fa8..7624b15 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -690,9 +690,14 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case logical.MapGroups(f, key, value, grouping, data, objAttr, child) =>
 execution.MapGroupsExec(f, key, value, grouping, data, objAttr, 
planLater(child)) :: Nil
   case logical.FlatMapGroupsWithState(
-  f, key, value, grouping, data, output, _, _, _, timeout, _, _, _, _, 
_, child) =>
-execution.MapGroupsExec(
-  f, key, value, grouping, data, output, timeout, planLater(child)) :: 
Nil
+  f, keyDeserializer, valueDeserializer, grouping, data, output, 
stateEncoder, outputMode,
+  isFlatMapGroupsWithState, timeout, hasInitialState, 
initialStateGroupAttrs,
+  initialStateDataAttrs, initialStateDeserializer, initialState, 
child) =>
+FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries(
+  f, keyDeserializer, valueDeserializer, initialStateDeserializer, 
grouping,
+  initialStateGroupAttrs, data, initialStateDataAttrs, output, timeout,
+  hasInitialState, planLater(initialState), planLater(child)
+) :: Nil
   case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, 
oAttr, left, right) =>
 execution.CoGroupExec(
   f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index 03694d4..a00a622 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -309,9 +309,7 @@ case 

[spark] branch branch-3.2 updated (6041d1c -> 0b14ab1)

2021-07-20 Thread viirya

viirya pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 6041d1c  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax 
deprecated in Scala 2.13
 add 0b14ab1  [SPARK-36030][SQL][FOLLOW-UP][3.2] Remove duplicated test 
suite

No new revisions were added by this update.

Summary of changes:
 .../FileFormatDataWriterMetricSuite.scala  | 96 --
 1 file changed, 96 deletions(-)
 delete mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (99006e5 -> df798ed)

2021-07-20 Thread viirya

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 99006e5  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax 
deprecated in Scala 2.13
 add df798ed  [SPARK-36030][SQL][FOLLOW-UP] Remove duplicated test suite

No new revisions were added by this update.

Summary of changes:
 .../FileFormatDataWriterMetricSuite.scala  | 96 --
 1 file changed, 96 deletions(-)
 delete mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala




[spark] branch branch-3.2 updated (86d1fb4 -> 6041d1c)

2021-07-20 Thread gurwls223

gurwls223 pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 86d1fb4  [SPARK-36030][SQL] Support DS v2 metrics at writing path
 add 6041d1c  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax 
deprecated in Scala 2.13

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/datasources/FileFormatDataWriterMetricSuite.scala | 2 +-
 .../spark/sql/execution/datasources/InMemoryTableMetricSuite.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)




[spark] branch master updated (2653201 -> 99006e5)

2021-07-20 Thread gurwls223

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 2653201  [SPARK-36030][SQL] Support DS v2 metrics at writing path
 add 99006e5  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax 
deprecated in Scala 2.13

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/datasources/FileFormatDataWriterMetricSuite.scala | 2 +-
 .../spark/sql/execution/datasources/InMemoryTableMetricSuite.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)




[spark] branch branch-3.2 updated (ab80d3c -> 86d1fb4)

2021-07-20 Thread viirya

viirya pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.


from ab80d3c  [SPARK-35027][CORE] Close the inputStream in FileAppender 
when writin…
 add 86d1fb4  [SPARK-36030][SQL] Support DS v2 metrics at writing path

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/connector/read/Scan.java  |   3 +-
 .../spark/sql/connector/write/DataWriter.java  |   9 ++
 .../apache/spark/sql/connector/write/Write.java|   9 ++
 .../sql/connector/catalog/InMemoryTable.scala  |  22 +
 .../datasources/FileFormatDataWriter.scala |  51 ---
 .../datasources/v2/DataSourceV2Strategy.scala  |   8 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  36 +++-
 .../spark/sql/execution/metric/CustomMetrics.scala |   5 +-
 .../execution/streaming/MicroBatchExecution.scala  |   4 +-
 .../sql/execution/streaming/StreamExecution.scala  |   6 +-
 .../streaming/continuous/ContinuousExecution.scala |   6 +-
 .../streaming/continuous/ContinuousWriteRDD.scala  |  11 ++-
 .../continuous/WriteToContinuousDataSource.scala   |   4 +-
 .../WriteToContinuousDataSourceExec.scala  |  11 ++-
 .../sources/WriteToMicroBatchDataSource.scala  |   6 +-
 .../sql/connector/SimpleWritableDataSource.scala   |   8 +-
 .../FileFormatDataWriterMetricSuite.scala  |  96 +++
 .../datasources/InMemoryTableMetricSuite.scala |  96 +++
 .../sql/execution/metric/CustomMetricsSuite.scala  |  10 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 102 -
 20 files changed, 456 insertions(+), 47 deletions(-)
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala




[spark] branch master updated (305d563 -> 2653201)

2021-07-20 Thread viirya

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 305d563  [SPARK-36153][SQL][DOCS] Update transform doc to match the 
current code
 add 2653201  [SPARK-36030][SQL] Support DS v2 metrics at writing path

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/connector/read/Scan.java  |   3 +-
 .../spark/sql/connector/write/DataWriter.java  |   9 ++
 .../apache/spark/sql/connector/write/Write.java|   9 ++
 .../sql/connector/catalog/InMemoryTable.scala  |  22 +
 .../datasources/FileFormatDataWriter.scala |  51 ---
 .../datasources/v2/DataSourceV2Strategy.scala  |   8 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   |  36 +++-
 .../spark/sql/execution/metric/CustomMetrics.scala |   5 +-
 .../execution/streaming/MicroBatchExecution.scala  |   4 +-
 .../sql/execution/streaming/StreamExecution.scala  |   6 +-
 .../streaming/continuous/ContinuousExecution.scala |   6 +-
 .../streaming/continuous/ContinuousWriteRDD.scala  |  11 ++-
 .../continuous/WriteToContinuousDataSource.scala   |   4 +-
 .../WriteToContinuousDataSourceExec.scala  |  11 ++-
 .../sources/WriteToMicroBatchDataSource.scala  |   6 +-
 .../sql/connector/SimpleWritableDataSource.scala   |   8 +-
 .../FileFormatDataWriterMetricSuite.scala  |  96 +++
 .../datasources/InMemoryTableMetricSuite.scala |  96 +++
 .../sql/execution/metric/CustomMetricsSuite.scala  |  10 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 102 -
 20 files changed, 456 insertions(+), 47 deletions(-)
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala
 create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala




[spark] branch master updated: [SPARK-36153][SQL][DOCS] Update transform doc to match the current code

2021-07-20 Thread srowen

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 305d563  [SPARK-36153][SQL][DOCS] Update transform doc to match the 
current code
305d563 is described below

commit 305d563329bfb7a4ef582655b88a72d826a4e8aa
Author: Angerszh 
AuthorDate: Tue Jul 20 21:38:37 2021 -0500

[SPARK-36153][SQL][DOCS] Update transform doc to match the current code

### What changes were proposed in this pull request?
Update transform's doc to match the latest code.

![image](https://user-images.githubusercontent.com/46485123/126175747-672cccbc-4e42-440f-8f1e-f00b6dc1be5f.png)

### Why are the changes needed?
Keep the documentation consistent with the code.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No

Closes #33362 from AngersZh/SPARK-36153.

Lead-authored-by: Angerszh 
Co-authored-by: AngersZh 
Signed-off-by: Sean Owen 
---
 docs/sql-ref-syntax-qry-select-transform.md | 101 
 1 file changed, 88 insertions(+), 13 deletions(-)

diff --git a/docs/sql-ref-syntax-qry-select-transform.md 
b/docs/sql-ref-syntax-qry-select-transform.md
index 21966f2..5a38e14 100644
--- a/docs/sql-ref-syntax-qry-select-transform.md
+++ b/docs/sql-ref-syntax-qry-select-transform.md
@@ -24,6 +24,15 @@ license: |
 The `TRANSFORM` clause is used to specify a Hive-style transform query 
specification 
 to transform the inputs by running a user-specified command or script.
 
+Spark's script transform supports two modes:
+
+  1. Hive support disabled: Spark script transform can run without 
`spark.sql.catalogImplementation=true` 
+  or `SparkSession.builder.enableHiveSupport()`. In this case, now Spark only 
uses the script transform with 
+  `ROW FORMAT DELIMITED` and treats all values passed to the script as 
strings. 
+  2. Hive support enabled: When Spark is run with 
`spark.sql.catalogImplementation=true` or Spark SQL is started
+  with `SparkSession.builder.enableHiveSupport()`, Spark can use the script 
transform with both Hive SerDe and 
+  `ROW FORMAT DELIMITED`.
+
 ### Syntax
 
 ```sql
@@ -57,19 +66,85 @@ SELECT TRANSFORM ( expression [ , ... ] )
 
 Specifies a command or a path to script to process data.
 
-### SerDe behavior
-
-Spark uses the Hive SerDe `org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe` 
by default, so columns will be casted
-to `STRING` and combined by tabs before feeding to the user script. All `NULL` 
values will be converted
-to the literal string `"\N"` in order to differentiate `NULL` values from 
empty strings. The standard output of the
-user script will be treated as tab-separated `STRING` columns, any cell 
containing only `"\N"` will be re-interpreted
-as a `NULL` value, and then the resulting STRING column will be cast to the 
data type specified in `col_type`. If the actual
-number of output columns is less than the number of specified output columns, 
insufficient output columns will be
-supplemented with `NULL`. If the actual number of output columns is more than 
the number of specified output columns,
-the output columns will only select the corresponding columns and the 
remaining part will be discarded.
-If there is no `AS` clause after `USING my_script`, an output schema will be 
`key: STRING, value: STRING`.
-The `key` column contains all the characters before the first tab and the 
`value` column contains the remaining characters after the first tab.
-If there is no enough tab, Spark will return `NULL` value. These defaults can 
be overridden with `ROW FORMAT SERDE` or `ROW FORMAT DELIMITED`. 
+### ROW FORMAT DELIMITED BEHAVIOR
+
+When Spark uses `ROW FORMAT DELIMITED` format:
+ - Spark uses the character `\u0001` as the default field delimiter and this 
delimiter can be overridden by `FIELDS TERMINATED BY`.
+ - Spark uses the character `\n` as the default line delimiter and this 
delimiter can be overridden by `LINES TERMINATED BY`.
+ - Spark uses a string `\N` as the default `NULL` value in order to 
differentiate `NULL` values 
+ from the literal string `NULL`. This delimiter can be overridden by `NULL 
DEFINED AS`.
+ - Spark casts all columns to `STRING` and combines columns by tabs before 
feeding to the user script.
+ For complex types such as `ARRAY`/`MAP`/`STRUCT`, Spark uses `to_json` casts 
it to an input `JSON` string and uses 
+ `from_json` to convert the result output `JSON` string to 
`ARRAY`/`MAP`/`STRUCT` data.
+ - `COLLECTION ITEMS TERMINATED BY` and `MAP KEYS TERMINATED BY` are 
delimiters to split complex data such as 
+ `ARRAY`/`MAP`/`STRUCT`, Spark uses `to_json` and `from_json` to handle 
complex data types with `JSON` format. So 
+ `COLLECTION ITEMS TERMINATED BY` and `MAP KEYS TERMINATED BY` won't work in 
default row format.
+ - The standard output of the 

[spark] branch master updated (1a8c675 -> 7ceefca)

2021-07-20 Thread srowen

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 1a8c675  [SPARK-35027][CORE] Close the inputStream in FileAppender 
when writin…
 add 7ceefca  [SPARK-35658][DOCS] Document Parquet encryption feature in 
Spark SQL

No new revisions were added by this update.

Summary of changes:
 docs/sql-data-sources-parquet.md | 65 
 1 file changed, 65 insertions(+)




[spark] branch branch-3.0 updated (73645d4 -> 25caec4)

2021-07-20 Thread srowen

srowen pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 73645d4  [SPARK-36210][SQL] Preserve column insertion order in 
Dataset.withColumns
 add 25caec4  [SPARK-35027][CORE] Close the inputStream in FileAppender 
when writin…

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/worker/ExecutorRunner.scala   |  4 +--
 .../apache/spark/util/logging/FileAppender.scala   | 37 --
 .../spark/util/logging/RollingFileAppender.scala   |  6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala  | 35 
 4 files changed, 68 insertions(+), 14 deletions(-)




[spark] branch branch-3.1 updated: [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…

2021-07-20 Thread srowen

srowen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 8c0e2d8  [SPARK-35027][CORE] Close the inputStream in FileAppender 
when writin…
8c0e2d8 is described below

commit 8c0e2d838c2aa386bc164dd960d12312366d3732
Author: Jie 
AuthorDate: Tue Jul 20 21:23:51 2021 -0500

[SPARK-35027][CORE] Close the inputStream in FileAppender when writin…

### What changes were proposed in this pull request?

1. add "closeStreams" to FileAppender and RollingFileAppender
2. set "closeStreams" to "true" in ExecutorRunner

### Why are the changes needed?

The executor hangs when the disk is full or another exception occurs while 
writing to the outputStream. The root cause is that the "inputStream" is 
not closed after the error happens:
1. ExecutorRunner creates two file appenders for the pipe: one for stdout, one 
for stderr
2. FileAppender.appendStreamToFile exits the loop when writing to the 
outputStream fails
3. FileAppender closes the outputStream, but leaves open the inputStream, which 
refers to the pipe's stdout and stderr
4. The executor hangs when printing a log message if the pipe is full 
(no one consumes the output)
5. From the driver side, the task never completes

With this fix, step 4 throws an exception, so the driver can catch 
the exception and reschedule the failed task on other executors.
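
The sequence above can be sketched in plain Scala without Spark. This is a 
simplified model, not the actual FileAppender code: `CloseStreamsSketch`, 
`copy`, `TrackingInputStream` and `FailingOutputStream` are hypothetical names, 
and only the `closeStreams` and `bufferSize` parameters mirror the patch.

```scala
import java.io.{ByteArrayInputStream, IOException, InputStream, OutputStream}

// Hypothetical, simplified model of the append loop; not Spark's FileAppender.
object CloseStreamsSketch {

  // Input stream that records whether close() was called.
  final class TrackingInputStream(bytes: Array[Byte]) extends ByteArrayInputStream(bytes) {
    @volatile var closed = false
    override def close(): Unit = { closed = true; super.close() }
  }

  // Output stream that always fails on write, standing in for a full disk.
  final class FailingOutputStream extends OutputStream {
    override def write(b: Int): Unit = throw new IOException("simulated: disk full")
  }

  /**
   * Copies `in` to `out`. The output is always closed; the input is closed
   * only when `closeStreams` is true, even if the copy loop throws.
   */
  def copy(
      in: InputStream,
      out: OutputStream,
      closeStreams: Boolean,
      bufferSize: Int = 8192): Unit = {
    val buf = new Array[Byte](bufferSize)
    try {
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally {
      try {
        if (closeStreams) in.close()
      } finally {
        out.close()
      }
    }
  }
}
```

With `closeStreams = true`, a write failure still propagates to the caller, 
but the input side of the pipe is released first, so the process writing into 
that pipe gets an error instead of blocking on a full pipe.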

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Add new tests for the "closeStreams" in FileAppenderSuite

Closes #33263 from jhu-chang/SPARK-35027.

Authored-by: Jie 
Signed-off-by: Sean Owen 
(cherry picked from commit 1a8c6755a1802afdb9a73793e9348d322176125a)
Signed-off-by: Sean Owen 
---
 .../spark/deploy/worker/ExecutorRunner.scala   |  4 +--
 .../apache/spark/util/logging/FileAppender.scala   | 37 --
 .../spark/util/logging/RollingFileAppender.scala   |  6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala  | 35 
 4 files changed, 68 insertions(+), 14 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2e26ccf..974c2d6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner(
 
   // Redirect its stdout and stderr to files
   val stdout = new File(executorDir, "stdout")
-  stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
+  stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true)
 
   val stderr = new File(executorDir, "stderr")
   Files.write(header, stderr, StandardCharsets.UTF_8)
-  stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+  stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true)
 
   state = ExecutorState.RUNNING
   worker.send(ExecutorStateChanged(appId, execId, state, None, None))
diff --git 
a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala 
b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 7107be2..2243239 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils}
 /**
  * Continuously appends the data from an input stream into the given file.
  */
-private[spark] class FileAppender(inputStream: InputStream, file: File, 
bufferSize: Int = 8192)
-  extends Logging {
+private[spark] class FileAppender(
+  inputStream: InputStream,
+  file: File,
+  bufferSize: Int = 8192,
+  closeStreams: Boolean = false
+) extends Logging {
   @volatile private var outputStream: FileOutputStream = null
   @volatile private var markedForStop = false // has the appender been 
asked to stopped
 
@@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, 
file: File, bufferSi
   }
 }
   } {
-closeFile()
+try {
+  if (closeStreams) {
+inputStream.close()
+  }
+} finally {
+  closeFile()
+}
   }
 } catch {
   case e: Exception =>
@@ -113,7 +123,12 @@ private[spark] class FileAppender(inputStream: 
InputStream, file: File, bufferSi
 private[spark] object FileAppender extends Logging {
 
   /** Create the right appender based on Spark configuration */
-  def apply(inputStream: InputStream, file: File, conf: SparkConf): 
FileAppender = {
+  def apply(
+

[spark] branch branch-3.2 updated (e264c21 -> ab80d3c)

2021-07-20 Thread srowen

srowen pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e264c21  [SPARK-36172][SS] Document session window into Structured 
Streaming guide doc
 add ab80d3c  [SPARK-35027][CORE] Close the inputStream in FileAppender 
when writin…

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/worker/ExecutorRunner.scala   |  4 +--
 .../apache/spark/util/logging/FileAppender.scala   | 37 --
 .../spark/util/logging/RollingFileAppender.scala   |  6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala  | 35 
 4 files changed, 68 insertions(+), 14 deletions(-)




[spark] branch master updated (0eb31a0 -> 1a8c675)

2021-07-20 Thread srowen

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 0eb31a0  [SPARK-36172][SS] Document session window into Structured 
Streaming guide doc
 add 1a8c675  [SPARK-35027][CORE] Close the inputStream in FileAppender 
when writin…

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/worker/ExecutorRunner.scala   |  4 +--
 .../apache/spark/util/logging/FileAppender.scala   | 37 --
 .../spark/util/logging/RollingFileAppender.scala   |  6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala  | 35 
 4 files changed, 68 insertions(+), 14 deletions(-)




[spark] branch branch-3.2 updated: [SPARK-36172][SS] Document session window into Structured Streaming guide doc

2021-07-20 Thread kabhwan

kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new e264c21  [SPARK-36172][SS] Document session window into Structured 
Streaming guide doc
e264c21 is described below

commit e264c21707a4c9f2df9f81a202b0568b16afa3f6
Author: Jungtaek Lim 
AuthorDate: Wed Jul 21 10:45:31 2021 +0900

[SPARK-36172][SS] Document session window into Structured Streaming guide 
doc

### What changes were proposed in this pull request?

This PR documents a new feature "native support of session window" into 
Structured Streaming guide doc.

Screenshots are following:

![스크린샷 2021-07-20 오후 5 04 
20](https://user-images.githubusercontent.com/1317309/126284848-526ec056-1028-4a70-a1f4-ae275d4b5437.png)

![스크린샷 2021-07-20 오후 3 34 
38](https://user-images.githubusercontent.com/1317309/126276763-763cf841-aef7-412a-aa03-d93273f0c850.png)

### Why are the changes needed?

This change is needed to explain a new feature to the end users.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Documentation changes.

Closes #33433 from HeartSaVioR/SPARK-36172.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
(cherry picked from commit 0eb31a06d6f2b7583b6a9c646baeff58094f8d6c)
Signed-off-by: Jungtaek Lim 
---
 .../img/structured-streaming-time-window-types.jpg | Bin 0 -> 56637 bytes
 docs/img/structured-streaming.pptx | Bin 1126657 -> 1130887 bytes
 docs/structured-streaming-programming-guide.md |  71 +
 3 files changed, 71 insertions(+)

diff --git a/docs/img/structured-streaming-time-window-types.jpg 
b/docs/img/structured-streaming-time-window-types.jpg
new file mode 100644
index 000..7e0ad1b
Binary files /dev/null and 
b/docs/img/structured-streaming-time-window-types.jpg differ
diff --git a/docs/img/structured-streaming.pptx 
b/docs/img/structured-streaming.pptx
index 2ffd9f2..b35bf75 100644
Binary files a/docs/img/structured-streaming.pptx and 
b/docs/img/structured-streaming.pptx differ
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 1eabcdd..3d02eb7 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1063,6 +1063,77 @@ then drops intermediate state of a window < watermark, 
and appends the final
 counts to the Result Table/sink. For example, the final counts of window 
`12:00 - 12:10` is 
 appended to the Result Table only after the watermark is updated to `12:11`. 
 
+ Types of time windows
+
+Spark supports three types of time windows: tumbling (fixed), sliding and session.
+
+![The types of time windows](img/structured-streaming-time-window-types.jpg)
+
+Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. An input
+can only be bound to a single window.
+
+Sliding windows are similar to the tumbling windows from the point of being "fixed-sized", but windows
+can overlap if the duration of slide is smaller than the duration of window, and in this case an input
+can be bound to the multiple windows.
+
+Tumbling and sliding window use `window` function, which has been described on above examples.
+
+Session windows have different characteristic compared to the previous two types. Session window has a dynamic size
+of the window length, depending on the inputs. A session window starts with an input, and expands itself
+if following input has been received within gap duration. A session window closes when there's no input
+received within gap duration after receiving the latest input.
+
+Session window uses `session_window` function. The usage of the function is similar to the `window` function.
+
+
+
+
+{% highlight scala %}
+import spark.implicits._
+
+val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
+
+// Group the data by session window and userId, and compute the count of each group
+val sessionizedCounts = events
+.withWatermark("timestamp", "10 minutes")
+.groupBy(
+session_window($"timestamp", "5 minutes"),
+$"userId")
+.count()
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }
+
+// Group the data by session window and userId, and compute the count of each group
+Dataset<Row> sessionizedCounts = events
+.withWatermark("timestamp", "10 minutes")
+.groupBy(
+session_window(col("timestamp"), "5 minutes"),
+col("userId"))
+.count();
+{% endhighlight %}
+
+
+
+{% highlight python %}
+events = ...  # streaming DataFrame of schema { timestamp: Timestamp, 
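
Note on the session window semantics documented in this commit: a session starts with an input and keeps extending while further inputs arrive within the gap duration. The following is a minimal plain-Python sketch of that merging rule, not Spark code. The names `sessionize` and `gap` are hypothetical, and the treatment of an input that lands exactly on a session's end (it opens a new session here) is an assumption of the sketch rather than something the guide text states.

```python
from datetime import datetime, timedelta


def sessionize(timestamps, gap):
    """Group event times into sessions as (start, end, count) tuples.

    A session covers [first event, last event + gap). Hypothetical helper,
    written only to illustrate the gap-based merging described above.
    """
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Event falls inside the open session: extend its end, bump count.
            start, end, count = sessions[-1]
            sessions[-1] = (start, max(end, ts + gap), count + 1)
        else:
            # No open session, or the gap has elapsed: start a new session.
            sessions.append((ts, ts + gap, 1))
    return sessions


if __name__ == "__main__":
    day = datetime(2021, 7, 20)
    events = [day.replace(hour=12, minute=m) for m in (0, 3, 4, 20)]
    for start, end, count in sessionize(events, timedelta(minutes=5)):
        print(start.time(), end.time(), count)
```

With a 5-minute gap, events at 12:00, 12:03, 12:04 and 12:20 form two sessions: one covering 12:00 to 12:09 with three events, and one covering 12:20 to 12:25 with a single event.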

[spark] branch master updated (376fadc -> 0eb31a0)

2021-07-20 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 376fadc  [SPARK-36186][PYTHON] Add as_ordered/as_unordered to 
CategoricalAccessor and CategoricalIndex
 add 0eb31a0  [SPARK-36172][SS] Document session window into Structured 
Streaming guide doc

No new revisions were added by this update.

Summary of changes:
 .../img/structured-streaming-time-window-types.jpg | Bin 0 -> 56637 bytes
 docs/img/structured-streaming.pptx | Bin 1126657 -> 1130887 bytes
 docs/structured-streaming-programming-guide.md |  71 +
 3 files changed, 71 insertions(+)
 create mode 100644 docs/img/structured-streaming-time-window-types.jpg




[spark] branch branch-3.2 updated: [SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor and CategoricalIndex

2021-07-20 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new a3a13da  [SPARK-36186][PYTHON] Add as_ordered/as_unordered to 
CategoricalAccessor and CategoricalIndex
a3a13da is described below

commit a3a13da26c19b4241bf2f76273a82fe8598eddf5
Author: Takuya UESHIN 
AuthorDate: Tue Jul 20 18:23:54 2021 -0700

[SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor 
and CategoricalIndex

### What changes were proposed in this pull request?

Add `as_ordered`/`as_unordered` to `CategoricalAccessor` and 
`CategoricalIndex`.

### Why are the changes needed?

`as_ordered`/`as_unordered` are not yet implemented in `CategoricalAccessor` 
and `CategoricalIndex`, so we should implement them.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to use `as_ordered`/`as_unordered`.

### How was this patch tested?

Added some tests.

Closes #33400 from ueshin/issues/SPARK-36186/as_ordered_unordered.

Authored-by: Takuya UESHIN 
Signed-off-by: Takuya UESHIN 
(cherry picked from commit 376fadc89cffac97aebe49a7cf4a4bc978b1d09e)
Signed-off-by: Takuya UESHIN 
---
 .../source/reference/pyspark.pandas/indexing.rst   |   2 +
 .../source/reference/pyspark.pandas/series.rst |   2 +
 python/pyspark/pandas/categorical.py   | 116 +++--
 python/pyspark/pandas/indexes/category.py  |  72 -
 python/pyspark/pandas/missing/indexes.py   |   2 -
 .../pyspark/pandas/tests/indexes/test_category.py  |  10 ++
 python/pyspark/pandas/tests/test_categorical.py|  22 
 7 files changed, 214 insertions(+), 12 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst 
b/python/docs/source/reference/pyspark.pandas/indexing.rst
index e91f699..4f84d91 100644
--- a/python/docs/source/reference/pyspark.pandas/indexing.rst
+++ b/python/docs/source/reference/pyspark.pandas/indexing.rst
@@ -175,6 +175,8 @@ Categorical components
CategoricalIndex.codes
CategoricalIndex.categories
CategoricalIndex.ordered
+   CategoricalIndex.as_ordered
+   CategoricalIndex.as_unordered
 
 .. _api.multiindex:
 
diff --git a/python/docs/source/reference/pyspark.pandas/series.rst 
b/python/docs/source/reference/pyspark.pandas/series.rst
index a199d70..b718d79 100644
--- a/python/docs/source/reference/pyspark.pandas/series.rst
+++ b/python/docs/source/reference/pyspark.pandas/series.rst
@@ -401,6 +401,8 @@ the ``Series.cat`` accessor.
Series.cat.categories
Series.cat.ordered
Series.cat.codes
+   Series.cat.as_ordered
+   Series.cat.as_unordered
 
 .. _api.series.plot:
 
diff --git a/python/pyspark/pandas/categorical.py 
b/python/pyspark/pandas/categorical.py
index 3495b35..b8cc88c 100644
--- a/python/pyspark/pandas/categorical.py
+++ b/python/pyspark/pandas/categorical.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from typing import TYPE_CHECKING, cast
+from typing import Optional, TYPE_CHECKING, cast
 
 import pandas as pd
 from pandas.api.types import CategoricalDtype
@@ -62,6 +62,10 @@ class CategoricalAccessor(object):
 self._data = series
 
 @property
+def _dtype(self) -> CategoricalDtype:
+return cast(CategoricalDtype, self._data.dtype)
+
+@property
 def categories(self) -> pd.Index:
 """
 The categories of this categorical.
@@ -82,7 +86,7 @@ class CategoricalAccessor(object):
 >>> s.cat.categories
 Index(['a', 'b', 'c'], dtype='object')
 """
-return cast(CategoricalDtype, self._data.dtype).categories
+return self._dtype.categories
 
 @categories.setter
 def categories(self, categories: pd.Index) -> None:
@@ -109,7 +113,7 @@ class CategoricalAccessor(object):
 >>> s.cat.ordered
 False
 """
-return cast(CategoricalDtype, self._data.dtype).ordered
+return self._dtype.ordered
 
 @property
 def codes(self) -> "ps.Series":
@@ -152,11 +156,109 @@ class CategoricalAccessor(object):
 def add_categories(self, new_categories: pd.Index, inplace: bool = False) -> "ps.Series":
 raise NotImplementedError()
 
-def as_ordered(self, inplace: bool = False) -> "ps.Series":
-raise NotImplementedError()
+def _set_ordered(self, *, ordered: bool, inplace: bool) -> Optional["ps.Series"]:
+from pyspark.pandas.frame import DataFrame
+
+if self.ordered == ordered:
+if inplace:
+return None
+else:
+psser = self._data
+else:
+internal = self._data._psdf._internal.with_new_spark_column(
+self._data._column_label,
+

[spark] branch master updated (c0d84e6 -> 376fadc)

2021-07-20 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from c0d84e6  [SPARK-36222][SQL] Step by days in the Sequence expression 
for dates
 add 376fadc  [SPARK-36186][PYTHON] Add as_ordered/as_unordered to 
CategoricalAccessor and CategoricalIndex

No new revisions were added by this update.

Summary of changes:
 .../source/reference/pyspark.pandas/indexing.rst   |   2 +
 .../source/reference/pyspark.pandas/series.rst |   2 +
 python/pyspark/pandas/categorical.py   | 116 +++--
 python/pyspark/pandas/indexes/category.py  |  72 -
 python/pyspark/pandas/missing/indexes.py   |   2 -
 .../pyspark/pandas/tests/indexes/test_category.py  |  10 ++
 python/pyspark/pandas/tests/test_categorical.py|  22 
 7 files changed, 214 insertions(+), 12 deletions(-)




[spark] branch branch-3.2 updated: [SPARK-36222][SQL] Step by days in the Sequence expression for dates

2021-07-20 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 9a7c59c  [SPARK-36222][SQL] Step by days in the Sequence expression 
for dates
9a7c59c is described below

commit 9a7c59c99ce69411485acf382dfc9be053927b59
Author: gengjiaan 
AuthorDate: Tue Jul 20 19:16:56 2021 +0300

[SPARK-36222][SQL] Step by days in the Sequence expression for dates

### What changes were proposed in this pull request?
The current implementation of the `Sequence` expression does not support 
stepping by days for dates.
```
spark-sql> select sequence(date'2021-07-01', date'2021-07-10', interval '3' 
day);
Error in query: cannot resolve 'sequence(DATE '2021-07-01', DATE 
'2021-07-10', INTERVAL '3' DAY)' due to data type mismatch:
sequence uses the wrong parameter type. The parameter type must conform to:
1. The start and stop expressions must resolve to the same type.
2. If start and stop expressions resolve to the 'date' or 'timestamp' type
then the step expression must resolve to the 'interval' or
'interval year to month' or 'interval day to second' type,
otherwise to the same type as the start and stop expressions.
 ; line 1 pos 7;
'Project [unresolvedalias(sequence(2021-07-01, 2021-07-10, Some(INTERVAL 
'3' DAY), Some(Europe/Moscow)), None)]
+- OneRowRelation
```

### Why are the changes needed?
A `DayTimeInterval` with day granularity should be usable as the step for dates.

### Does this PR introduce _any_ user-facing change?
Yes.
The `Sequence` expression will support stepping by a `DayTimeInterval` with day 
granularity for dates.

### How was this patch tested?
New tests.

Closes #33439 from beliefer/SPARK-36222.

Authored-by: gengjiaan 
Signed-off-by: Max Gekk 
(cherry picked from commit c0d84e6cf1046b7944796038414ef21fe9c7e3b5)
Signed-off-by: Max Gekk 
---
 .../expressions/collectionOperations.scala | 17 --
 .../expressions/CollectionExpressionsSuite.scala   | 61 --
 2 files changed, 68 insertions(+), 10 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 730b8d0..2c3312a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2574,7 +2574,8 @@ case class Sequence(
   DayTimeIntervalType.acceptsType(stepType)
   case DateType =>
 stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType) ||
-  YearMonthIntervalType.acceptsType(stepType)
+  YearMonthIntervalType.acceptsType(stepType) ||
+  DayTimeIntervalType.acceptsType(stepType)
   case _: IntegralType =>
 stepOpt.isEmpty || stepType.sameType(startType)
   case _ => false
@@ -2626,8 +2627,10 @@ case class Sequence(
 case DateType =>
   if (stepOpt.isEmpty || 
CalendarIntervalType.acceptsType(stepOpt.get.dataType)) {
 new TemporalSequenceImpl[Int](IntegerType, start.dataType, 
MICROS_PER_DAY, _.toInt, zoneId)
-  } else {
+  } else if (YearMonthIntervalType.acceptsType(stepOpt.get.dataType)) {
 new PeriodSequenceImpl[Int](IntegerType, start.dataType, 
MICROS_PER_DAY, _.toInt, zoneId)
+  } else {
+new DurationSequenceImpl[Int](IntegerType, start.dataType, 
MICROS_PER_DAY, _.toInt, zoneId)
   }
   }
 
@@ -2807,15 +2810,19 @@ object Sequence {
 val intervalType: DataType = DayTimeIntervalType()
 
 def splitStep(input: Any): (Int, Int, Long) = {
-  (0, 0, input.asInstanceOf[Long])
+  val duration = input.asInstanceOf[Long]
+  val days = IntervalUtils.getDays(duration)
+  val micros = duration - days * MICROS_PER_DAY
+  (0, days, micros)
 }
 
 def stepSplitCode(
 stepMonths: String, stepDays: String, stepMicros: String, step: 
String): String = {
   s"""
  |final int $stepMonths = 0;
- |final int $stepDays = 0;
- |final long $stepMicros = $step;
+ |final int $stepDays =
+ |  (int) 
org.apache.spark.sql.catalyst.util.IntervalUtils.getDays($step);
+ |final long $stepMicros = $step - $stepDays * ${MICROS_PER_DAY}L;
""".stripMargin
 }
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index bfecbf5..caa5e96 100644
--- 
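
Note on the arithmetic in this commit: the `splitStep` change shown above splits a day-time interval, held as a microsecond count, into a whole-day part and a sub-day remainder. A minimal plain-Python sketch of that split, and of the date sequence it makes possible, might look like the following. `split_step` and `date_sequence` are hypothetical names, truncation toward zero is assumed for the day count, and the restriction to whole-day steps is a simplification of this sketch, not a statement about Spark's behavior.

```python
from datetime import date, timedelta

MICROS_PER_DAY = 24 * 60 * 60 * 1_000_000


def split_step(duration_micros):
    """Split a microsecond duration into (months, days, micros)."""
    # Truncate toward zero (assumed here; Python's // floors instead).
    days = abs(duration_micros) // MICROS_PER_DAY
    if duration_micros < 0:
        days = -days
    micros = duration_micros - days * MICROS_PER_DAY
    return 0, days, micros


def date_sequence(start, stop, step_micros):
    """Dates from start to stop (inclusive), stepping by whole days."""
    _, days, micros = split_step(step_micros)
    if days == 0 or micros != 0:
        raise ValueError("this sketch only handles non-zero whole-day steps")
    result = []
    current = start
    # Walk forward for positive steps, backward for negative ones.
    while (days > 0 and current <= stop) or (days < 0 and current >= stop):
        result.append(current)
        current += timedelta(days=days)
    return result


if __name__ == "__main__":
    step = 3 * MICROS_PER_DAY  # corresponds to interval '3' day
    for d in date_sequence(date(2021, 7, 1), date(2021, 7, 10), step):
        print(d)
```

For the query in the commit message (2021-07-01 to 2021-07-10 in steps of three days), the sketch yields 2021-07-01, 2021-07-04, 2021-07-07 and 2021-07-10.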

[spark] branch master updated (bf680bf -> c0d84e6)

2021-07-20 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from bf680bf  [SPARK-36210][SQL] Preserve column insertion order in 
Dataset.withColumns
 add c0d84e6  [SPARK-36222][SQL] Step by days in the Sequence expression 
for dates

No new revisions were added by this update.

Summary of changes:
 .../expressions/collectionOperations.scala | 17 --
 .../expressions/CollectionExpressionsSuite.scala   | 61 --
 2 files changed, 68 insertions(+), 10 deletions(-)




[spark] branch branch-3.0 updated (5565d1c -> 73645d4)

2021-07-20 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 5565d1c  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]
 add 73645d4  [SPARK-36210][SQL] Preserve column insertion order in 
Dataset.withColumns

No new revisions were added by this update.

Summary of changes:
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   |  6 +++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala  | 17 +
 2 files changed, 20 insertions(+), 3 deletions(-)




[spark] branch branch-3.1 updated: [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns

2021-07-20 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 02a5e56  [SPARK-36210][SQL] Preserve column insertion order in 
Dataset.withColumns
02a5e56 is described below

commit 02a5e56ba75cf6a5ddac82a5f4283be0402a5388
Author: Koert Kuipers 
AuthorDate: Tue Jul 20 09:09:22 2021 -0700

[SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns

### What changes were proposed in this pull request?
Preserve the insertion order of columns in Dataset.withColumns

### Why are the changes needed?
It is the expected behavior. We preserve insertion order in all other 
places.

### Does this PR introduce _any_ user-facing change?
No. Currently Dataset.withColumns is not actually used anywhere to insert 
more than one column. This change is to make sure it behaves as expected when 
it is used for that purpose in future.

### How was this patch tested?
Added test in DatasetSuite

Closes #33423 from koertkuipers/feat-withcolumns-preserve-order.

Authored-by: Koert Kuipers 
Signed-off-by: Liang-Chi Hsieh 
(cherry picked from commit bf680bf25aae9619d462caee05c41cc33909338a)
Signed-off-by: Liang-Chi Hsieh 
---
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   |  6 +++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala  | 17 +
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5d83d1e..0d6fde3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2422,10 +2422,10 @@ class Dataset[T] private[sql](
 val resolver = sparkSession.sessionState.analyzer.resolver
 val output = queryExecution.analyzed.output
 
-val columnMap = colNames.zip(cols).toMap
+val columnSeq = colNames.zip(cols)
 
 val replacedAndExistingColumns = output.map { field =>
-  columnMap.find { case (colName, _) =>
+  columnSeq.find { case (colName, _) =>
 resolver(field.name, colName)
   } match {
 case Some((colName: String, col: Column)) => col.as(colName)
@@ -2433,7 +2433,7 @@ class Dataset[T] private[sql](
   }
 }
 
-val newColumns = columnMap.filter { case (colName, col) =>
+val newColumns = columnSeq.filter { case (colName, col) =>
   !output.exists(f => resolver(f.name, colName))
 }.map { case (colName, col) => col.as(colName) }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 1b8bb3f..074a517 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1998,6 +1998,23 @@ class DatasetSuite extends QueryTest
   joined,
   (1, 1), (2, 2), (3, 3))
   }
+
+  test("SPARK-36210: withColumns preserve insertion ordering") {
+val df = Seq(1, 2, 3).toDS()
+
+val colNames = (1 to 10).map(i => s"value${i}")
+val cols = (1 to 10).map(i => col("value") + i)
+
+val inserted = df.withColumns(colNames, cols)
+
+assert(inserted.columns === "value" +: colNames)
+
+checkDataset(
+  inserted.as[(Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)],
+  (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+  (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
+  (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
+  }
 }
 
 case class Bar(a: Int)
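
Note on the change above: it swaps a `Map` for a sequence of (name, column) pairs, so the order in which new columns were passed is kept. A Scala immutable `Map` with more than four entries is hash-based and does not iterate in insertion order, which is presumably why the test uses ten columns. A plain-Python sketch of the resulting logic follows. Columns are modeled as strings, `with_columns` and the case-insensitive default `resolver` are hypothetical stand-ins, and none of this is Spark API.

```python
def with_columns(existing, col_names, cols, resolver=None):
    """Return the output (name, expr) pairs after adding or replacing columns."""
    if resolver is None:
        # Assumed default: case-insensitive name matching.
        resolver = lambda a, b: a.lower() == b.lower()

    # An ordered list of pairs, not a hash map, so insertion order survives.
    column_seq = list(zip(col_names, cols))

    # Existing fields keep their position; matching ones are replaced in place.
    replaced_and_existing = []
    for field in existing:
        match = next(((n, c) for n, c in column_seq if resolver(field, n)), None)
        replaced_and_existing.append(match if match is not None else (field, field))

    # Genuinely new columns are appended in the order they were passed.
    new_columns = [
        (n, c) for n, c in column_seq
        if not any(resolver(f, n) for f in existing)
    ]
    return replaced_and_existing + new_columns


if __name__ == "__main__":
    names = [f"value{i}" for i in range(1, 11)]
    exprs = [f"value + {i}" for i in range(1, 11)]
    print([name for name, _ in with_columns(["value"], names, exprs)])
```

The usage at the bottom mirrors the new `DatasetSuite` test: the resulting column names are `value` followed by `value1` through `value10` in the order given.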




[spark] branch branch-3.2 updated: [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns

2021-07-20 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new a864388  [SPARK-36210][SQL] Preserve column insertion order in 
Dataset.withColumns
a864388 is described below

commit a864388b5af94def1c6bbd06e24c433e503b4bff
Author: Koert Kuipers 
AuthorDate: Tue Jul 20 09:09:22 2021 -0700

[SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns

### What changes were proposed in this pull request?
Preserve the insertion order of columns in Dataset.withColumns

### Why are the changes needed?
It is the expected behavior. We preserve insertion order in all other 
places.

### Does this PR introduce _any_ user-facing change?
No. Currently Dataset.withColumns is not actually used anywhere to insert 
more than one column. This change is to make sure it behaves as expected when 
it is used for that purpose in future.

### How was this patch tested?
Added test in DatasetSuite

Closes #33423 from koertkuipers/feat-withcolumns-preserve-order.

Authored-by: Koert Kuipers 
Signed-off-by: Liang-Chi Hsieh 
(cherry picked from commit bf680bf25aae9619d462caee05c41cc33909338a)
Signed-off-by: Liang-Chi Hsieh 
---
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   |  6 +++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala  | 17 +
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 12112ef..0fd10c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2406,10 +2406,10 @@ class Dataset[T] private[sql](
 val resolver = sparkSession.sessionState.analyzer.resolver
 val output = queryExecution.analyzed.output
 
-val columnMap = colNames.zip(cols).toMap
+val columnSeq = colNames.zip(cols)
 
 val replacedAndExistingColumns = output.map { field =>
-  columnMap.find { case (colName, _) =>
+  columnSeq.find { case (colName, _) =>
 resolver(field.name, colName)
   } match {
 case Some((colName: String, col: Column)) => col.as(colName)
@@ -2417,7 +2417,7 @@ class Dataset[T] private[sql](
   }
 }
 
-val newColumns = columnMap.filter { case (colName, col) =>
+val newColumns = columnSeq.filter { case (colName, col) =>
   !output.exists(f => resolver(f.name, colName))
 }.map { case (colName, col) => col.as(colName) }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 0dae9a2..5f71390 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2040,6 +2040,23 @@ class DatasetSuite extends QueryTest
   joined,
   (1, 1), (2, 2), (3, 3))
   }
+
+  test("SPARK-36210: withColumns preserve insertion ordering") {
+val df = Seq(1, 2, 3).toDS()
+
+val colNames = (1 to 10).map(i => s"value${i}")
+val cols = (1 to 10).map(i => col("value") + i)
+
+val inserted = df.withColumns(colNames, cols)
+
+assert(inserted.columns === "value" +: colNames)
+
+checkDataset(
+  inserted.as[(Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)],
+  (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11),
+  (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
+  (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
+  }
 }
 
 case class Bar(a: Int)




[spark] branch master updated (ddc61e6 -> bf680bf)

2021-07-20 Thread viirya
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from ddc61e6  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]
 add bf680bf  [SPARK-36210][SQL] Preserve column insertion order in 
Dataset.withColumns

No new revisions were added by this update.

Summary of changes:
 .../src/main/scala/org/apache/spark/sql/Dataset.scala   |  6 +++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala  | 17 +
 2 files changed, 20 insertions(+), 3 deletions(-)




[spark] branch branch-3.0 updated (7588b46 -> 5565d1c)

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 7588b46  [SPARK-36216][PYTHON][TESTS] Increase timeout for 
StreamingLinearRegressionWithTests. test_parameter_convergence
 add 5565d1c  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/plans/logical/Statistics.scala| 13 +++
 .../logical/statsEstimation/EstimationUtils.scala  | 18 ++
 .../logical/statsEstimation/FilterEstimation.scala | 30 +++-
 .../logical/statsEstimation/JoinEstimation.scala   | 13 +++
 .../statsEstimation/FilterEstimationSuite.scala| 40 +-
 5 files changed, 80 insertions(+), 34 deletions(-)




[spark] branch branch-3.1 updated: [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
 new 04ada05  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]
04ada05 is described below

commit 04ada0598d9c78253bde8378cac0a322c0ed1031
Author: Karen Feng 
AuthorDate: Tue Jul 20 21:32:13 2021 +0800

[SPARK-36079][SQL] Null-based filter estimate should always be in the range 
[0, 1]

Forces the selectivity estimate for null-based filters to be in the range 
`[0,1]`.

I noticed in a few TPC-DS query tests that the column statistic null count 
can be higher than the table statistic row count. In the current 
implementation, the selectivity estimate for `IsNotNull` is negative.

No

Unit test

Closes #33286 from karenfeng/bound-selectivity-est.

Authored-by: Karen Feng 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ddc61e62b9af5deff1b93e22f466f2a13f281155)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/plans/logical/Statistics.scala| 13 +++
 .../logical/statsEstimation/EstimationUtils.scala  | 18 ++
 .../logical/statsEstimation/FilterEstimation.scala | 30 +++-
 .../logical/statsEstimation/JoinEstimation.scala   | 13 +++
 .../statsEstimation/FilterEstimationSuite.scala| 40 +-
 5 files changed, 80 insertions(+), 34 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 1346f80..e80eae6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -24,6 +24,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -116,6 +117,18 @@ case class ColumnStat(
   maxLen = maxLen,
   histogram = histogram,
   version = version)
+
+  def updateCountStats(
+  oldNumRows: BigInt,
+  newNumRows: BigInt,
+  updatedColumnStatOpt: Option[ColumnStat] = None): ColumnStat = {
+val updatedColumnStat = updatedColumnStatOpt.getOrElse(this)
+val newDistinctCount = EstimationUtils.updateStat(oldNumRows, newNumRows,
+  distinctCount, updatedColumnStat.distinctCount)
+val newNullCount = EstimationUtils.updateStat(oldNumRows, newNumRows,
+  nullCount, updatedColumnStat.nullCount)
+updatedColumnStat.copy(distinctCount = newDistinctCount, nullCount = newNullCount)
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index 967cced..dafb979 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -52,14 +52,20 @@ object EstimationUtils {
   }
 
   /**
-   * Updates (scales down) the number of distinct values if the number of rows decreases after
-   * some operation (such as filter, join). Otherwise keep it unchanged.
+   * Updates (scales down) a statistic (eg. number of distinct values) if the number of rows
+   * decreases after some operation (such as filter, join). Otherwise keep it unchanged.
*/
-  def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): BigInt = {
-if (newNumRows < oldNumRows) {
-  ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / BigDecimal(oldNumRows))
+  def updateStat(
+  oldNumRows: BigInt,
+  newNumRows: BigInt,
+  oldStatOpt: Option[BigInt],
+  updatedStatOpt: Option[BigInt]): Option[BigInt] = {
+if (oldStatOpt.isDefined && updatedStatOpt.isDefined && updatedStatOpt.get > 1 &&
+  newNumRows < oldNumRows) {
+// no need to scale down since it is already down to 1
+Some(ceil(BigDecimal(oldStatOpt.get) * BigDecimal(newNumRows) / BigDecimal(oldNumRows)))
 } else {
-  oldNdv
+  updatedStatOpt
 }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 2c5beef..bc341b9 100755
--- 
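
Note on the arithmetic in this commit: two pieces are involved, bounding the null-based selectivity to `[0, 1]` and the `updateStat` scaling shown in the hunk above. The plain-Python sketch below illustrates both. `update_stat` follows the Scala shown in the diff, with `None` standing in for an empty `Option`. `is_not_null_selectivity` is a hypothetical helper, and its clamping with `min`/`max` (as well as returning 0.0 for an empty table) is an assumption, since the corresponding `FilterEstimation` hunk is cut off in this message.

```python
def is_not_null_selectivity(row_count, null_count):
    """Estimated fraction of rows passing IsNotNull, bounded to [0, 1]."""
    if row_count == 0:
        return 0.0  # assumption of this sketch for an empty table
    raw = 1.0 - null_count / row_count
    # Without the clamp, null_count > row_count gives a negative estimate.
    return min(max(raw, 0.0), 1.0)


def update_stat(old_num_rows, new_num_rows, old_stat, updated_stat):
    """Scale a count statistic down when the row count shrinks."""
    if (old_stat is not None and updated_stat is not None
            and updated_stat > 1 and new_num_rows < old_num_rows):
        # Integer ceiling of old_stat * new_num_rows / old_num_rows.
        return -(-(old_stat * new_num_rows) // old_num_rows)
    # Missing stats, a stat already at 1 or below, or no shrinkage.
    return updated_stat


if __name__ == "__main__":
    print(is_not_null_selectivity(100, 120))
    print(update_stat(100, 33, 10, 10))
```

The first call models the case from the commit message, where the column's null count (120) exceeds the table's row count (100): the unclamped estimate would be negative, and the clamp returns 0.0 instead.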

[spark] branch branch-3.2 updated: [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new f55f882  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]
f55f882 is described below

commit f55f8820fcbc10ed514d3afafe587b7bb68d8d5f
Author: Karen Feng 
AuthorDate: Tue Jul 20 21:32:13 2021 +0800

[SPARK-36079][SQL] Null-based filter estimate should always be in the range 
[0, 1]

### What changes were proposed in this pull request?

Forces the selectivity estimate for null-based filters to be in the range 
`[0,1]`.

### Why are the changes needed?

I noticed in a few TPC-DS query tests that the column statistic null count 
can be higher than the table statistic row count. In the current 
implementation, the selectivity estimate for `IsNotNull` is negative.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #33286 from karenfeng/bound-selectivity-est.

Authored-by: Karen Feng 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ddc61e62b9af5deff1b93e22f466f2a13f281155)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/plans/logical/Statistics.scala|  13 +
 .../logical/statsEstimation/EstimationUtils.scala  |  18 +-
 .../logical/statsEstimation/FilterEstimation.scala |  30 +--
 .../logical/statsEstimation/JoinEstimation.scala   |  13 +-
 .../statsEstimation/FilterEstimationSuite.scala|  40 ++-
 .../approved-plans-modified/q19.sf100/explain.txt  | 254 +--
 .../q19.sf100/simplified.txt   | 104 
 .../approved-plans-modified/q68.sf100/explain.txt  | 207 
 .../q68.sf100/simplified.txt   | 141 ++-
 .../approved-plans-modified/q73.sf100/explain.txt  | 272 +++--
 .../q73.sf100/simplified.txt   |  56 +++--
 11 files changed, 591 insertions(+), 557 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 7db3ee5..6f3ec3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -24,6 +24,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, 
LZ4BlockOutputStream}
 
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -119,6 +120,18 @@ case class ColumnStat(
   maxLen = maxLen,
   histogram = histogram,
   version = version)
+
+  def updateCountStats(
+  oldNumRows: BigInt,
+  newNumRows: BigInt,
+  updatedColumnStatOpt: Option[ColumnStat] = None): ColumnStat = {
+val updatedColumnStat = updatedColumnStatOpt.getOrElse(this)
+val newDistinctCount = EstimationUtils.updateStat(oldNumRows, newNumRows,
+  distinctCount, updatedColumnStat.distinctCount)
+val newNullCount = EstimationUtils.updateStat(oldNumRows, newNumRows,
+  nullCount, updatedColumnStat.nullCount)
+updatedColumnStat.copy(distinctCount = newDistinctCount, nullCount = 
newNullCount)
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index 967cced..dafb979 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -52,14 +52,20 @@ object EstimationUtils {
   }
 
   /**
-   * Updates (scales down) the number of distinct values if the number of rows 
decreases after
-   * some operation (such as filter, join). Otherwise keep it unchanged.
+   * Updates (scales down) a statistic (eg. number of distinct values) if the 
number of rows
+   * decreases after some operation (such as filter, join). Otherwise keep it 
unchanged.
*/
-  def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): 
BigInt = {
-if (newNumRows < oldNumRows) {
-  ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / 
BigDecimal(oldNumRows))
+  def updateStat(
+  oldNumRows: BigInt,
+  newNumRows: BigInt,
+  oldStatOpt: Option[BigInt],
+  updatedStatOpt: Option[BigInt]): Option[BigInt] = {
+if (oldStatOpt.isDefined && 

[spark] branch master updated (033a573 -> ddc61e6)

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 033a573  [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for 
MakeTimestampNTZ and MakeTimestampLTZ
 add ddc61e6  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/plans/logical/Statistics.scala|  13 +
 .../logical/statsEstimation/EstimationUtils.scala  |  18 +-
 .../logical/statsEstimation/FilterEstimation.scala |  30 +--
 .../logical/statsEstimation/JoinEstimation.scala   |  13 +-
 .../statsEstimation/FilterEstimationSuite.scala|  40 ++-
 .../approved-plans-modified/q19.sf100/explain.txt  | 254 +--
 .../q19.sf100/simplified.txt   | 104 
 .../approved-plans-modified/q68.sf100/explain.txt  | 207 
 .../q68.sf100/simplified.txt   | 141 ++-
 .../approved-plans-modified/q73.sf100/explain.txt  | 272 +++--
 .../q73.sf100/simplified.txt   |  56 +++--
 11 files changed, 591 insertions(+), 557 deletions(-)

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.2 updated: [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ

2021-07-20 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 0f6cf8a  [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for 
MakeTimestampNTZ and MakeTimestampLTZ
0f6cf8a is described below

commit 0f6cf8abe34416ea4e9db5bd11f9f022c9fd7b7d
Author: gengjiaan 
AuthorDate: Tue Jul 20 21:31:00 2021 +0800

[SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and 
MakeTimestampLTZ

### What changes were proposed in this pull request?
This PR follows https://github.com/apache/spark/pull/33299 and implements 
`prettyName` for `MakeTimestampNTZ` and `MakeTimestampLTZ` based on the 
discussion shown below:
https://github.com/apache/spark/pull/33299/files#r668423810

### Why are the changes needed?
This PR fixes the incorrect alias use case.

### Does this PR introduce _any_ user-facing change?
'No'.
Modifications are transparent to users.

### How was this patch tested?
Jenkins test.

Closes #33430 from beliefer/SPARK-36046-followup.

Authored-by: gengjiaan 
Signed-off-by: Gengliang Wang 
(cherry picked from commit 033a5731b44723fd7434c5ee0a021d3787a621ef)
Signed-off-by: Gengliang Wang 
---
 .../org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 4 ++--
 .../apache/spark/sql/catalyst/expressions/datetimeExpressions.scala   | 4 
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 234da76..5fce4b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -555,8 +555,8 @@ object FunctionRegistry {
 expression[SessionWindow]("session_window"),
 expression[MakeDate]("make_date"),
 expression[MakeTimestamp]("make_timestamp"),
-expression[MakeTimestampNTZ]("make_timestamp_ntz", true),
-expression[MakeTimestampLTZ]("make_timestamp_ltz", true),
+expression[MakeTimestampNTZ]("make_timestamp_ntz"),
+expression[MakeTimestampLTZ]("make_timestamp_ltz"),
 expression[MakeInterval]("make_interval"),
 expression[MakeDTInterval]("make_dt_interval"),
 expression[MakeYMInterval]("make_ym_interval"),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 1146ba7..bc2e33b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -2403,6 +2403,8 @@ case class MakeTimestampNTZ(
   MakeTimestamp(year, month, day, hour, min, sec, dataType = 
TimestampNTZType))
   }
 
+  override def prettyName: String = "make_timestamp_ntz"
+
   override def exprsReplaced: Seq[Expression] = Seq(year, month, day, hour, 
min, sec)
 
   override protected def withNewChildInternal(newChild: Expression): 
Expression =
@@ -2473,6 +2475,8 @@ case class MakeTimestampLTZ(
   MakeTimestamp(year, month, day, hour, min, sec, Some(timezone), dataType 
= TimestampType))
   }
 
+  override def prettyName: String = "make_timestamp_ltz"
+
   override def exprsReplaced: Seq[Expression] = Seq(year, month, day, hour, 
min, sec)
 
   override protected def withNewChildInternal(newChild: Expression): 
Expression =




[spark] branch master updated (801b369 -> 033a573)

2021-07-20 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 801b369  [SPARK-36204][INFRA][BUILD] Deduplicate Scala 2.13 daily build
 add 033a573  [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for 
MakeTimestampNTZ and MakeTimestampLTZ

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 4 ++--
 .../apache/spark/sql/catalyst/expressions/datetimeExpressions.scala   | 4 
 2 files changed, 6 insertions(+), 2 deletions(-)




[spark] branch master updated (463fcb3 -> 801b369)

2021-07-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 463fcb3  [SPARK-36207][PYTHON] Expose databaseExists in 
pyspark.sql.catalog
 add 801b369  [SPARK-36204][INFRA][BUILD] Deduplicate Scala 2.13 daily build

No new revisions were added by this update.

Summary of changes:
 .github/workflows/build_and_test.yml   |  54 +-
 .../workflows/build_and_test_scala213_daily.yml| 115 -
 dev/run-tests.py   |  14 ++-
 3 files changed, 61 insertions(+), 122 deletions(-)
 delete mode 100644 .github/workflows/build_and_test_scala213_daily.yml




[spark] branch master updated: [SPARK-36207][PYTHON] Expose databaseExists in pyspark.sql.catalog

2021-07-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 463fcb3  [SPARK-36207][PYTHON] Expose databaseExists in 
pyspark.sql.catalog
463fcb3 is described below

commit 463fcb3723d4c5cffd4b787e2d7254ceaf2bca98
Author: Dominik Gehl 
AuthorDate: Tue Jul 20 22:10:06 2021 +0900

[SPARK-36207][PYTHON] Expose databaseExists in pyspark.sql.catalog

### What changes were proposed in this pull request?
Expose databaseExists in pyspark.sql.catalog

### Why are the changes needed?
The method was available in Scala, but not in PySpark.

### Does this PR introduce _any_ user-facing change?
New method databaseExists

### How was this patch tested?
Unit tests in codebase

Closes #33416 from dominikgehl/feature/SPARK-36207.

Lead-authored-by: Dominik Gehl 
Co-authored-by: Dominik Gehl 
Signed-off-by: Hyukjin Kwon 
---
 python/docs/source/reference/pyspark.sql.rst |  1 +
 python/pyspark/sql/catalog.py| 26 ++
 python/pyspark/sql/catalog.pyi   |  1 +
 python/pyspark/sql/tests/test_catalog.py |  8 
 4 files changed, 36 insertions(+)

diff --git a/python/docs/source/reference/pyspark.sql.rst 
b/python/docs/source/reference/pyspark.sql.rst
index 74eac3d..d8e7b41 100644
--- a/python/docs/source/reference/pyspark.sql.rst
+++ b/python/docs/source/reference/pyspark.sql.rst
@@ -618,6 +618,7 @@ Catalog APIs
 Catalog.createExternalTable
 Catalog.createTable
 Catalog.currentDatabase
+Catalog.databaseExists
 Catalog.dropGlobalTempView
 Catalog.dropTempView
 Catalog.isCached
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 8087d63..2d74c73 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -65,6 +65,32 @@ class Catalog(object):
 locationUri=jdb.locationUri()))
 return databases
 
+def databaseExists(self, dbName):
+"""Check if the database with the specified name exists.
+
+.. versionadded:: 3.3.0
+
+Parameters
+--
+dbName : str
+ name of the database to check existence
+
+Returns
+-------
+bool
+Indicating whether the database exists
+
+Examples
+--------
+>>> spark.catalog.databaseExists("test_new_database")
+False
+>>> df = spark.sql("CREATE DATABASE test_new_database")
+>>> spark.catalog.databaseExists("test_new_database")
+True
+>>> df = spark.sql("DROP DATABASE test_new_database")
+"""
+return self._jcatalog.databaseExists(dbName)
+
 @since(2.0)
 def listTables(self, dbName=None):
 """Returns a list of tables/views in the specified database.
diff --git a/python/pyspark/sql/catalog.pyi b/python/pyspark/sql/catalog.pyi
index 6892719..1eed73a 100644
--- a/python/pyspark/sql/catalog.pyi
+++ b/python/pyspark/sql/catalog.pyi
@@ -36,6 +36,7 @@ class Catalog:
 def currentDatabase(self) -> str: ...
 def setCurrentDatabase(self, dbName: str) -> None: ...
 def listDatabases(self) -> List[Database]: ...
+def databaseExists(self, dbName: str) -> bool: ...
 def listTables(self, dbName: Optional[str] = ...) -> List[Table]: ...
 def listFunctions(self, dbName: Optional[str] = ...) -> List[Function]: ...
 def listColumns(
diff --git a/python/pyspark/sql/tests/test_catalog.py 
b/python/pyspark/sql/tests/test_catalog.py
index 8699878..90467fa 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -43,6 +43,14 @@ class CatalogTests(ReusedSQLTestCase):
 databases = [db.name for db in spark.catalog.listDatabases()]
 self.assertEqual(sorted(databases), ["default", "some_db"])
 
+def test_database_exists(self):
+# SPARK-36207: testing that database_exists returns correct boolean
+spark = self.spark
+with self.database("some_db"):
+self.assertFalse(spark.catalog.databaseExists("some_db"))
+spark.sql("CREATE DATABASE some_db")
+self.assertTrue(spark.catalog.databaseExists("some_db"))
+
 def test_list_tables(self):
 from pyspark.sql.catalog import Table
 spark = self.spark




[spark] branch branch-3.2 updated: [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 7cd89ef  [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner 
field too
7cd89ef is described below

commit 7cd89efca5a74dcc2457c7be5f2ef65aeb90a967
Author: Angerszh 
AuthorDate: Tue Jul 20 21:08:03 2021 +0800

[SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too

### What changes were proposed in this pull request?
When an inner field has an invalid field name, the schema check should check that inner field name too.

![image](https://user-images.githubusercontent.com/46485123/126101009-c192d87f-1e18-4355-ad53-1419dacdeb76.png)
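
A rough Python sketch of a recursive field-name check follows. It is not the Scala implementation from this commit: the `Field` class, the function names, and the exact set of forbidden characters are assumptions chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import List

# Assumed set of characters that a file format rejects in column names.
FORBIDDEN = set(" ,;{}()\n\t=")


@dataclass
class Field:
    """A toy schema field; `children` is non-empty only for struct-like fields."""
    name: str
    children: List["Field"] = field(default_factory=list)


def check_field_name(name: str) -> None:
    """Raise ValueError if the name contains a forbidden character."""
    if any(ch in FORBIDDEN for ch in name):
        raise ValueError(f"invalid column name: {name!r}")


def check_field_names(fields: List[Field]) -> None:
    """Check every field name, descending into nested fields as well."""
    for f in fields:
        check_field_name(f.name)
        check_field_names(f.children)
```

Checking only the top-level names would accept a struct column whose inner field is named `bad name`; the recursive version rejects it before any data is written.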

### Why are the changes needed?
Check early, fail early.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33409 from AngersZh/SPARK-36201.

Authored-by: Angerszh 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 251885772d41a572655e950a8e298315f222a803)
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/sql/execution/command/ddl.scala | 12 ++--
 .../apache/spark/sql/execution/command/tables.scala  |  2 +-
 .../execution/datasources/orc/OrcFileFormat.scala| 10 --
 .../datasources/parquet/ParquetSchemaConverter.scala | 10 --
 .../spark/sql/hive/execution/HiveDDLSuite.scala  | 20 
 5 files changed, 43 insertions(+), 11 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 605d98e..140f9d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -924,23 +924,23 @@ object DDLUtils {
   }
 
   private[sql] def checkDataColNames(table: CatalogTable): Unit = {
-checkDataColNames(table, table.dataSchema.fieldNames)
+checkDataColNames(table, table.dataSchema)
   }
 
-  private[sql] def checkDataColNames(table: CatalogTable, colNames: 
Seq[String]): Unit = {
+  private[sql] def checkDataColNames(table: CatalogTable, schema: StructType): 
Unit = {
 table.provider.foreach {
   _.toLowerCase(Locale.ROOT) match {
 case HIVE_PROVIDER =>
   val serde = table.storage.serde
   if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) {
-OrcFileFormat.checkFieldNames(colNames)
+OrcFileFormat.checkFieldNames(schema)
   } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde ||
 serde == Some("parquet.hive.serde.ParquetHiveSerDe") ||
 serde == 
Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
-ParquetSchemaConverter.checkFieldNames(colNames)
+ParquetSchemaConverter.checkFieldNames(schema)
   }
-case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames)
-case "orc" => OrcFileFormat.checkFieldNames(colNames)
+case "parquet" => ParquetSchemaConverter.checkFieldNames(schema)
+case "orc" => OrcFileFormat.checkFieldNames(schema)
 case _ =>
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 0599621..f740915 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -236,7 +236,7 @@ case class AlterTableAddColumnsCommand(
   (colsToAdd ++ catalogTable.schema).map(_.name),
   "in the table definition of " + table.identifier,
   conf.caseSensitiveAnalysis)
-DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name))
+DDLUtils.checkDataColNames(catalogTable, StructType(colsToAdd))
 
 val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema)
 catalog.alterTableDataSchema(table, StructType(existingSchema ++ 
colsToAdd))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index d6593ca..9024c78 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -52,8 +52,14 @@ private[sql] object OrcFileFormat {
 }
   }
 
-  def checkFieldNames(names: Seq[String]): Unit = {
-names.foreach(checkFieldName)
+  def checkFieldNames(schema: StructType): Unit = {
+schema.foreach { field =>
+  checkFieldName(field.name)
+  

[spark] branch master updated (e9b18b0 -> 2518857)

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from e9b18b0  [SPARK-31907][DOCS][SQL] Adding location of SQL API 
documentation
 add 2518857  [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner 
field too

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/command/ddl.scala | 12 ++--
 .../apache/spark/sql/execution/command/tables.scala  |  2 +-
 .../execution/datasources/orc/OrcFileFormat.scala| 10 --
 .../datasources/parquet/ParquetSchemaConverter.scala | 10 --
 .../spark/sql/hive/execution/HiveDDLSuite.scala  | 20 
 5 files changed, 43 insertions(+), 11 deletions(-)




[spark] branch master updated (b70c258 -> e9b18b0)

2021-07-20 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from b70c258  [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at 
least one partition
 add e9b18b0  [SPARK-31907][DOCS][SQL] Adding location of SQL API 
documentation

No new revisions were added by this update.

Summary of changes:
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)




[spark] branch branch-3.2 updated: [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 677104f  [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at 
least one partition
677104f is described below

commit 677104f49531a5f5c214729b4d3e0ce91b4f4a64
Author: ulysses-you 
AuthorDate: Tue Jul 20 20:48:35 2021 +0800

[SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one 
partition

### What changes were proposed in this pull request?

* Add non-empty partition check in `CustomShuffleReaderExec`
* Make sure `OptimizeLocalShuffleReader` doesn't return empty partition

### Why are the changes needed?

Since SPARK-32083, AQE coalescing always returns at least one partition, so 
it is more robust to add a non-empty check in `CustomShuffleReaderExec`.
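
The pattern of validating the invariant once at construction time, so that later code can drop its own emptiness guards, can be sketched in Python as below. The class and method names are hypothetical stand-ins, and the sketch raises `ValueError` where the patch uses a Scala `assert`.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class PartialMapperSpec:
    """Toy stand-in for a partition spec that reads one mapper's output."""
    map_index: int


@dataclass(frozen=True)
class ShuffleReader:
    """Toy stand-in for a reader node holding a non-empty list of specs."""
    partition_specs: Tuple[object, ...]

    def __post_init__(self) -> None:
        # Enforce the invariant once, when the node is built.
        if not self.partition_specs:
            raise ValueError("shuffle reader requires at least one partition")

    def is_local(self) -> bool:
        # all() is vacuously True on an empty sequence, so without the
        # constructor check this would need an explicit non-empty guard.
        return all(isinstance(s, PartialMapperSpec) for s in self.partition_specs)
```

Because `all()` over an empty sequence is true, an empty reader would otherwise be misclassified as local, which is why per-call guards were needed before the check moved into the constructor.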

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Not needed.

Closes #33431 from ulysses-you/non-empty-partition.

Authored-by: ulysses-you 
Signed-off-by: Wenchen Fan 
(cherry picked from commit b70c25881c8cbac21a62457ad4373a11cfe4)
Signed-off-by: Wenchen Fan 
---
 .../sql/execution/adaptive/CustomShuffleReaderExec.scala | 10 ++
 .../sql/execution/adaptive/OptimizeLocalShuffleReader.scala  | 12 +---
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index cea3016..8975054 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -34,11 +34,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  *
  * @param child   It is usually `ShuffleQueryStageExec`, but can be 
the shuffle exchange
  *node during canonicalization.
- * @param partitionSpecs  The partition specs that defines the arrangement.
+ * @param partitionSpecs  The partition specs that defines the arrangement, 
requires at least one
+ *partition.
  */
 case class CustomShuffleReaderExec private(
 child: SparkPlan,
 partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
+  assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least 
one partition")
+
   // If this reader is to read shuffle files locally, then all partition specs 
should be
   // `PartialMapperPartitionSpec`.
   if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
@@ -52,8 +55,7 @@ case class CustomShuffleReaderExec private(
 // If it is a local shuffle reader with one mapper per task, then the 
output partitioning is
 // the same as the plan before shuffle.
 // TODO this check is based on assumptions of callers' behavior but is 
sufficient for now.
-if (partitionSpecs.nonEmpty &&
-partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
+if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
 
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size
 ==
   partitionSpecs.length) {
   child match {
@@ -111,7 +113,7 @@ case class CustomShuffleReaderExec private(
   }
 
   @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
-if (partitionSpecs.nonEmpty && !isLocalReader && 
shuffleStage.get.mapStats.isDefined) {
+if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
   Some(partitionSpecs.map {
 case p: CoalescedPartitionSpec =>
   assert(p.dataSize.isDefined)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index c91b999..2103145 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -68,13 +68,11 @@ object OptimizeLocalShuffleReader extends 
CustomShuffleReaderRule {
   shuffleStage: ShuffleQueryStageExec,
   advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = {
 val numMappers = shuffleStage.shuffle.numMappers
+// ShuffleQueryStageExec.mapStats.isDefined promise numMappers > 0
+assert(numMappers > 0)
 val numReducers = shuffleStage.shuffle.numPartitions
 val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
-val splitPoints = if (numMappers == 0) {
-  Seq.empty
-} else {
-  

[spark] branch master updated (af978c8 -> b70c258)

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from af978c8  [SPARK-36183][SQL] Push down limit 1 through Aggregate if it 
is group only
 add b70c258  [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at 
least one partition

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/adaptive/CustomShuffleReaderExec.scala | 10 ++
 .../sql/execution/adaptive/OptimizeLocalShuffleReader.scala  | 12 +---
 2 files changed, 11 insertions(+), 11 deletions(-)




[spark] branch master updated: [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only

2021-07-20 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new af978c8  [SPARK-36183][SQL] Push down limit 1 through Aggregate if it 
is group only
af978c8 is described below

commit af978c87f10c89ee3a7c927ab9b039a2b84a492a
Author: Yuming Wang 
AuthorDate: Tue Jul 20 20:24:07 2021 +0800

[SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only

### What changes were proposed in this pull request?

Push down limit 1 and turn `Aggregate` into `Project` through `Aggregate` 
if it is group only. For example:
```sql
create table t1 using parquet as select id from range(1L);
create table t2 using parquet as select id from range(1L);
create view v1 as select * from t1 union select * from t2;
select * from v1 limit 1;
```
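
The rewrite itself can be sketched on a toy plan tree in Python. This is a simplified model, not the Catalyst rule: the node classes and `push_limit_one` are hypothetical, and `group_only` is approximated as "every output column is a grouping column".

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Aggregate:
    grouping: Tuple[str, ...]
    outputs: Tuple[str, ...]
    child: object

    @property
    def group_only(self) -> bool:
        # Simplified: no aggregate functions, only grouping columns in the output.
        return all(col in self.grouping for col in self.outputs)


@dataclass(frozen=True)
class Project:
    outputs: Tuple[str, ...]
    child: object


@dataclass(frozen=True)
class LocalLimit:
    n: int
    child: object


@dataclass(frozen=True)
class Limit:
    n: int
    child: object


def push_limit_one(plan: object) -> object:
    """Rewrite Limit(1, group-only Aggregate) into Limit(1, Project(LocalLimit(1, child)))."""
    if (isinstance(plan, Limit) and plan.n == 1
            and isinstance(plan.child, Aggregate) and plan.child.group_only):
        agg = plan.child
        return Limit(1, Project(agg.outputs, LocalLimit(1, agg.child)))
    return plan
```

The rewrite is safe only for limit 1: a group-only aggregate just deduplicates its input, and any single input row is a valid single distinct row. With a limit of 2 or more, two input rows could be duplicates, so the aggregate has to stay.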

Before this PR | After this PR
-- | --

![image](https://user-images.githubusercontent.com/5399861/125975690-55663515-c4c5-4a04-aedf-f8ba37581ba7.png)
 | 
![image](https://user-images.githubusercontent.com/5399861/126168972-b2675e09-4f93-4026-b1be-af317205e57f.png)

### Why are the changes needed?

Improve query performance. This is a real case from the cluster:

![image](https://user-images.githubusercontent.com/5399861/125976597-18cb68d6-b22a-4d80-b270-01b2b13d1ef5.png)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #33397 from wangyum/SPARK-36183.

Authored-by: Yuming Wang 
Signed-off-by: Yuming Wang 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 +
 .../catalyst/optimizer/LimitPushdownSuite.scala| 24 ++
 2 files changed, 29 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index aa2221b..8cc6378 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -647,6 +647,11 @@ object LimitPushDown extends Rule[LogicalPlan] {
 // There is a Project between LocalLimit and Join if they do not have the 
same output.
 case LocalLimit(exp, project @ Project(_, join: Join)) =>
   LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, 
join)))
+// Push down limit 1 through Aggregate and turn Aggregate into Project if 
it is group only.
+case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly =>
+  Limit(le, Project(a.output, LocalLimit(le, a.child)))
+case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if 
a.groupOnly =>
+  Limit(le, p.copy(child = Project(a.output, LocalLimit(le, a.child))))
   }
 }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index c2503e3..848416b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -239,4 +239,28 @@ class LimitPushdownSuite extends PlanTest {
   Limit(5, LocalLimit(5, x).join(y, LeftOuter, 
joinCondition).select("x.a".attr)).analyze
 comparePlans(optimized, correctAnswer)
   }
+
+  test("SPARK-36183: Push down limit 1 through Aggregate if it is group only") 
{
+// Push down when it is group only and limit 1.
+comparePlans(
+  Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(1).analyze),
+  LocalLimit(1, x).select("x.a".attr).limit(1).analyze)
+
+comparePlans(
+  
Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).select("x.a".attr).limit(1).analyze),
+  LocalLimit(1, x).select("x.a".attr).select("x.a".attr).limit(1).analyze)
+
+comparePlans(
+  
Optimize.execute(x.union(y).groupBy("x.a".attr)("x.a".attr).limit(1).analyze),
+  LocalLimit(1, LocalLimit(1, x).union(LocalLimit(1, 
y))).select("x.a".attr).limit(1).analyze)
+
+// No push down
+comparePlans(
+  Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze),
+  x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze)
+
+comparePlans(
+  Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, 
count("x.a".attr)).limit(1).analyze),
+  x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze)
+  }
 }




[spark] branch branch-3.2 updated: [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 3bc9346  [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals
3bc9346 is described below

commit 3bc9346a3ade89ee3a8c584ee5ef679abd6d0898
Author: Kousuke Saruta 
AuthorDate: Tue Jul 20 16:58:12 2021 +0800

[SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals

### What changes were proposed in this pull request?

This PR documents the unicode literals added in SPARK-34051 (#31096) and 
a past PR in `sql-ref-literals.md`.

### Why are the changes needed?

Notify users about the literals.

### Does this PR introduce _any_ user-facing change?

Yes, but it only adds a sentence.

### How was this patch tested?

Built the document and confirmed the result.
```
SKIP_API=1 bundle exec jekyll build
```

![unicode-literals](https://user-images.githubusercontent.com/4736016/126283923-944dc162-1817-47bc-a7e8-c3145225586b.png)

Closes #33434 from sarutak/unicode-literal-doc.

Authored-by: Kousuke Saruta 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ba1294ea5a25e908c19e3d4e73c6b7a420bb006e)
Signed-off-by: Wenchen Fan 
---
 docs/sql-ref-literals.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docs/sql-ref-literals.md b/docs/sql-ref-literals.md
index 3dbed84..b23c63a 100644
--- a/docs/sql-ref-literals.md
+++ b/docs/sql-ref-literals.md
@@ -44,6 +44,8 @@ A string literal is used to specify a character string value.
 * **char**
 
 One character from the character set. Use `\` to escape special characters (e.g., `'` or `\`).
+To represent unicode characters, use 16-bit or 32-bit unicode escape of the form `\uxxxx` or `\Uxxxxxxxx`,
+where xxxx and xxxxxxxx are 16-bit and 32-bit code points in hexadecimal respectively (e.g., `\u3042` for `あ` and `\U0001F44D` for `👍`).
 
  Examples
 

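The two escape widths named in this doc change can be illustrated outside Spark. The sketch below uses Python only because its string literals happen to accept the same `\u` (four hex digits) and `\U` (eight hex digits) forms; it is an analogy, not Spark SQL, and `escape_form` is a hypothetical helper introduced for illustration.

```python
# Python string literals accept the same two escape widths.
hiragana_a = "\u3042"      # 16-bit form: exactly four hex digits
thumbs_up = "\U0001F44D"   # 32-bit form: exactly eight hex digits


def escape_form(ch: str) -> str:
    """Return the shorter of the two escape forms that can encode ch."""
    code_point = ord(ch)
    if code_point <= 0xFFFF:
        # Fits in 16 bits, so the four-digit form is enough.
        return "\\u{:04X}".format(code_point)
    # Needs more than 16 bits, so the eight-digit form is required.
    return "\\U{:08X}".format(code_point)


short_form = escape_form(hiragana_a)
long_form = escape_form(thumbs_up)
```

Code points above `0xFFFF`, such as the emoji here, do not fit in four hex digits, which is why the longer `\U` form exists.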



[spark] branch master updated (c77acf0 -> ba1294e)

2021-07-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from c77acf0  [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way
 add ba1294e  [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals

No new revisions were added by this update.

Summary of changes:
 docs/sql-ref-literals.md | 2 ++
 1 file changed, 2 insertions(+)
