[spark] branch branch-3.2 updated: [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState
This is an automated email from the ASF dual-hosted git repository. tdas pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0d60cb5 [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState 0d60cb5 is described below commit 0d60cb51c01c13b0febe2ff7601db7303bfff56d Author: Rahul Mahadev AuthorDate: Wed Jul 21 01:48:58 2021 -0400 [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState ### What changes were proposed in this pull request? Adding support for accepting an initial state with flatMapGroupsWithState in batch mode. ### Why are the changes needed? SPARK-35897 added support for accepting an initial state for streaming queries using flatMapGroupsWithState. the code flow is separate for batch and streaming and required a different PR. ### Does this PR introduce _any_ user-facing change? Yes as discussed above flatMapGroupsWithState in batch mode can accept an initialState, previously this would throw an UnsupportedOperationException ### How was this patch tested? Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the tests `JavaDatasetSuite` Closes #6 from rahulsmahadev/flatMapGroupsWithStateBatch. Authored-by: Rahul Mahadev Signed-off-by: Tathagata Das (cherry picked from commit efcce23b913ce0de961ac261050e3d6dbf261f6e) Signed-off-by: Tathagata Das --- .../analysis/UnsupportedOperationChecker.scala | 6 -- .../spark/sql/execution/SparkStrategies.scala | 11 +++- .../streaming/FlatMapGroupsWithStateExec.scala | 71 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 18 +- .../streaming/FlatMapGroupsWithStateSuite.scala| 52 5 files changed, 130 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 13c7f75..321725d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -37,12 +37,6 @@ object UnsupportedOperationChecker extends Logging { case p if p.isStreaming => throwError("Queries with streaming sources must be executed with writeStream.start()")(p) - case f: FlatMapGroupsWithState => -if (f.hasInitialState) { - throwError("Initial state is not supported in [flatMap|map]GroupsWithState" + -" operation on a batch DataFrame/Dataset")(f) -} - case _ => } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 6d10fa8..7624b15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -690,9 +690,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.MapGroups(f, key, value, grouping, data, objAttr, child) => execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil case logical.FlatMapGroupsWithState( - f, key, value, grouping, data, output, _, _, _, timeout, _, _, _, _, _, child) => -execution.MapGroupsExec( - f, key, value, grouping, data, output, timeout, planLater(child)) :: Nil + f, keyDeserializer, valueDeserializer, grouping, data, 
output, stateEncoder, outputMode, + isFlatMapGroupsWithState, timeout, hasInitialState, initialStateGroupAttrs, + initialStateDataAttrs, initialStateDeserializer, initialState, child) => +FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries( + f, keyDeserializer, valueDeserializer, initialStateDeserializer, grouping, + initialStateGroupAttrs, data, initialStateDataAttrs, output, timeout, + hasInitialState, planLater(initialState), planLater(child) +) :: Nil case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) => execution.CoGroupExec( f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index 03694d4..a00a622 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++
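For orientation, here is a minimal sketch of the behavior this commit enables: passing an initial state to `flatMapGroupsWithState` on a plain batch Dataset instead of hitting an `UnsupportedOperationException`. The data, column meanings, and app name are illustrative only; the API shape is the Spark 3.2 Scala overload that takes a `KeyValueGroupedDataset` initial state.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object BatchInitialStateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("initial-state-batch").getOrCreate()
    import spark.implicits._

    // Batch input: (user, clicks).
    val events = Seq(("a", 1), ("a", 2), ("b", 5)).toDS()

    // Prior running totals, keyed the same way as the input.
    val initialState = Seq(("a", 10), ("c", 7)).toDS()
      .groupByKey(_._1)
      .mapValues(_._2)

    val totals = events
      .groupByKey(_._1)
      .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout, initialState) {
        (user: String, rows: Iterator[(String, Int)], state: GroupState[Int]) =>
          // Seed the running total from the initial state, then fold in this batch's rows.
          val sum = state.getOption.getOrElse(0) + rows.map(_._2).sum
          state.update(sum)
          Iterator((user, sum))
      }

    totals.show()
    spark.stop()
  }
}
```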
[spark] branch master updated: [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState
This is an automated email from the ASF dual-hosted git repository. tdas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new efcce23 [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState efcce23 is described below commit efcce23b913ce0de961ac261050e3d6dbf261f6e Author: Rahul Mahadev AuthorDate: Wed Jul 21 01:48:58 2021 -0400 [SPARK-36132][SS][SQL] Support initial state for batch mode of flatMapGroupsWithState ### What changes were proposed in this pull request? Adding support for accepting an initial state with flatMapGroupsWithState in batch mode. ### Why are the changes needed? SPARK-35897 added support for accepting an initial state for streaming queries using flatMapGroupsWithState. the code flow is separate for batch and streaming and required a different PR. ### Does this PR introduce _any_ user-facing change? Yes as discussed above flatMapGroupsWithState in batch mode can accept an initialState, previously this would throw an UnsupportedOperationException ### How was this patch tested? Added relevant unit tests in FlatMapGroupsWithStateSuite and modified the tests `JavaDatasetSuite` Closes #6 from rahulsmahadev/flatMapGroupsWithStateBatch. Authored-by: Rahul Mahadev Signed-off-by: Tathagata Das --- .../analysis/UnsupportedOperationChecker.scala | 6 -- .../spark/sql/execution/SparkStrategies.scala | 11 +++- .../streaming/FlatMapGroupsWithStateExec.scala | 71 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 18 +- .../streaming/FlatMapGroupsWithStateSuite.scala| 52 5 files changed, 130 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 13c7f75..321725d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -37,12 +37,6 @@ object UnsupportedOperationChecker extends Logging { case p if p.isStreaming => throwError("Queries with streaming sources must be executed with writeStream.start()")(p) - case f: FlatMapGroupsWithState => -if (f.hasInitialState) { - throwError("Initial state is not supported in [flatMap|map]GroupsWithState" + -" operation on a batch DataFrame/Dataset")(f) -} - case _ => } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 6d10fa8..7624b15 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -690,9 +690,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.MapGroups(f, key, value, grouping, data, objAttr, child) => execution.MapGroupsExec(f, key, value, grouping, data, objAttr, planLater(child)) :: Nil case logical.FlatMapGroupsWithState( - f, key, value, grouping, data, output, _, _, _, timeout, _, _, _, _, _, child) => -execution.MapGroupsExec( - f, key, value, grouping, data, output, timeout, planLater(child)) :: Nil + f, keyDeserializer, valueDeserializer, grouping, data, output, stateEncoder, outputMode, + isFlatMapGroupsWithState, timeout, hasInitialState, 
initialStateGroupAttrs, + initialStateDataAttrs, initialStateDeserializer, initialState, child) => +FlatMapGroupsWithStateExec.generateSparkPlanForBatchQueries( + f, keyDeserializer, valueDeserializer, initialStateDeserializer, grouping, + initialStateGroupAttrs, data, initialStateDataAttrs, output, timeout, + hasInitialState, planLater(initialState), planLater(child) +) :: Nil case logical.CoGroup(f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, left, right) => execution.CoGroupExec( f, key, lObj, rObj, lGroup, rGroup, lAttr, rAttr, oAttr, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala index 03694d4..a00a622 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala @@ -309,9 +309,7 @@ case
[spark] branch branch-3.2 updated (6041d1c -> 0b14ab1)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 6041d1c  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax deprecated in Scala 2.13
     add 0b14ab1  [SPARK-36030][SQL][FOLLOW-UP][3.2] Remove duplicated test suite

No new revisions were added by this update.

Summary of changes:
 .../FileFormatDataWriterMetricSuite.scala | 96 --
 1 file changed, 96 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala
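(See the usage sketch after the first notification above for what the batch initial-state API looks like from user code.)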
[spark] branch master updated (99006e5 -> df798ed)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 99006e5  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax deprecated in Scala 2.13
     add df798ed  [SPARK-36030][SQL][FOLLOW-UP] Remove duplicated test suite

No new revisions were added by this update.

Summary of changes:
 .../FileFormatDataWriterMetricSuite.scala | 96 --
 1 file changed, 96 deletions(-)
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala
[spark] branch branch-3.2 updated (86d1fb4 -> 6041d1c)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 86d1fb4  [SPARK-36030][SQL] Support DS v2 metrics at writing path
     add 6041d1c  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax deprecated in Scala 2.13

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/datasources/FileFormatDataWriterMetricSuite.scala | 2 +-
 .../spark/sql/execution/datasources/InMemoryTableMetricSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
[spark] branch master updated (2653201 -> 99006e5)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 2653201  [SPARK-36030][SQL] Support DS v2 metrics at writing path
     add 99006e5  [SPARK-36030][SQL][FOLLOW-UP] Avoid procedure syntax deprecated in Scala 2.13

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/datasources/FileFormatDataWriterMetricSuite.scala | 2 +-
 .../spark/sql/execution/datasources/InMemoryTableMetricSuite.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
[spark] branch branch-3.2 updated (ab80d3c -> 86d1fb4)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from ab80d3c  [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
     add 86d1fb4  [SPARK-36030][SQL] Support DS v2 metrics at writing path

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/connector/read/Scan.java | 3 +-
 .../spark/sql/connector/write/DataWriter.java | 9 ++
 .../apache/spark/sql/connector/write/Write.java | 9 ++
 .../sql/connector/catalog/InMemoryTable.scala | 22 +
 .../datasources/FileFormatDataWriter.scala | 51 ---
 .../datasources/v2/DataSourceV2Strategy.scala | 8 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala | 36 +++-
 .../spark/sql/execution/metric/CustomMetrics.scala | 5 +-
 .../execution/streaming/MicroBatchExecution.scala | 4 +-
 .../sql/execution/streaming/StreamExecution.scala | 6 +-
 .../streaming/continuous/ContinuousExecution.scala | 6 +-
 .../streaming/continuous/ContinuousWriteRDD.scala | 11 ++-
 .../continuous/WriteToContinuousDataSource.scala | 4 +-
 .../WriteToContinuousDataSourceExec.scala | 11 ++-
 .../sources/WriteToMicroBatchDataSource.scala | 6 +-
 .../sql/connector/SimpleWritableDataSource.scala | 8 +-
 .../FileFormatDataWriterMetricSuite.scala | 96 +++
 .../datasources/InMemoryTableMetricSuite.scala | 96 +++
 .../sql/execution/metric/CustomMetricsSuite.scala | 10 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala | 102 -
 20 files changed, 456 insertions(+), 47 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
[spark] branch master updated (305d563 -> 2653201)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 305d563  [SPARK-36153][SQL][DOCS] Update transform doc to match the current code
     add 2653201  [SPARK-36030][SQL] Support DS v2 metrics at writing path

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/connector/read/Scan.java | 3 +-
 .../spark/sql/connector/write/DataWriter.java | 9 ++
 .../apache/spark/sql/connector/write/Write.java | 9 ++
 .../sql/connector/catalog/InMemoryTable.scala | 22 +
 .../datasources/FileFormatDataWriter.scala | 51 ---
 .../datasources/v2/DataSourceV2Strategy.scala | 8 +-
 .../datasources/v2/WriteToDataSourceV2Exec.scala | 36 +++-
 .../spark/sql/execution/metric/CustomMetrics.scala | 5 +-
 .../execution/streaming/MicroBatchExecution.scala | 4 +-
 .../sql/execution/streaming/StreamExecution.scala | 6 +-
 .../streaming/continuous/ContinuousExecution.scala | 6 +-
 .../streaming/continuous/ContinuousWriteRDD.scala | 11 ++-
 .../continuous/WriteToContinuousDataSource.scala | 4 +-
 .../WriteToContinuousDataSourceExec.scala | 11 ++-
 .../sources/WriteToMicroBatchDataSource.scala | 6 +-
 .../sql/connector/SimpleWritableDataSource.scala | 8 +-
 .../FileFormatDataWriterMetricSuite.scala | 96 +++
 .../datasources/InMemoryTableMetricSuite.scala | 96 +++
 .../sql/execution/metric/CustomMetricsSuite.scala | 10 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala | 102 -
 20 files changed, 456 insertions(+), 47 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriterMetricSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/InMemoryTableMetricSuite.scala
[spark] branch master updated: [SPARK-36153][SQL][DOCS] Update transform doc to match the current code
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 305d563 [SPARK-36153][SQL][DOCS] Update transform doc to match the current code 305d563 is described below commit 305d563329bfb7a4ef582655b88a72d826a4e8aa Author: Angerszh AuthorDate: Tue Jul 20 21:38:37 2021 -0500 [SPARK-36153][SQL][DOCS] Update transform doc to match the current code ### What changes were proposed in this pull request? Update trasform's doc to latest code. ![image](https://user-images.githubusercontent.com/46485123/126175747-672cccbc-4e42-440f-8f1e-f00b6dc1be5f.png) ### Why are the changes needed? keep consistence ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No Closes #33362 from AngersZh/SPARK-36153. Lead-authored-by: Angerszh Co-authored-by: AngersZh Signed-off-by: Sean Owen --- docs/sql-ref-syntax-qry-select-transform.md | 101 1 file changed, 88 insertions(+), 13 deletions(-) diff --git a/docs/sql-ref-syntax-qry-select-transform.md b/docs/sql-ref-syntax-qry-select-transform.md index 21966f2..5a38e14 100644 --- a/docs/sql-ref-syntax-qry-select-transform.md +++ b/docs/sql-ref-syntax-qry-select-transform.md @@ -24,6 +24,15 @@ license: | The `TRANSFORM` clause is used to specify a Hive-style transform query specification to transform the inputs by running a user-specified command or script. +Spark's script transform supports two modes: + + 1. Hive support disabled: Spark script transform can run without `spark.sql.catalogImplementation=true` + or `SparkSession.builder.enableHiveSupport()`. In this case, now Spark only uses the script transform with + `ROW FORMAT DELIMITED` and treats all values passed to the script as strings. + 2. Hive support enabled: When Spark is run with `spark.sql.catalogImplementation=true` or Spark SQL is started + with `SparkSession.builder.enableHiveSupport()`, Spark can use the script transform with both Hive SerDe and + `ROW FORMAT DELIMITED`. + ### Syntax ```sql @@ -57,19 +66,85 @@ SELECT TRANSFORM ( expression [ , ... ] ) Specifies a command or a path to script to process data. -### SerDe behavior - -Spark uses the Hive SerDe `org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe` by default, so columns will be casted -to `STRING` and combined by tabs before feeding to the user script. All `NULL` values will be converted -to the literal string `"\N"` in order to differentiate `NULL` values from empty strings. The standard output of the -user script will be treated as tab-separated `STRING` columns, any cell containing only `"\N"` will be re-interpreted -as a `NULL` value, and then the resulting STRING column will be cast to the data type specified in `col_type`. If the actual -number of output columns is less than the number of specified output columns, insufficient output columns will be -supplemented with `NULL`. If the actual number of output columns is more than the number of specified output columns, -the output columns will only select the corresponding columns and the remaining part will be discarded. -If there is no `AS` clause after `USING my_script`, an output schema will be `key: STRING, value: STRING`. -The `key` column contains all the characters before the first tab and the `value` column contains the remaining characters after the first tab. -If there is no enough tab, Spark will return `NULL` value. 
These defaults can be overridden with `ROW FORMAT SERDE` or `ROW FORMAT DELIMITED`. +### ROW FORMAT DELIMITED BEHAVIOR + +When Spark uses `ROW FORMAT DELIMITED` format: + - Spark uses the character `\u0001` as the default field delimiter and this delimiter can be overridden by `FIELDS TERMINATED BY`. + - Spark uses the character `\n` as the default line delimiter and this delimiter can be overridden by `LINES TERMINATED BY`. + - Spark uses a string `\N` as the default `NULL` value in order to differentiate `NULL` values + from the literal string `NULL`. This delimiter can be overridden by `NULL DEFINED AS`. + - Spark casts all columns to `STRING` and combines columns by tabs before feeding to the user script. + For complex types such as `ARRAY`/`MAP`/`STRUCT`, Spark uses `to_json` casts it to an input `JSON` string and uses + `from_json` to convert the result output `JSON` string to `ARRAY`/`MAP`/`STRUCT` data. + - `COLLECTION ITEMS TERMINATED BY` and `MAP KEYS TERMINATED BY` are delimiters to split complex data such as + `ARRAY`/`MAP`/`STRUCT`, Spark uses `to_json` and `from_json` to handle complex data types with `JSON` format. So + `COLLECTION ITEMS TERMINATED BY` and `MAP KEYS TERMINATED BY` won't work in default row format. + - The standard output of the
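To make the documented behavior concrete, here is a small hedged example of a no-SerDe (`ROW FORMAT DELIMITED` default) script transform driven from Scala. It assumes a Unix-like environment where `cat` is on the PATH; the view and column names are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object TransformExample {
  def main(args: Array[String]): Unit = {
    // ROW FORMAT DELIMITED transforms do not require Hive support to be enabled.
    val spark = SparkSession.builder().master("local[*]").appName("transform-example").getOrCreate()

    spark.range(1, 4).selectExpr("id", "id * 10 AS value").createOrReplaceTempView("t")

    // Input columns are cast to STRING and joined by tabs before being piped to the
    // script; its tab-separated stdout is read back as the declared output columns.
    spark.sql(
      """SELECT TRANSFORM (id, value)
        |  USING 'cat' AS (id STRING, value STRING)
        |FROM t""".stripMargin).show()

    spark.stop()
  }
}
```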
[spark] branch master updated (1a8c675 -> 7ceefca)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 1a8c675  [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
     add 7ceefca  [SPARK-35658][DOCS] Document Parquet encryption feature in Spark SQL

No new revisions were added by this update.

Summary of changes:
 docs/sql-data-sources-parquet.md | 65
 1 file changed, 65 insertions(+)
[spark] branch branch-3.0 updated (73645d4 -> 25caec4)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 73645d4  [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns
     add 25caec4  [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/worker/ExecutorRunner.scala | 4 +--
 .../apache/spark/util/logging/FileAppender.scala | 37 --
 .../spark/util/logging/RollingFileAppender.scala | 6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala | 35
 4 files changed, 68 insertions(+), 14 deletions(-)
[spark] branch branch-3.1 updated: [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 8c0e2d8 [SPARK-35027][CORE] Close the inputStream in FileAppender when writin… 8c0e2d8 is described below commit 8c0e2d838c2aa386bc164dd960d12312366d3732 Author: Jie AuthorDate: Tue Jul 20 21:23:51 2021 -0500 [SPARK-35027][CORE] Close the inputStream in FileAppender when writin… ### What changes were proposed in this pull request? 1. add "closeStreams" to FileAppender and RollingFileAppender 2. set "closeStreams" to "true" in ExecutorRunner ### Why are the changes needed? The executor will hang when due disk full or other exceptions which happened in writting to outputStream: the root cause is the "inputStream" is not closed after the error happens: 1. ExecutorRunner creates two files appenders for pipe: one for stdout, one for stderr 2. FileAppender.appendStreamToFile exits the loop when writing to outputStream 3. FileAppender closes the outputStream, but left the inputStream which refers the pipe's stdout and stderr opened 4. The executor will hang when printing the log message if the pipe is full (no one consume the outputs) 5. From the driver side, you can see the task can't be completed for ever With this fix, the step 4 will throw an exception, the driver can catch up the exception and reschedule the failed task to other executors. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add new tests for the "closeStreams" in FileAppenderSuite Closes #33263 from jhu-chang/SPARK-35027. Authored-by: Jie Signed-off-by: Sean Owen (cherry picked from commit 1a8c6755a1802afdb9a73793e9348d322176125a) Signed-off-by: Sean Owen --- .../spark/deploy/worker/ExecutorRunner.scala | 4 +-- .../apache/spark/util/logging/FileAppender.scala | 37 -- .../spark/util/logging/RollingFileAppender.scala | 6 ++-- .../org/apache/spark/util/FileAppenderSuite.scala | 35 4 files changed, 68 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2e26ccf..974c2d6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -185,11 +185,11 @@ private[deploy] class ExecutorRunner( // Redirect its stdout and stderr to files val stdout = new File(executorDir, "stdout") - stdoutAppender = FileAppender(process.getInputStream, stdout, conf) + stdoutAppender = FileAppender(process.getInputStream, stdout, conf, true) val stderr = new File(executorDir, "stderr") Files.write(header, stderr, StandardCharsets.UTF_8) - stderrAppender = FileAppender(process.getErrorStream, stderr, conf) + stderrAppender = FileAppender(process.getErrorStream, stderr, conf, true) state = ExecutorState.RUNNING worker.send(ExecutorStateChanged(appId, execId, state, None, None)) diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala index 7107be2..2243239 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala @@ -26,8 +26,12 @@ import org.apache.spark.util.{IntParam, Utils} /** * Continuously appends the data from an input stream into the 
given file. */ -private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSize: Int = 8192) - extends Logging { +private[spark] class FileAppender( + inputStream: InputStream, + file: File, + bufferSize: Int = 8192, + closeStreams: Boolean = false +) extends Logging { @volatile private var outputStream: FileOutputStream = null @volatile private var markedForStop = false // has the appender been asked to stopped @@ -76,7 +80,13 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi } } } { -closeFile() +try { + if (closeStreams) { +inputStream.close() + } +} finally { + closeFile() +} } } catch { case e: Exception => @@ -113,7 +123,12 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi private[spark] object FileAppender extends Logging { /** Create the right appender based on Spark configuration */ - def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = { + def apply( +
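The essence of the fix above is that the appender must close its input stream (the child process's pipe) as well as the output file once the copy loop ends; otherwise a write failure leaves the pipe open and the child process can block forever when the pipe buffer fills. A standalone sketch of that close-both-streams-in-finally pattern (not the Spark class itself; names are illustrative):

```scala
import java.io.{File, FileOutputStream, InputStream}

object StreamCopy {
  /** Appends everything from `in` to `file`; also closes `in` when `closeStream` is set. */
  def appendStreamToFile(in: InputStream, file: File, closeStream: Boolean): Unit = {
    val out = new FileOutputStream(file, true)
    try {
      val buf = new Array[Byte](8192)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally {
      try {
        // Without this, a failure while writing leaves the pipe's read end open and the
        // producing process can hang once the pipe buffer is full.
        if (closeStream) in.close()
      } finally {
        out.close()
      }
    }
  }
}
```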
[spark] branch branch-3.2 updated (e264c21 -> ab80d3c)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from e264c21  [SPARK-36172][SS] Document session window into Structured Streaming guide doc
     add ab80d3c  [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/worker/ExecutorRunner.scala | 4 +--
 .../apache/spark/util/logging/FileAppender.scala | 37 --
 .../spark/util/logging/RollingFileAppender.scala | 6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala | 35
 4 files changed, 68 insertions(+), 14 deletions(-)
[spark] branch master updated (0eb31a0 -> 1a8c675)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 0eb31a0  [SPARK-36172][SS] Document session window into Structured Streaming guide doc
     add 1a8c675  [SPARK-35027][CORE] Close the inputStream in FileAppender when writin…

No new revisions were added by this update.

Summary of changes:
 .../spark/deploy/worker/ExecutorRunner.scala | 4 +--
 .../apache/spark/util/logging/FileAppender.scala | 37 --
 .../spark/util/logging/RollingFileAppender.scala | 6 ++--
 .../org/apache/spark/util/FileAppenderSuite.scala | 35
 4 files changed, 68 insertions(+), 14 deletions(-)
[spark] branch branch-3.2 updated: [SPARK-36172][SS] Document session window into Structured Streaming guide doc
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new e264c21 [SPARK-36172][SS] Document session window into Structured Streaming guide doc e264c21 is described below commit e264c21707a4c9f2df9f81a202b0568b16afa3f6 Author: Jungtaek Lim AuthorDate: Wed Jul 21 10:45:31 2021 +0900 [SPARK-36172][SS] Document session window into Structured Streaming guide doc ### What changes were proposed in this pull request? This PR documents a new feature "native support of session window" into Structured Streaming guide doc. Screenshots are following: ![스크린샷 2021-07-20 오후 5 04 20](https://user-images.githubusercontent.com/1317309/126284848-526ec056-1028-4a70-a1f4-ae275d4b5437.png) ![스크린샷 2021-07-20 오후 3 34 38](https://user-images.githubusercontent.com/1317309/126276763-763cf841-aef7-412a-aa03-d93273f0c850.png) ### Why are the changes needed? This change is needed to explain a new feature to the end users. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Documentation changes. Closes #33433 from HeartSaVioR/SPARK-36172. Authored-by: Jungtaek Lim Signed-off-by: Jungtaek Lim (cherry picked from commit 0eb31a06d6f2b7583b6a9c646baeff58094f8d6c) Signed-off-by: Jungtaek Lim --- .../img/structured-streaming-time-window-types.jpg | Bin 0 -> 56637 bytes docs/img/structured-streaming.pptx | Bin 1126657 -> 1130887 bytes docs/structured-streaming-programming-guide.md | 71 + 3 files changed, 71 insertions(+) diff --git a/docs/img/structured-streaming-time-window-types.jpg b/docs/img/structured-streaming-time-window-types.jpg new file mode 100644 index 000..7e0ad1b Binary files /dev/null and b/docs/img/structured-streaming-time-window-types.jpg differ diff --git a/docs/img/structured-streaming.pptx b/docs/img/structured-streaming.pptx index 2ffd9f2..b35bf75 100644 Binary files a/docs/img/structured-streaming.pptx and b/docs/img/structured-streaming.pptx differ diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 1eabcdd..3d02eb7 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -1063,6 +1063,77 @@ then drops intermediate state of a window < watermark, and appends the final counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is appended to the Result Table only after the watermark is updated to `12:11`. + Types of time windows + +Spark supports three types of time windows: tumbling (fixed), sliding and session. + +![The types of time windows](img/structured-streaming-time-window-types.jpg) + +Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. An input +can only be bound to a single window. + +Sliding windows are similar to the tumbling windows from the point of being "fixed-sized", but windows +can overlap if the duration of slide is smaller than the duration of window, and in this case an input +can be bound to the multiple windows. + +Tumbling and sliding window use `window` function, which has been described on above examples. + +Session windows have different characteristic compared to the previous two types. Session window has a dynamic size +of the window length, depending on the inputs. 
A session window starts with an input, and expands itself +if following input has been received within gap duration. A session window closes when there's no input +received within gap duration after receiving the latest input. + +Session window uses `session_window` function. The usage of the function is similar to the `window` function. + + + + +{% highlight scala %} +import spark.implicits._ + +val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String } + +// Group the data by session window and userId, and compute the count of each group +val sessionizedCounts = events +.withWatermark("timestamp", "10 minutes") +.groupBy( +session_window($"timestamp", "5 minutes"), +$"userId") +.count() +{% endhighlight %} + + + + +{% highlight java %} +Dataset events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String } + +// Group the data by session window and userId, and compute the count of each group +Dataset sessionizedCounts = events +.withWatermark("timestamp", "10 minutes") +.groupBy( +session_window(col("timestamp"), "5 minutes"), +col("userId")) +.count(); +{% endhighlight %} + + + +{% highlight python %} +events = ... # streaming DataFrame of schema { timestamp: Timestamp,
[spark] branch master updated (376fadc -> 0eb31a0)
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 376fadc  [SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor and CategoricalIndex
     add 0eb31a0  [SPARK-36172][SS] Document session window into Structured Streaming guide doc

No new revisions were added by this update.

Summary of changes:
 .../img/structured-streaming-time-window-types.jpg | Bin 0 -> 56637 bytes
 docs/img/structured-streaming.pptx | Bin 1126657 -> 1130887 bytes
 docs/structured-streaming-programming-guide.md | 71 +
 3 files changed, 71 insertions(+)
 create mode 100644 docs/img/structured-streaming-time-window-types.jpg
[spark] branch branch-3.2 updated: [SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor and CategoricalIndex
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new a3a13da [SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor and CategoricalIndex a3a13da is described below commit a3a13da26c19b4241bf2f76273a82fe8598eddf5 Author: Takuya UESHIN AuthorDate: Tue Jul 20 18:23:54 2021 -0700 [SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor and CategoricalIndex ### What changes were proposed in this pull request? Add `as_ordered`/`as_unordered` to `CategoricalAccessor` and `CategoricalIndex`. ### Why are the changes needed? We should implement `as_ordered`/`as_unordered` in `CategoricalAccessor` and `CategoricalIndex` yet. ### Does this PR introduce _any_ user-facing change? Yes, users will be able to use `as_ordered`/`as_unordered`. ### How was this patch tested? Added some tests. Closes #33400 from ueshin/issues/SPARK-36186/as_ordered_unordered. Authored-by: Takuya UESHIN Signed-off-by: Takuya UESHIN (cherry picked from commit 376fadc89cffac97aebe49a7cf4a4bc978b1d09e) Signed-off-by: Takuya UESHIN --- .../source/reference/pyspark.pandas/indexing.rst | 2 + .../source/reference/pyspark.pandas/series.rst | 2 + python/pyspark/pandas/categorical.py | 116 +++-- python/pyspark/pandas/indexes/category.py | 72 - python/pyspark/pandas/missing/indexes.py | 2 - .../pyspark/pandas/tests/indexes/test_category.py | 10 ++ python/pyspark/pandas/tests/test_categorical.py| 22 7 files changed, 214 insertions(+), 12 deletions(-) diff --git a/python/docs/source/reference/pyspark.pandas/indexing.rst b/python/docs/source/reference/pyspark.pandas/indexing.rst index e91f699..4f84d91 100644 --- a/python/docs/source/reference/pyspark.pandas/indexing.rst +++ b/python/docs/source/reference/pyspark.pandas/indexing.rst @@ -175,6 +175,8 @@ Categorical components CategoricalIndex.codes CategoricalIndex.categories CategoricalIndex.ordered + CategoricalIndex.as_ordered + CategoricalIndex.as_unordered .. _api.multiindex: diff --git a/python/docs/source/reference/pyspark.pandas/series.rst b/python/docs/source/reference/pyspark.pandas/series.rst index a199d70..b718d79 100644 --- a/python/docs/source/reference/pyspark.pandas/series.rst +++ b/python/docs/source/reference/pyspark.pandas/series.rst @@ -401,6 +401,8 @@ the ``Series.cat`` accessor. Series.cat.categories Series.cat.ordered Series.cat.codes + Series.cat.as_ordered + Series.cat.as_unordered .. _api.series.plot: diff --git a/python/pyspark/pandas/categorical.py b/python/pyspark/pandas/categorical.py index 3495b35..b8cc88c 100644 --- a/python/pyspark/pandas/categorical.py +++ b/python/pyspark/pandas/categorical.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from typing import TYPE_CHECKING, cast +from typing import Optional, TYPE_CHECKING, cast import pandas as pd from pandas.api.types import CategoricalDtype @@ -62,6 +62,10 @@ class CategoricalAccessor(object): self._data = series @property +def _dtype(self) -> CategoricalDtype: +return cast(CategoricalDtype, self._data.dtype) + +@property def categories(self) -> pd.Index: """ The categories of this categorical. 
@@ -82,7 +86,7 @@ class CategoricalAccessor(object): >>> s.cat.categories Index(['a', 'b', 'c'], dtype='object') """ -return cast(CategoricalDtype, self._data.dtype).categories +return self._dtype.categories @categories.setter def categories(self, categories: pd.Index) -> None: @@ -109,7 +113,7 @@ class CategoricalAccessor(object): >>> s.cat.ordered False """ -return cast(CategoricalDtype, self._data.dtype).ordered +return self._dtype.ordered @property def codes(self) -> "ps.Series": @@ -152,11 +156,109 @@ class CategoricalAccessor(object): def add_categories(self, new_categories: pd.Index, inplace: bool = False) -> "ps.Series": raise NotImplementedError() -def as_ordered(self, inplace: bool = False) -> "ps.Series": -raise NotImplementedError() +def _set_ordered(self, *, ordered: bool, inplace: bool) -> Optional["ps.Series"]: +from pyspark.pandas.frame import DataFrame + +if self.ordered == ordered: +if inplace: +return None +else: +psser = self._data +else: +internal = self._data._psdf._internal.with_new_spark_column( +self._data._column_label, +
[spark] branch master updated (c0d84e6 -> 376fadc)
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from c0d84e6  [SPARK-36222][SQL] Step by days in the Sequence expression for dates
     add 376fadc  [SPARK-36186][PYTHON] Add as_ordered/as_unordered to CategoricalAccessor and CategoricalIndex

No new revisions were added by this update.

Summary of changes:
 .../source/reference/pyspark.pandas/indexing.rst | 2 +
 .../source/reference/pyspark.pandas/series.rst | 2 +
 python/pyspark/pandas/categorical.py | 116 +++--
 python/pyspark/pandas/indexes/category.py | 72 -
 python/pyspark/pandas/missing/indexes.py | 2 -
 .../pyspark/pandas/tests/indexes/test_category.py | 10 ++
 python/pyspark/pandas/tests/test_categorical.py | 22
 7 files changed, 214 insertions(+), 12 deletions(-)
[spark] branch branch-3.2 updated: [SPARK-36222][SQL] Step by days in the Sequence expression for dates
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 9a7c59c [SPARK-36222][SQL] Step by days in the Sequence expression for dates 9a7c59c is described below commit 9a7c59c99ce69411485acf382dfc9be053927b59 Author: gengjiaan AuthorDate: Tue Jul 20 19:16:56 2021 +0300 [SPARK-36222][SQL] Step by days in the Sequence expression for dates ### What changes were proposed in this pull request? The current implement of `Sequence` expression not support step by days for dates. ``` spark-sql> select sequence(date'2021-07-01', date'2021-07-10', interval '3' day); Error in query: cannot resolve 'sequence(DATE '2021-07-01', DATE '2021-07-10', INTERVAL '3' DAY)' due to data type mismatch: sequence uses the wrong parameter type. The parameter type must conform to: 1. The start and stop expressions must resolve to the same type. 2. If start and stop expressions resolve to the 'date' or 'timestamp' type then the step expression must resolve to the 'interval' or 'interval year to month' or 'interval day to second' type, otherwise to the same type as the start and stop expressions. ; line 1 pos 7; 'Project [unresolvedalias(sequence(2021-07-01, 2021-07-10, Some(INTERVAL '3' DAY), Some(Europe/Moscow)), None)] +- OneRowRelation ``` ### Why are the changes needed? `DayTimeInterval` has day granularity should as step for dates. ### Does this PR introduce _any_ user-facing change? 'Yes'. Sequence expression will supports step by `DayTimeInterval` has day granularity for dates. ### How was this patch tested? New tests. Closes #33439 from beliefer/SPARK-36222. Authored-by: gengjiaan Signed-off-by: Max Gekk (cherry picked from commit c0d84e6cf1046b7944796038414ef21fe9c7e3b5) Signed-off-by: Max Gekk --- .../expressions/collectionOperations.scala | 17 -- .../expressions/CollectionExpressionsSuite.scala | 61 -- 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 730b8d0..2c3312a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2574,7 +2574,8 @@ case class Sequence( DayTimeIntervalType.acceptsType(stepType) case DateType => stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepType) || - YearMonthIntervalType.acceptsType(stepType) + YearMonthIntervalType.acceptsType(stepType) || + DayTimeIntervalType.acceptsType(stepType) case _: IntegralType => stepOpt.isEmpty || stepType.sameType(startType) case _ => false @@ -2626,8 +2627,10 @@ case class Sequence( case DateType => if (stepOpt.isEmpty || CalendarIntervalType.acceptsType(stepOpt.get.dataType)) { new TemporalSequenceImpl[Int](IntegerType, start.dataType, MICROS_PER_DAY, _.toInt, zoneId) - } else { + } else if (YearMonthIntervalType.acceptsType(stepOpt.get.dataType)) { new PeriodSequenceImpl[Int](IntegerType, start.dataType, MICROS_PER_DAY, _.toInt, zoneId) + } else { +new DurationSequenceImpl[Int](IntegerType, start.dataType, MICROS_PER_DAY, _.toInt, zoneId) } } @@ -2807,15 +2810,19 @@ object Sequence { val intervalType: DataType = DayTimeIntervalType() def splitStep(input: Any): (Int, Int, Long) = { 
- (0, 0, input.asInstanceOf[Long]) + val duration = input.asInstanceOf[Long] + val days = IntervalUtils.getDays(duration) + val micros = duration - days * MICROS_PER_DAY + (0, days, micros) } def stepSplitCode( stepMonths: String, stepDays: String, stepMicros: String, step: String): String = { s""" |final int $stepMonths = 0; - |final int $stepDays = 0; - |final long $stepMicros = $step; + |final int $stepDays = + | (int) org.apache.spark.sql.catalyst.util.IntervalUtils.getDays($step); + |final long $stepMicros = $step - $stepDays * ${MICROS_PER_DAY}L; """.stripMargin } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index bfecbf5..caa5e96 100644 ---
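Picking up the example from the commit message above, a hedged sketch of the query that starts working with this patch, run here from Scala (the app name is arbitrary):

```scala
import org.apache.spark.sql.SparkSession

object SequenceByDays {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sequence-by-days").getOrCreate()

    // With this change a day-time interval step is accepted for DATE endpoints;
    // the query below yields [2021-07-01, 2021-07-04, 2021-07-07, 2021-07-10].
    spark.sql(
      "SELECT sequence(date'2021-07-01', date'2021-07-10', interval '3' day) AS dates"
    ).show(truncate = false)

    spark.stop()
  }
}
```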
[spark] branch master updated (bf680bf -> c0d84e6)
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from bf680bf  [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns
     add c0d84e6  [SPARK-36222][SQL] Step by days in the Sequence expression for dates

No new revisions were added by this update.

Summary of changes:
 .../expressions/collectionOperations.scala | 17 --
 .../expressions/CollectionExpressionsSuite.scala | 61 --
 2 files changed, 68 insertions(+), 10 deletions(-)
[spark] branch branch-3.0 updated (5565d1c -> 73645d4)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 5565d1c  [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]
     add 73645d4  [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns

No new revisions were added by this update.

Summary of changes:
 .../src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 17 +
 2 files changed, 20 insertions(+), 3 deletions(-)
[spark] branch branch-3.1 updated: [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns
This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 02a5e56 [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns 02a5e56 is described below commit 02a5e56ba75cf6a5ddac82a5f4283be0402a5388 Author: Koert Kuipers AuthorDate: Tue Jul 20 09:09:22 2021 -0700 [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns ### What changes were proposed in this pull request? Preserve the insertion order of columns in Dataset.withColumns ### Why are the changes needed? It is the expected behavior. We preserve insertion order in all other places. ### Does this PR introduce _any_ user-facing change? No. Currently Dataset.withColumns is not actually used anywhere to insert more than one column. This change is to make sure it behaves as expected when it is used for that purpose in future. ### How was this patch tested? Added test in DatasetSuite Closes #33423 from koertkuipers/feat-withcolumns-preserve-order. Authored-by: Koert Kuipers Signed-off-by: Liang-Chi Hsieh (cherry picked from commit bf680bf25aae9619d462caee05c41cc33909338a) Signed-off-by: Liang-Chi Hsieh --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++--- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 17 + 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5d83d1e..0d6fde3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2422,10 +2422,10 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output -val columnMap = colNames.zip(cols).toMap +val columnSeq = colNames.zip(cols) val replacedAndExistingColumns = output.map { field => - columnMap.find { case (colName, _) => + columnSeq.find { case (colName, _) => resolver(field.name, colName) } match { case Some((colName: String, col: Column)) => col.as(colName) @@ -2433,7 +2433,7 @@ class Dataset[T] private[sql]( } } -val newColumns = columnMap.filter { case (colName, col) => +val newColumns = columnSeq.filter { case (colName, col) => !output.exists(f => resolver(f.name, colName)) }.map { case (colName, col) => col.as(colName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 1b8bb3f..074a517 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1998,6 +1998,23 @@ class DatasetSuite extends QueryTest joined, (1, 1), (2, 2), (3, 3)) } + + test("SPARK-36210: withColumns preserve insertion ordering") { +val df = Seq(1, 2, 3).toDS() + +val colNames = (1 to 10).map(i => s"value${i}") +val cols = (1 to 10).map(i => col("value") + i) + +val inserted = df.withColumns(colNames, cols) + +assert(inserted.columns === "value" +: colNames) + +checkDataset( + inserted.as[(Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)], + (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), + (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)) + } } case class Bar(a: Int) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org
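A sketch modeled on the new DatasetSuite test above, showing the ordering guarantee the fix restores. Note the Seq-based `withColumns` overload is package-private in this release line, so outside Spark's own `org.apache.spark.sql` packages the call below may not compile until the overload is exposed publicly; the column names and values are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object WithColumnsOrder {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("with-columns-order").getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("value")
    val colNames = (1 to 3).map(i => s"value$i")
    val cols = (1 to 3).map(i => col("value") + i)

    // Before the fix the new columns could come back in an arbitrary (Map-driven) order;
    // with it they follow insertion order: value, value1, value2, value3.
    val inserted = df.withColumns(colNames, cols)
    assert(inserted.columns.toSeq == "value" +: colNames)
    inserted.show()

    spark.stop()
  }
}
```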
[spark] branch branch-3.2 updated: [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns
This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new a864388 [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns a864388 is described below commit a864388b5af94def1c6bbd06e24c433e503b4bff Author: Koert Kuipers AuthorDate: Tue Jul 20 09:09:22 2021 -0700 [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns ### What changes were proposed in this pull request? Preserve the insertion order of columns in Dataset.withColumns ### Why are the changes needed? It is the expected behavior. We preserve insertion order in all other places. ### Does this PR introduce _any_ user-facing change? No. Currently Dataset.withColumns is not actually used anywhere to insert more than one column. This change is to make sure it behaves as expected when it is used for that purpose in future. ### How was this patch tested? Added test in DatasetSuite Closes #33423 from koertkuipers/feat-withcolumns-preserve-order. Authored-by: Koert Kuipers Signed-off-by: Liang-Chi Hsieh (cherry picked from commit bf680bf25aae9619d462caee05c41cc33909338a) Signed-off-by: Liang-Chi Hsieh --- .../src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++--- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 17 + 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 12112ef..0fd10c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2406,10 +2406,10 @@ class Dataset[T] private[sql]( val resolver = sparkSession.sessionState.analyzer.resolver val output = queryExecution.analyzed.output -val columnMap = colNames.zip(cols).toMap +val columnSeq = colNames.zip(cols) val replacedAndExistingColumns = output.map { field => - columnMap.find { case (colName, _) => + columnSeq.find { case (colName, _) => resolver(field.name, colName) } match { case Some((colName: String, col: Column)) => col.as(colName) @@ -2417,7 +2417,7 @@ class Dataset[T] private[sql]( } } -val newColumns = columnMap.filter { case (colName, col) => +val newColumns = columnSeq.filter { case (colName, col) => !output.exists(f => resolver(f.name, colName)) }.map { case (colName, col) => col.as(colName) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0dae9a2..5f71390 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -2040,6 +2040,23 @@ class DatasetSuite extends QueryTest joined, (1, 1), (2, 2), (3, 3)) } + + test("SPARK-36210: withColumns preserve insertion ordering") { +val df = Seq(1, 2, 3).toDS() + +val colNames = (1 to 10).map(i => s"value${i}") +val cols = (1 to 10).map(i => col("value") + i) + +val inserted = df.withColumns(colNames, cols) + +assert(inserted.columns === "value" +: colNames) + +checkDataset( + inserted.as[(Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int)], + (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), + (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12), + (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)) + } } case class Bar(a: Int) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org
[spark] branch master updated (ddc61e6 -> bf680bf)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from ddc61e6  [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]
     add bf680bf  [SPARK-36210][SQL] Preserve column insertion order in Dataset.withColumns

No new revisions were added by this update.

Summary of changes:
 .../src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++---
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 17 +
 2 files changed, 20 insertions(+), 3 deletions(-)
[spark] branch branch-3.0 updated (7588b46 -> 5565d1c)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 7588b46  [SPARK-36216][PYTHON][TESTS] Increase timeout for StreamingLinearRegressionWithTests.test_parameter_convergence
     add 5565d1c  [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/plans/logical/Statistics.scala | 13 +++
 .../logical/statsEstimation/EstimationUtils.scala | 18 ++
 .../logical/statsEstimation/FilterEstimation.scala | 30 +++-
 .../logical/statsEstimation/JoinEstimation.scala | 13 +++
 .../statsEstimation/FilterEstimationSuite.scala | 40 +-
 5 files changed, 80 insertions(+), 34 deletions(-)
[spark] branch branch-3.1 updated: [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.1 by this push: new 04ada05 [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1] 04ada05 is described below commit 04ada0598d9c78253bde8378cac0a322c0ed1031 Author: Karen Feng AuthorDate: Tue Jul 20 21:32:13 2021 +0800 [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1] Forces the selectivity estimate for null-based filters to be in the range `[0,1]`. I noticed in a few TPC-DS query tests that the column statistic null count can be higher than the table statistic row count. In the current implementation, the selectivity estimate for `IsNotNull` is negative. No Unit test Closes #33286 from karenfeng/bound-selectivity-est. Authored-by: Karen Feng Signed-off-by: Wenchen Fan (cherry picked from commit ddc61e62b9af5deff1b93e22f466f2a13f281155) Signed-off-by: Wenchen Fan --- .../sql/catalyst/plans/logical/Statistics.scala| 13 +++ .../logical/statsEstimation/EstimationUtils.scala | 18 ++ .../logical/statsEstimation/FilterEstimation.scala | 30 +++- .../logical/statsEstimation/JoinEstimation.scala | 13 +++ .../statsEstimation/FilterEstimationSuite.scala| 40 +- 5 files changed, 80 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index 1346f80..e80eae6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -24,6 +24,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -116,6 +117,18 @@ case class ColumnStat( maxLen = maxLen, histogram = histogram, version = version) + + def updateCountStats( + oldNumRows: BigInt, + newNumRows: BigInt, + updatedColumnStatOpt: Option[ColumnStat] = None): ColumnStat = { +val updatedColumnStat = updatedColumnStatOpt.getOrElse(this) +val newDistinctCount = EstimationUtils.updateStat(oldNumRows, newNumRows, + distinctCount, updatedColumnStat.distinctCount) +val newNullCount = EstimationUtils.updateStat(oldNumRows, newNumRows, + nullCount, updatedColumnStat.nullCount) +updatedColumnStat.copy(distinctCount = newDistinctCount, nullCount = newNullCount) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 967cced..dafb979 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -52,14 +52,20 @@ object EstimationUtils { } /** - * Updates (scales down) the number of distinct values if the number of rows decreases after - * some operation (such as filter, join). Otherwise keep it unchanged. 
+ * Updates (scales down) a statistic (eg. number of distinct values) if the number of rows + * decreases after some operation (such as filter, join). Otherwise keep it unchanged. */ - def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): BigInt = { -if (newNumRows < oldNumRows) { - ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / BigDecimal(oldNumRows)) + def updateStat( + oldNumRows: BigInt, + newNumRows: BigInt, + oldStatOpt: Option[BigInt], + updatedStatOpt: Option[BigInt]): Option[BigInt] = { +if (oldStatOpt.isDefined && updatedStatOpt.isDefined && updatedStatOpt.get > 1 && + newNumRows < oldNumRows) { +// no need to scale down since it is already down to 1 +Some(ceil(BigDecimal(oldStatOpt.get) * BigDecimal(newNumRows) / BigDecimal(oldNumRows))) } else { - oldNdv + updatedStatOpt } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala index 2c5beef..bc341b9 100755 ---
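For reference, the `updateStat` helper shown in the EstimationUtils diff above replaces the old `updateNdv` and is reused for both distinct counts and null counts. The following is a standalone sketch of the same scaling rule (simplified and with illustrative names, not the Spark source): a statistic is scaled down in proportion to the row-count reduction, and left untouched when it is already at most 1 or when the row count did not shrink.

```scala
import scala.math.BigDecimal.RoundingMode

// Sketch of the proportional scale-down performed by updateStat (illustrative only).
def scaleDownStat(
    oldNumRows: BigInt,
    newNumRows: BigInt,
    oldStat: Option[BigInt],
    updatedStat: Option[BigInt]): Option[BigInt] = {
  (oldStat, updatedStat) match {
    case (Some(oldValue), Some(updatedValue))
        if updatedValue > 1 && newNumRows < oldNumRows =>
      // e.g. 100 distinct values over 1000 rows, filtered down to 10 rows:
      // ceil(100 * 10 / 1000) = 1
      val scaled = BigDecimal(oldValue) * BigDecimal(newNumRows) / BigDecimal(oldNumRows)
      Some(scaled.setScale(0, RoundingMode.CEILING).toBigInt)
    case _ =>
      // already down to 1, rows did not shrink, or a stat is missing: keep as-is
      updatedStat
  }
}
```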
[spark] branch branch-3.2 updated: [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1]
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new f55f882 [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1] f55f882 is described below commit f55f8820fcbc10ed514d3afafe587b7bb68d8d5f Author: Karen Feng AuthorDate: Tue Jul 20 21:32:13 2021 +0800 [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1] ### What changes were proposed in this pull request? Forces the selectivity estimate for null-based filters to be in the range `[0,1]`. ### Why are the changes needed? I noticed in a few TPC-DS query tests that the column statistic null count can be higher than the table statistic row count. In the current implementation, the selectivity estimate for `IsNotNull` is negative. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #33286 from karenfeng/bound-selectivity-est. Authored-by: Karen Feng Signed-off-by: Wenchen Fan (cherry picked from commit ddc61e62b9af5deff1b93e22f466f2a13f281155) Signed-off-by: Wenchen Fan --- .../sql/catalyst/plans/logical/Statistics.scala| 13 + .../logical/statsEstimation/EstimationUtils.scala | 18 +- .../logical/statsEstimation/FilterEstimation.scala | 30 +-- .../logical/statsEstimation/JoinEstimation.scala | 13 +- .../statsEstimation/FilterEstimationSuite.scala| 40 ++- .../approved-plans-modified/q19.sf100/explain.txt | 254 +-- .../q19.sf100/simplified.txt | 104 .../approved-plans-modified/q68.sf100/explain.txt | 207 .../q68.sf100/simplified.txt | 141 ++- .../approved-plans-modified/q73.sf100/explain.txt | 272 +++-- .../q73.sf100/simplified.txt | 56 +++-- 11 files changed, 591 insertions(+), 557 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala index 7db3ee5..6f3ec3b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala @@ -24,6 +24,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -119,6 +120,18 @@ case class ColumnStat( maxLen = maxLen, histogram = histogram, version = version) + + def updateCountStats( + oldNumRows: BigInt, + newNumRows: BigInt, + updatedColumnStatOpt: Option[ColumnStat] = None): ColumnStat = { +val updatedColumnStat = updatedColumnStatOpt.getOrElse(this) +val newDistinctCount = EstimationUtils.updateStat(oldNumRows, newNumRows, + distinctCount, updatedColumnStat.distinctCount) +val newNullCount = EstimationUtils.updateStat(oldNumRows, newNumRows, + nullCount, updatedColumnStat.nullCount) +updatedColumnStat.copy(distinctCount = newDistinctCount, nullCount = newNullCount) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala index 967cced..dafb979 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala @@ -52,14 +52,20 @@ object EstimationUtils { } /** - * Updates (scales down) the number of distinct values if the number of rows decreases after - * some operation (such as filter, join). Otherwise keep it unchanged. + * Updates (scales down) a statistic (eg. number of distinct values) if the number of rows + * decreases after some operation (such as filter, join). Otherwise keep it unchanged. */ - def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): BigInt = { -if (newNumRows < oldNumRows) { - ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / BigDecimal(oldNumRows)) + def updateStat( + oldNumRows: BigInt, + newNumRows: BigInt, + oldStatOpt: Option[BigInt], + updatedStatOpt: Option[BigInt]): Option[BigInt] = { +if (oldStatOpt.isDefined &&
[spark] branch master updated (033a573 -> ddc61e6)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 033a573 [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ add ddc61e6 [SPARK-36079][SQL] Null-based filter estimate should always be in the range [0, 1] No new revisions were added by this update. Summary of changes: .../sql/catalyst/plans/logical/Statistics.scala| 13 + .../logical/statsEstimation/EstimationUtils.scala | 18 +- .../logical/statsEstimation/FilterEstimation.scala | 30 +-- .../logical/statsEstimation/JoinEstimation.scala | 13 +- .../statsEstimation/FilterEstimationSuite.scala| 40 ++- .../approved-plans-modified/q19.sf100/explain.txt | 254 +-- .../q19.sf100/simplified.txt | 104 .../approved-plans-modified/q68.sf100/explain.txt | 207 .../q68.sf100/simplified.txt | 141 ++- .../approved-plans-modified/q73.sf100/explain.txt | 272 +++-- .../q73.sf100/simplified.txt | 56 +++-- 11 files changed, 591 insertions(+), 557 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
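The FilterEstimation hunk itself is not included in the excerpts above, but the stated intent of the commit is simply to keep null-based selectivity inside `[0, 1]`. A rough sketch of that idea follows (not the actual FilterEstimation code; names and signature are illustrative):

```scala
// If stale statistics report more nulls than rows, an unclamped null fraction > 1
// makes the IsNotNull selectivity (1 - nullFraction) negative; clamping prevents that.
def isNotNullSelectivity(nullCount: BigInt, rowCount: BigInt): BigDecimal = {
  require(rowCount > 0, "selectivity is only estimated for a non-empty relation")
  val nullFraction = (BigDecimal(nullCount) / BigDecimal(rowCount)).min(1).max(0)
  BigDecimal(1) - nullFraction
}

// Example with inconsistent stats: nullCount = 1500, rowCount = 1000
// unclamped: 1 - 1.5 = -0.5; clamped: 1 - 1.0 = 0.0
```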
[spark] branch branch-3.2 updated: [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0f6cf8a [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ 0f6cf8a is described below commit 0f6cf8abe34416ea4e9db5bd11f9f022c9fd7b7d Author: gengjiaan AuthorDate: Tue Jul 20 21:31:00 2021 +0800 [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ ### What changes were proposed in this pull request? This PR follows https://github.com/apache/spark/pull/33299 and implement `prettyName` for `MakeTimestampNTZ` and `MakeTimestampLTZ` based on the discussion show below https://github.com/apache/spark/pull/33299/files#r668423810 ### Why are the changes needed? This PR fix the incorrect alias usecase. ### Does this PR introduce _any_ user-facing change? 'No'. Modifications are transparent to users. ### How was this patch tested? Jenkins test. Closes #33430 from beliefer/SPARK-36046-followup. Authored-by: gengjiaan Signed-off-by: Gengliang Wang (cherry picked from commit 033a5731b44723fd7434c5ee0a021d3787a621ef) Signed-off-by: Gengliang Wang --- .../org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 4 ++-- .../apache/spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 234da76..5fce4b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -555,8 +555,8 @@ object FunctionRegistry { expression[SessionWindow]("session_window"), expression[MakeDate]("make_date"), expression[MakeTimestamp]("make_timestamp"), -expression[MakeTimestampNTZ]("make_timestamp_ntz", true), -expression[MakeTimestampLTZ]("make_timestamp_ltz", true), +expression[MakeTimestampNTZ]("make_timestamp_ntz"), +expression[MakeTimestampLTZ]("make_timestamp_ltz"), expression[MakeInterval]("make_interval"), expression[MakeDTInterval]("make_dt_interval"), expression[MakeYMInterval]("make_ym_interval"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 1146ba7..bc2e33b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -2403,6 +2403,8 @@ case class MakeTimestampNTZ( MakeTimestamp(year, month, day, hour, min, sec, dataType = TimestampNTZType)) } + override def prettyName: String = "make_timestamp_ntz" + override def exprsReplaced: Seq[Expression] = Seq(year, month, day, hour, min, sec) override protected def withNewChildInternal(newChild: Expression): Expression = @@ -2473,6 +2475,8 @@ case class MakeTimestampLTZ( MakeTimestamp(year, month, day, hour, min, sec, Some(timezone), dataType = TimestampType)) } + override def prettyName: String = "make_timestamp_ltz" + override def exprsReplaced: Seq[Expression] = Seq(year, month, day, hour, min, sec) override protected def 
withNewChildInternal(newChild: Expression): Expression = - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
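The user-visible side of the `prettyName` overrides above is the auto-generated column alias, which Spark derives from an expression's pretty name. A minimal check, assuming an active `SparkSession` named `spark` on a build that contains this patch:

```scala
// The alias of the unnamed projection should now read "make_timestamp_ntz(...)"
// rather than the name of the expression it is internally rewritten into.
val df = spark.sql("SELECT make_timestamp_ntz(2021, 7, 20, 21, 31, 0)")
df.columns.foreach(println)

spark.sql("SELECT make_timestamp_ltz(2021, 7, 20, 21, 31, 0)").columns.foreach(println)
```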
[spark] branch master updated (801b369 -> 033a573)
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 801b369 [SPARK-36204][INFRA][BUILD] Deduplicate Scala 2.13 daily build add 033a573 [SPARK-36046][SQL][FOLLOWUP] Implement prettyName for MakeTimestampNTZ and MakeTimestampLTZ No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 4 ++-- .../apache/spark/sql/catalyst/expressions/datetimeExpressions.scala | 4 2 files changed, 6 insertions(+), 2 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (463fcb3 -> 801b369)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 463fcb3 [SPARK-36207][PYTHON] Expose databaseExists in pyspark.sql.catalog add 801b369 [SPARK-36204][INFRA][BUILD] Deduplicate Scala 2.13 daily build No new revisions were added by this update. Summary of changes: .github/workflows/build_and_test.yml | 54 +- .../workflows/build_and_test_scala213_daily.yml| 115 - dev/run-tests.py | 14 ++- 3 files changed, 61 insertions(+), 122 deletions(-) delete mode 100644 .github/workflows/build_and_test_scala213_daily.yml - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36207][PYTHON] Expose databaseExists in pyspark.sql.catalog
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 463fcb3 [SPARK-36207][PYTHON] Expose databaseExists in pyspark.sql.catalog 463fcb3 is described below commit 463fcb3723d4c5cffd4b787e2d7254ceaf2bca98 Author: Dominik Gehl AuthorDate: Tue Jul 20 22:10:06 2021 +0900 [SPARK-36207][PYTHON] Expose databaseExists in pyspark.sql.catalog ### What changes were proposed in this pull request? Expose databaseExists in pyspark.sql.catalog ### Why are the changes needed? Was available in scala, but not in pyspark ### Does this PR introduce _any_ user-facing change? New method databaseExists ### How was this patch tested? Unit tests in codebase Closes #33416 from dominikgehl/feature/SPARK-36207. Lead-authored-by: Dominik Gehl Co-authored-by: Dominik Gehl Signed-off-by: Hyukjin Kwon --- python/docs/source/reference/pyspark.sql.rst | 1 + python/pyspark/sql/catalog.py| 26 ++ python/pyspark/sql/catalog.pyi | 1 + python/pyspark/sql/tests/test_catalog.py | 8 4 files changed, 36 insertions(+) diff --git a/python/docs/source/reference/pyspark.sql.rst b/python/docs/source/reference/pyspark.sql.rst index 74eac3d..d8e7b41 100644 --- a/python/docs/source/reference/pyspark.sql.rst +++ b/python/docs/source/reference/pyspark.sql.rst @@ -618,6 +618,7 @@ Catalog APIs Catalog.createExternalTable Catalog.createTable Catalog.currentDatabase +Catalog.databaseExists Catalog.dropGlobalTempView Catalog.dropTempView Catalog.isCached diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 8087d63..2d74c73 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -65,6 +65,32 @@ class Catalog(object): locationUri=jdb.locationUri())) return databases +def databaseExists(self, dbName): +"""Check if the database with the specified name exists. + +.. versionadded:: 3.3.0 + +Parameters +-- +dbName : str + name of the database to check existence + +Returns +--- +bool +Indicating whether the database exists + +Examples + +>>> spark.catalog.databaseExists("test_new_database") +False +>>> df = spark.sql("CREATE DATABASE test_new_database") +>>> spark.catalog.databaseExists("test_new_database") +True +>>> df = spark.sql("DROP DATABASE test_new_database") +""" +return self._jcatalog.databaseExists(dbName) + @since(2.0) def listTables(self, dbName=None): """Returns a list of tables/views in the specified database. diff --git a/python/pyspark/sql/catalog.pyi b/python/pyspark/sql/catalog.pyi index 6892719..1eed73a 100644 --- a/python/pyspark/sql/catalog.pyi +++ b/python/pyspark/sql/catalog.pyi @@ -36,6 +36,7 @@ class Catalog: def currentDatabase(self) -> str: ... def setCurrentDatabase(self, dbName: str) -> None: ... def listDatabases(self) -> List[Database]: ... +def databaseExists(self, dbName: str) -> bool: ... def listTables(self, dbName: Optional[str] = ...) -> List[Table]: ... def listFunctions(self, dbName: Optional[str] = ...) -> List[Function]: ... 
def listColumns( diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py index 8699878..90467fa 100644 --- a/python/pyspark/sql/tests/test_catalog.py +++ b/python/pyspark/sql/tests/test_catalog.py @@ -43,6 +43,14 @@ class CatalogTests(ReusedSQLTestCase): databases = [db.name for db in spark.catalog.listDatabases()] self.assertEqual(sorted(databases), ["default", "some_db"]) +def test_database_exists(self): +# SPARK-36207: testing that database_exists returns correct boolean +spark = self.spark +with self.database("some_db"): +self.assertFalse(spark.catalog.databaseExists("some_db")) +spark.sql("CREATE DATABASE some_db") +self.assertTrue(spark.catalog.databaseExists("some_db")) + def test_list_tables(self): from pyspark.sql.catalog import Table spark = self.spark - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
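The new PySpark method is a thin wrapper: as the diff shows, it forwards to `self._jcatalog.databaseExists(dbName)`, i.e. to the Scala `Catalog.databaseExists` API that already existed. A quick sketch of the equivalent session in Scala (assumes an active `SparkSession` named `spark` and that the database does not already exist):

```scala
// Mirrors the doctest added to catalog.py, but through the Scala Catalog API.
println(spark.catalog.databaseExists("test_new_database")) // false
spark.sql("CREATE DATABASE test_new_database")
println(spark.catalog.databaseExists("test_new_database")) // true
spark.sql("DROP DATABASE test_new_database")
```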
[spark] branch branch-3.2 updated: [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 7cd89ef [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too 7cd89ef is described below commit 7cd89efca5a74dcc2457c7be5f2ef65aeb90a967 Author: Angerszh AuthorDate: Tue Jul 20 21:08:03 2021 +0800 [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too ### What changes were proposed in this pull request? When inner field have wrong schema filed name should check field name too. ![image](https://user-images.githubusercontent.com/46485123/126101009-c192d87f-1e18-4355-ad53-1419dacdeb76.png) ### Why are the changes needed? Early check early faield ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT Closes #33409 from AngersZh/SPARK-36201. Authored-by: Angerszh Signed-off-by: Wenchen Fan (cherry picked from commit 251885772d41a572655e950a8e298315f222a803) Signed-off-by: Wenchen Fan --- .../org/apache/spark/sql/execution/command/ddl.scala | 12 ++-- .../apache/spark/sql/execution/command/tables.scala | 2 +- .../execution/datasources/orc/OrcFileFormat.scala| 10 -- .../datasources/parquet/ParquetSchemaConverter.scala | 10 -- .../spark/sql/hive/execution/HiveDDLSuite.scala | 20 5 files changed, 43 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 605d98e..140f9d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -924,23 +924,23 @@ object DDLUtils { } private[sql] def checkDataColNames(table: CatalogTable): Unit = { -checkDataColNames(table, table.dataSchema.fieldNames) +checkDataColNames(table, table.dataSchema) } - private[sql] def checkDataColNames(table: CatalogTable, colNames: Seq[String]): Unit = { + private[sql] def checkDataColNames(table: CatalogTable, schema: StructType): Unit = { table.provider.foreach { _.toLowerCase(Locale.ROOT) match { case HIVE_PROVIDER => val serde = table.storage.serde if (serde == HiveSerDe.sourceToSerDe("orc").get.serde) { -OrcFileFormat.checkFieldNames(colNames) +OrcFileFormat.checkFieldNames(schema) } else if (serde == HiveSerDe.sourceToSerDe("parquet").get.serde || serde == Some("parquet.hive.serde.ParquetHiveSerDe") || serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) { -ParquetSchemaConverter.checkFieldNames(colNames) +ParquetSchemaConverter.checkFieldNames(schema) } -case "parquet" => ParquetSchemaConverter.checkFieldNames(colNames) -case "orc" => OrcFileFormat.checkFieldNames(colNames) +case "parquet" => ParquetSchemaConverter.checkFieldNames(schema) +case "orc" => OrcFileFormat.checkFieldNames(schema) case _ => } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 0599621..f740915 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -236,7 +236,7 @@ case class AlterTableAddColumnsCommand( (colsToAdd ++ catalogTable.schema).map(_.name), "in the table definition of " + table.identifier, 
conf.caseSensitiveAnalysis) -DDLUtils.checkDataColNames(catalogTable, colsToAdd.map(_.name)) +DDLUtils.checkDataColNames(catalogTable, StructType(colsToAdd)) val existingSchema = CharVarcharUtils.getRawSchema(catalogTable.dataSchema) catalog.alterTableDataSchema(table, StructType(existingSchema ++ colsToAdd)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index d6593ca..9024c78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -52,8 +52,14 @@ private[sql] object OrcFileFormat { } } - def checkFieldNames(names: Seq[String]): Unit = { -names.foreach(checkFieldName) + def checkFieldNames(schema: StructType): Unit = { +schema.foreach { field => + checkFieldName(field.name) +
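The diff above is cut off, but the shape of the fix is visible: `checkFieldNames` now receives the full `StructType` instead of only the top-level column names, so names nested inside struct columns can be validated as well. A standalone sketch of that kind of recursive check (illustrative only; array and map element types are omitted, and `checkFieldName` stands in for the format-specific validation):

```scala
import org.apache.spark.sql.types.{DataType, StructType}

// Validate every field name, including names nested inside struct columns,
// so a bad inner name fails at CREATE/ALTER TABLE time rather than at write time.
def checkAllFieldNames(schema: StructType)(checkFieldName: String => Unit): Unit = {
  def visit(dataType: DataType): Unit = dataType match {
    case struct: StructType =>
      struct.fields.foreach { field =>
        checkFieldName(field.name)
        visit(field.dataType)
      }
    case _ => () // leaf type: nothing further to check in this sketch
  }
  visit(schema)
}
```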
[spark] branch master updated (e9b18b0 -> 2518857)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from e9b18b0 [SPARK-31907][DOCS][SQL] Adding location of SQL API documentation add 2518857 [SPARK-36201][SQL][FOLLOWUP] Schema check should check inner field too No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/execution/command/ddl.scala | 12 ++-- .../apache/spark/sql/execution/command/tables.scala | 2 +- .../execution/datasources/orc/OrcFileFormat.scala| 10 -- .../datasources/parquet/ParquetSchemaConverter.scala | 10 -- .../spark/sql/hive/execution/HiveDDLSuite.scala | 20 5 files changed, 43 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (b70c258 -> e9b18b0)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from b70c258 [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition add e9b18b0 [SPARK-31907][DOCS][SQL] Adding location of SQL API documentation No new revisions were added by this update. Summary of changes: sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.2 updated: [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 677104f [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition 677104f is described below commit 677104f49531a5f5c214729b4d3e0ce91b4f4a64 Author: ulysses-you AuthorDate: Tue Jul 20 20:48:35 2021 +0800 [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition ### What changes were proposed in this pull request? * Add non-empty partition check in `CustomShuffleReaderExec` * Make sure `OptimizeLocalShuffleReader` doesn't return empty partition ### Why are the changes needed? Since SPARK-32083, AQE coalesce always return at least one partition, it should be robust to add non-empty check in `CustomShuffleReaderExec`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? not need Closes #33431 from ulysses-you/non-empty-partition. Authored-by: ulysses-you Signed-off-by: Wenchen Fan (cherry picked from commit b70c25881c8cbac21a62457ad4373a11cfe4) Signed-off-by: Wenchen Fan --- .../sql/execution/adaptive/CustomShuffleReaderExec.scala | 10 ++ .../sql/execution/adaptive/OptimizeLocalShuffleReader.scala | 12 +--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index cea3016..8975054 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -34,11 +34,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * * @param child It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange *node during canonicalization. - * @param partitionSpecs The partition specs that defines the arrangement. + * @param partitionSpecs The partition specs that defines the arrangement, requires at least one + *partition. */ case class CustomShuffleReaderExec private( child: SparkPlan, partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode { + assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least one partition") + // If this reader is to read shuffle files locally, then all partition specs should be // `PartialMapperPartitionSpec`. if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) { @@ -52,8 +55,7 @@ case class CustomShuffleReaderExec private( // If it is a local shuffle reader with one mapper per task, then the output partitioning is // the same as the plan before shuffle. // TODO this check is based on assumptions of callers' behavior but is sufficient for now. 
-if (partitionSpecs.nonEmpty && -partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) && +if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) && partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size == partitionSpecs.length) { child match { @@ -111,7 +113,7 @@ case class CustomShuffleReaderExec private( } @transient private lazy val partitionDataSizes: Option[Seq[Long]] = { -if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) { +if (!isLocalReader && shuffleStage.get.mapStats.isDefined) { Some(partitionSpecs.map { case p: CoalescedPartitionSpec => assert(p.dataSize.isDefined) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index c91b999..2103145 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -68,13 +68,11 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule { shuffleStage: ShuffleQueryStageExec, advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = { val numMappers = shuffleStage.shuffle.numMappers +// ShuffleQueryStageExec.mapStats.isDefined promise numMappers > 0 +assert(numMappers > 0) val numReducers = shuffleStage.shuffle.numPartitions val expectedParallelism = advisoryParallelism.getOrElse(numReducers) -val splitPoints = if (numMappers == 0) { - Seq.empty -} else { -
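Both hunks above follow the same fail-fast idea: since AQE coalescing (SPARK-32083) always yields at least one partition, the reader can assert that invariant at construction time instead of special-casing empty sequences later. A generic sketch of the pattern (not the Spark class itself):

```scala
// Fail at construction if the "at least one partition" contract is violated,
// so the offending caller is reported immediately rather than through a
// confusing downstream error during execution.
final case class PartitionedReaderSketch[Spec](partitionSpecs: Seq[Spec]) {
  assert(partitionSpecs.nonEmpty, "requires at least one partition")
}
```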
[spark] branch master updated (af978c8 -> b70c258)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from af978c8 [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only add b70c258 [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition No new revisions were added by this update. Summary of changes: .../sql/execution/adaptive/CustomShuffleReaderExec.scala | 10 ++ .../sql/execution/adaptive/OptimizeLocalShuffleReader.scala | 12 +--- 2 files changed, 11 insertions(+), 11 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only
This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new af978c8 [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only af978c8 is described below commit af978c87f10c89ee3a7c927ab9b039a2b84a492a Author: Yuming Wang AuthorDate: Tue Jul 20 20:24:07 2021 +0800 [SPARK-36183][SQL] Push down limit 1 through Aggregate if it is group only ### What changes were proposed in this pull request? Push down limit 1 and turn `Aggregate` into `Project` through `Aggregate` if it is group only. For example: ```sql create table t1 using parquet as select id from range(1L); create table t2 using parquet as select id from range(1L); create view v1 as select * from t1 union select * from t2; select * from v1 limit 1; ``` Before this PR | After this PR -- | -- ![image](https://user-images.githubusercontent.com/5399861/125975690-55663515-c4c5-4a04-aedf-f8ba37581ba7.png) | ![image](https://user-images.githubusercontent.com/5399861/126168972-b2675e09-4f93-4026-b1be-af317205e57f.png) ### Why are the changes needed? Improve query performance. This is a real case from the cluster: ![image](https://user-images.githubusercontent.com/5399861/125976597-18cb68d6-b22a-4d80-b270-01b2b13d1ef5.png) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #33397 from wangyum/SPARK-36183. Authored-by: Yuming Wang Signed-off-by: Yuming Wang --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 5 + .../catalyst/optimizer/LimitPushdownSuite.scala| 24 ++ 2 files changed, 29 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index aa2221b..8cc6378 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -647,6 +647,11 @@ object LimitPushDown extends Rule[LogicalPlan] { // There is a Project between LocalLimit and Join if they do not have the same output. case LocalLimit(exp, project @ Project(_, join: Join)) => LocalLimit(exp, project.copy(child = pushLocalLimitThroughJoin(exp, join))) +// Push down limit 1 through Aggregate and turn Aggregate into Project if it is group only. +case Limit(le @ IntegerLiteral(1), a: Aggregate) if a.groupOnly => + Limit(le, Project(a.output, LocalLimit(le, a.child))) +case Limit(le @ IntegerLiteral(1), p @ Project(_, a: Aggregate)) if a.groupOnly => + Limit(le, p.copy(child = Project(a.output, LocalLimit(le, a.child } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index c2503e3..848416b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -239,4 +239,28 @@ class LimitPushdownSuite extends PlanTest { Limit(5, LocalLimit(5, x).join(y, LeftOuter, joinCondition).select("x.a".attr)).analyze comparePlans(optimized, correctAnswer) } + + test("SPARK-36183: Push down limit 1 through Aggregate if it is group only") { +// Push down when it is group only and limit 1. 
+comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(1).analyze), + LocalLimit(1, x).select("x.a".attr).limit(1).analyze) + +comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).select("x.a".attr).limit(1).analyze), + LocalLimit(1, x).select("x.a".attr).select("x.a".attr).limit(1).analyze) + +comparePlans( + Optimize.execute(x.union(y).groupBy("x.a".attr)("x.a".attr).limit(1).analyze), + LocalLimit(1, LocalLimit(1, x).union(LocalLimit(1, y))).select("x.a".attr).limit(1).analyze) + +// No push down +comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze), + x.groupBy("x.a".attr)("x.a".attr).limit(2).analyze) + +comparePlans( + Optimize.execute(x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze), + x.groupBy("x.a".attr)("x.a".attr, count("x.a".attr)).limit(1).analyze) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
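The SQL from the commit message can be replayed directly to see the new rule fire (assumes an active `SparkSession` named `spark` with a writable warehouse; the table and view names are the ones used in the PR description):

```scala
spark.sql("create table t1 using parquet as select id from range(1L)")
spark.sql("create table t2 using parquet as select id from range(1L)")
spark.sql("create view v1 as select * from t1 union select * from t2")

// UNION de-duplicates through a group-only Aggregate. With this rule, LIMIT 1 is
// pushed below that Aggregate (and on through the union), so each side only needs
// to produce a single row; compare the optimized plans with and without the patch.
spark.sql("select * from v1 limit 1").explain()
```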
[spark] branch branch-3.2 updated: [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3bc9346 [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals 3bc9346 is described below commit 3bc9346a3ade89ee3a8c584ee5ef679abd6d0898 Author: Kousuke Saruta AuthorDate: Tue Jul 20 16:58:12 2021 +0800 [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals ### What changes were proposed in this pull request? This PR documents about unicode literals added in SPARK-34051 (#31096) and a past PR in `sql-ref-literals.md`. ### Why are the changes needed? Notice users about the literals. ### Does this PR introduce _any_ user-facing change? Yes, but just add a sentence. ### How was this patch tested? Built the document and confirmed the result. ``` SKIP_API=1 bundle exec jekyll build ``` ![unicode-literals](https://user-images.githubusercontent.com/4736016/126283923-944dc162-1817-47bc-a7e8-c3145225586b.png) Closes #33434 from sarutak/unicode-literal-doc. Authored-by: Kousuke Saruta Signed-off-by: Wenchen Fan (cherry picked from commit ba1294ea5a25e908c19e3d4e73c6b7a420bb006e) Signed-off-by: Wenchen Fan --- docs/sql-ref-literals.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sql-ref-literals.md b/docs/sql-ref-literals.md index 3dbed84..b23c63a 100644 --- a/docs/sql-ref-literals.md +++ b/docs/sql-ref-literals.md @@ -44,6 +44,8 @@ A string literal is used to specify a character string value. * **char** One character from the character set. Use `\` to escape special characters (e.g., `'` or `\`). +To represent unicode characters, use 16-bit or 32-bit unicode escape of the form `\uxxxx` or `\Uxxxxxxxx`, +where `xxxx` and `xxxxxxxx` are 16-bit and 32-bit code points in hexadecimal respectively (e.g., `\u3042` for `あ` and `\U0001F44D` for `👍`). Examples - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
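A quick way to try the escapes being documented (assumes an active `SparkSession` named `spark` on Spark 3.2 or later; the backslashes are doubled so that Scala passes literal `\u3042` / `\U0001F44D` sequences through to the SQL parser, which performs the unicode interpretation):

```scala
// Expect a row containing あ and 👍.
spark.sql("SELECT '\\u3042' AS hiragana_a, '\\U0001F44D' AS thumbs_up").show()
```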
[spark] branch master updated (c77acf0 -> ba1294e)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from c77acf0 [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way add ba1294e [SPARK-34051][DOCS][FOLLOWUP] Document about unicode literals No new revisions were added by this update. Summary of changes: docs/sql-ref-literals.md | 2 ++ 1 file changed, 2 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org