[spark] branch branch-3.0 updated: [SPARK-31070][SQL] make skew join split skewed partitions more evenly
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3f23529  [SPARK-31070][SQL] make skew join split skewed partitions more evenly
3f23529 is described below

commit 3f23529cac3a306afe0ed175b8034d4f24b08acb
Author: Wenchen Fan
AuthorDate: Tue Mar 10 21:50:44 2020 -0700

    [SPARK-31070][SQL] make skew join split skewed partitions more evenly

    ### What changes were proposed in this pull request?

    There are two problems when splitting skewed partitions:
    1. It's possible that we can't split the skewed partition; in that case we shouldn't create a skew join.
    2. When splitting, it's possible that we create a partition for a very small amount of data.

    This PR fixes them:
    1. Don't create `PartialReducerPartitionSpec` if we can't split.
    2. Merge small partitions into the previous partition.

    ### Why are the changes needed?

    Make skew join split skewed partitions more evenly.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Updated test.

    Closes #27833 from cloud-fan/aqe.

    Authored-by: Wenchen Fan
    Signed-off-by: gatorsmile
    (cherry picked from commit d5f5720efa7232f1339976462d462a7360978ab5)
    Signed-off-by: gatorsmile
---
 .../adaptive/CoalesceShufflePartitions.scala       |  2 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 44 +++
 ...Coalescer.scala => ShufflePartitionsUtil.scala} | 50 +-
 ...uite.scala => ShufflePartitionsUtilSuite.scala} | 32 --
 .../adaptive/AdaptiveQueryExecSuite.scala          | 14 +++---
 5 files changed, 102 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index a8e2d8e..d779a20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -66,7 +66,7 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
     val distinctNumPreShufflePartitions =
       validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
     if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
-      val partitionSpecs = ShufflePartitionsCoalescer.coalescePartitions(
+      val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         validMetrics.toArray,
         firstPartitionIndex = 0,
         lastPartitionIndex = distinctNumPreShufflePartitions.head,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 4387409..7f52393 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.adaptive
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer

 import org.apache.commons.io.FileUtils
@@ -111,22 +110,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       targetSize: Long): Seq[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val partitionStartIndices = ArrayBuffer[Int]()
-    partitionStartIndices += 0
-    var i = 0
-    var postMapPartitionSize = 0L
-    while (i < mapPartitionSizes.length) {
-      val nextMapPartitionSize = mapPartitionSizes(i)
-      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
-        partitionStartIndices += i
-        postMapPartitionSize = nextMapPartitionSize
-      } else {
-        postMapPartitionSize += nextMapPartitionSize
-      }
-      i += 1
-    }
-
-    partitionStartIndices
+    ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
   }

   private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
@@ -211,21 +195,25 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
     }
     val leftParts = if (isLeftSkew) {
-      leftSkewDesc.addPartitionSize(leftSize)
-      createSkewPartitions(
-        partitionIndex,
-        getMapStartIndices(left, partitionIndex, leftTargetSize),
-        getNumMappers(left))
+      val mapStartIndices = getMapStartIndices(left, partitionIndex,
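The core of the change above is the new `ShufflePartitionsUtil.splitSizeListByTargetSize`: split the per-mapper size list at a target size, then fold a slice that comes out very small back into its predecessor so no partition is created for a tiny amount of data. Below is a minimal standalone sketch of that strategy, not the actual Spark implementation; in particular, the 20%-of-target merge threshold is an assumption made for illustration.

```scala
object SkewSplitSketch {
  // Returns the start indices of the slices of `sizes`, each slice aiming
  // to stay under `targetSize` bytes.
  def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Seq[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer[Int](0)
    var current = 0L
    sizes.zipWithIndex.foreach { case (size, i) =>
      if (i > 0 && current + size > targetSize) {
        // The current slice is full: begin a new slice at mapper i.
        starts += i
        current = size
      } else {
        current += size
      }
    }
    // Merge a small trailing slice into the previous partition (hypothetical
    // threshold: a slice under 20% of the target counts as too small).
    if (starts.length > 1 && current < targetSize / 5) starts.remove(starts.length - 1)
    starts.toSeq
  }

  def main(args: Array[String]): Unit = {
    // The 20-byte tail merges into the previous slice: start indices 0 and 2.
    println(splitSizeListByTargetSize(Seq(70L, 70L, 140L, 20L), 150L))
  }
}
```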
[spark] branch master updated (93def95 -> d5f5720)
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 93def95  [SPARK-31095][BUILD] Upgrade netty-all to 4.1.47.Final
 add d5f5720  [SPARK-31070][SQL] make skew join split skewed partitions more evenly

No new revisions were added by this update.

Summary of changes:
 .../adaptive/CoalesceShufflePartitions.scala       |  2 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 44 +++
 ...Coalescer.scala => ShufflePartitionsUtil.scala} | 50 +-
 ...uite.scala => ShufflePartitionsUtilSuite.scala} | 32 --
 .../adaptive/AdaptiveQueryExecSuite.scala          | 14 +++---
 5 files changed, 102 insertions(+), 40 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/{ShufflePartitionsCoalescer.scala => ShufflePartitionsUtil.scala} (73%)
 rename sql/core/src/test/scala/org/apache/spark/sql/execution/{ShufflePartitionsCoalescerSuite.scala => ShufflePartitionsUtilSuite.scala} (88%)
[spark] branch branch-3.0 updated: [SPARK-31095][BUILD] Upgrade netty-all to 4.1.47.Final
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d1f5df4  [SPARK-31095][BUILD] Upgrade netty-all to 4.1.47.Final
d1f5df4 is described below

commit d1f5df40cb7687c5fd3145d3d629fb1069227638
Author: Dongjoon Hyun
AuthorDate: Tue Mar 10 17:50:34 2020 -0700

    [SPARK-31095][BUILD] Upgrade netty-all to 4.1.47.Final

    ### What changes were proposed in this pull request?

    This PR aims to bring the bug fixes from the latest netty-all.

    ### Why are the changes needed?

    - 4.1.47.Final: https://github.com/netty/netty/milestone/222?closed=1 (15 patches or issues)
    - 4.1.46.Final: https://github.com/netty/netty/milestone/221?closed=1 (80 patches or issues)
    - 4.1.45.Final: https://github.com/netty/netty/milestone/220?closed=1 (23 patches or issues)
    - 4.1.44.Final: https://github.com/netty/netty/milestone/218?closed=1 (113 patches or issues)
    - 4.1.43.Final: https://github.com/netty/netty/milestone/217?closed=1 (63 patches or issues)

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Pass the Jenkins with the existing tests.

    Closes #27869 from dongjoon-hyun/SPARK-31095.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 93def95b0801842e0288a77b3a97f84d31b57366)
    Signed-off-by: Dongjoon Hyun
---
 dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 2 +-
 pom.xml                                 | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
index 828b1a6..39f7262 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2
@@ -155,7 +155,7 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
 metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
 minlog/1.3.0//minlog-1.3.0.jar
-netty-all/4.1.42.Final//netty-all-4.1.42.Final.jar
+netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
 objenesis/2.5.1//objenesis-2.5.1.jar
 okhttp/3.12.6//okhttp-3.12.6.jar
 okio/1.15.0//okio-1.15.0.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 8a65540..26ac30d 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -170,7 +170,7 @@ metrics-jmx/4.1.1//metrics-jmx-4.1.1.jar
 metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
 minlog/1.3.0//minlog-1.3.0.jar
-netty-all/4.1.42.Final//netty-all-4.1.42.Final.jar
+netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
 objenesis/2.5.1//objenesis-2.5.1.jar
 okhttp/3.12.6//okhttp-3.12.6.jar
 okio/1.15.0//okio-1.15.0.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index 4dddbba..e908ec8 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -183,7 +183,7 @@ metrics-json/4.1.1//metrics-json-4.1.1.jar
 metrics-jvm/4.1.1//metrics-jvm-4.1.1.jar
 minlog/1.3.0//minlog-1.3.0.jar
 mssql-jdbc/6.2.1.jre7//mssql-jdbc-6.2.1.jre7.jar
-netty-all/4.1.42.Final//netty-all-4.1.42.Final.jar
+netty-all/4.1.47.Final//netty-all-4.1.47.Final.jar
 nimbus-jose-jwt/4.41.1//nimbus-jose-jwt-4.41.1.jar
 objenesis/2.5.1//objenesis-2.5.1.jar
 okhttp/2.7.5//okhttp-2.7.5.jar
diff --git a/pom.xml b/pom.xml
index 8a46197..262f3ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -698,7 +698,7 @@
       <dependency>
         <groupId>io.netty</groupId>
         <artifactId>netty-all</artifactId>
-        <version>4.1.42.Final</version>
+        <version>4.1.47.Final</version>
       </dependency>
       <dependency>
         <groupId>org.apache.derby</groupId>
[spark] branch master updated (0f54dc7 -> 93def95)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 0f54dc7  [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2
 add 93def95  [SPARK-31095][BUILD] Upgrade netty-all to 4.1.47.Final

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2.7-hive-1.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.7-hive-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-3.2-hive-2.3 | 2 +-
 pom.xml                                 | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b8e2cb3  [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2
b8e2cb3 is described below

commit b8e2cb32cbc75601d6d7a841362676cf2f273bda
Author: Qianyang Yu
AuthorDate: Wed Mar 11 08:47:30 2020 +0900

    [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2

    ### What changes were proposed in this pull request?

    ### Why are the changes needed?

    Based on [JIRA 30962](https://issues.apache.org/jira/browse/SPARK-30962), we want to document all the supported `ALTER TABLE` syntax for V1 tables.

    ### Does this PR introduce any user-facing change?

    Yes.

    ### How was this patch tested?

    Before: the documentation looks like [Alter Table](https://github.com/apache/spark/pull/25590)

    After (screenshots):
    - https://user-images.githubusercontent.com/7550280/75824837-168c7e00-5d59-11ea-9751-d1dab0f5a892.png
    - https://user-images.githubusercontent.com/7550280/75824859-21dfa980-5d59-11ea-8b49-3adf6eb55fc6.png
    - https://user-images.githubusercontent.com/7550280/75824884-2e640200-5d59-11ea-81ef-d77d0a8efee2.png
    - https://user-images.githubusercontent.com/7550280/75824910-39b72d80-5d59-11ea-84d0-bffa2499f086.png
    - https://user-images.githubusercontent.com/7550280/75824937-45a2ef80-5d59-11ea-932c-314924856834.png
    - https://user-images.githubusercontent.com/7550280/75824965-4cc9fd80-5d59-11ea-815b-8c1ebad310b1.png
    - https://user-images.githubusercontent.com/7550280/75824978-518eb180-5d59-11ea-8a55-2fa26376b9c1.png
    - https://user-images.githubusercontent.com/7550280/75825001-5bb0b000-5d59-11ea-8dd9-dcfbfa1b4330.png

    Notes: these syntaxes are not supported by V1 tables:
    - `ALTER TABLE .. RENAME COLUMN`
    - `ALTER TABLE ... DROP (COLUMN | COLUMNS)`
    - `ALTER TABLE ... (ALTER | CHANGE) COLUMN? alterColumnAction` only supports changing comments, not other actions: `datatype, position, (SET | DROP) NOT NULL`
    - `ALTER TABLE .. CHANGE COLUMN?`
    - `ALTER TABLE REPLACE COLUMNS`
    - `ALTER TABLE ... RECOVER PARTITIONS`

    Closes #27779 from kevinyu98/spark-30962-alterT.

    Authored-by: Qianyang Yu
    Signed-off-by: Takeshi Yamamuro
    (cherry picked from commit 0f54dc7c03ed975ecb7f776a0151b9325d21e85c)
    Signed-off-by: Takeshi Yamamuro
---
 docs/sql-ref-syntax-ddl-alter-table.md | 213 -
 1 file changed, 210 insertions(+), 3 deletions(-)

diff --git a/docs/sql-ref-syntax-ddl-alter-table.md b/docs/sql-ref-syntax-ddl-alter-table.md
index 373fa8d..2dd808b 100644
--- a/docs/sql-ref-syntax-ddl-alter-table.md
+++ b/docs/sql-ref-syntax-ddl-alter-table.md
@@ -23,14 +23,13 @@ license: |
 `ALTER TABLE` statement changes the schema or properties of a table.

 ### RENAME
-`ALTER TABLE RENAME` statement changes the table name of an existing table in the database.
+`ALTER TABLE RENAME TO` statement changes the table name of an existing table in the database.

 #### Syntax
 {% highlight sql %}
 ALTER TABLE table_identifier RENAME TO table_identifier

 ALTER TABLE table_identifier partition_spec RENAME TO partition_spec
-
 {% endhighlight %}

 #### Parameters
@@ -83,6 +82,109 @@ ALTER TABLE table_identifier ADD COLUMNS ( col_spec [ , col_spec ... ] )

+### ALTER OR CHANGE COLUMN
+`ALTER TABLE ALTER COLUMN` or `ALTER TABLE CHANGE COLUMN` statement changes column's comment.
+
+#### Syntax
+{% highlight sql %}
+ALTER TABLE table_identifier { ALTER | CHANGE } [ COLUMN ] col_spec alterColumnAction
+{% endhighlight %}
+
+#### Parameters
+
+  table_identifier
+    Specifies a table name, which may be optionally qualified with a database name.
+    Syntax: [ database_name. ] table_name
+
+  COLUMN col_spec
+    Specifies the column to be altered or be changed.
+
+  alterColumnAction
+    Change the comment string.
+    Syntax: COMMENT STRING
+
+### ADD AND DROP PARTITION
+
+#### ADD PARTITION
+`ALTER TABLE ADD` statement adds partition to the partitioned table.
+
+##### Syntax
+{% highlight sql %}
+ALTER TABLE table_identifier ADD [IF NOT EXISTS]
+    ( partition_spec [ partition_spec ... ] )
+{% endhighlight %}
+
+##### Parameters
+
+  table_identifier
+    Specifies a table name, which may be optionally qualified with a database name.
+    Syntax: [ database_name. ] table_name
+
+  partition_spec
+    Partition to be added.
+    Syntax: PARTITION ( partition_col_name = partition_col_val [ , ... ] )
+
+#### DROP PARTITION
+`ALTER TABLE DROP` statement drops the partition of the table.
+
+#
[spark] branch master updated: [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0f54dc7  [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2
0f54dc7 is described below

commit 0f54dc7c03ed975ecb7f776a0151b9325d21e85c
Author: Qianyang Yu
AuthorDate: Wed Mar 11 08:47:30 2020 +0900

    [SPARK-30962][SQL][DOC] Documentation for Alter table command phase 2

    ### What changes were proposed in this pull request?

    ### Why are the changes needed?

    Based on [JIRA 30962](https://issues.apache.org/jira/browse/SPARK-30962), we want to document all the supported `ALTER TABLE` syntax for V1 tables.

    ### Does this PR introduce any user-facing change?

    Yes.

    ### How was this patch tested?

    Before: the documentation looks like [Alter Table](https://github.com/apache/spark/pull/25590)

    After (screenshots):
    - https://user-images.githubusercontent.com/7550280/75824837-168c7e00-5d59-11ea-9751-d1dab0f5a892.png
    - https://user-images.githubusercontent.com/7550280/75824859-21dfa980-5d59-11ea-8b49-3adf6eb55fc6.png
    - https://user-images.githubusercontent.com/7550280/75824884-2e640200-5d59-11ea-81ef-d77d0a8efee2.png
    - https://user-images.githubusercontent.com/7550280/75824910-39b72d80-5d59-11ea-84d0-bffa2499f086.png
    - https://user-images.githubusercontent.com/7550280/75824937-45a2ef80-5d59-11ea-932c-314924856834.png
    - https://user-images.githubusercontent.com/7550280/75824965-4cc9fd80-5d59-11ea-815b-8c1ebad310b1.png
    - https://user-images.githubusercontent.com/7550280/75824978-518eb180-5d59-11ea-8a55-2fa26376b9c1.png
    - https://user-images.githubusercontent.com/7550280/75825001-5bb0b000-5d59-11ea-8dd9-dcfbfa1b4330.png

    Notes: these syntaxes are not supported by V1 tables:
    - `ALTER TABLE .. RENAME COLUMN`
    - `ALTER TABLE ... DROP (COLUMN | COLUMNS)`
    - `ALTER TABLE ... (ALTER | CHANGE) COLUMN? alterColumnAction` only supports changing comments, not other actions: `datatype, position, (SET | DROP) NOT NULL`
    - `ALTER TABLE .. CHANGE COLUMN?`
    - `ALTER TABLE REPLACE COLUMNS`
    - `ALTER TABLE ... RECOVER PARTITIONS`

    Closes #27779 from kevinyu98/spark-30962-alterT.

    Authored-by: Qianyang Yu
    Signed-off-by: Takeshi Yamamuro
---
 docs/sql-ref-syntax-ddl-alter-table.md | 213 -
 1 file changed, 210 insertions(+), 3 deletions(-)

diff --git a/docs/sql-ref-syntax-ddl-alter-table.md b/docs/sql-ref-syntax-ddl-alter-table.md
index 373fa8d..2dd808b 100644
--- a/docs/sql-ref-syntax-ddl-alter-table.md
+++ b/docs/sql-ref-syntax-ddl-alter-table.md
@@ -23,14 +23,13 @@ license: |
 `ALTER TABLE` statement changes the schema or properties of a table.

 ### RENAME
-`ALTER TABLE RENAME` statement changes the table name of an existing table in the database.
+`ALTER TABLE RENAME TO` statement changes the table name of an existing table in the database.

 #### Syntax
 {% highlight sql %}
 ALTER TABLE table_identifier RENAME TO table_identifier

 ALTER TABLE table_identifier partition_spec RENAME TO partition_spec
-
 {% endhighlight %}

 #### Parameters
@@ -83,6 +82,109 @@ ALTER TABLE table_identifier ADD COLUMNS ( col_spec [ , col_spec ... ] )

+### ALTER OR CHANGE COLUMN
+`ALTER TABLE ALTER COLUMN` or `ALTER TABLE CHANGE COLUMN` statement changes column's comment.
+
+#### Syntax
+{% highlight sql %}
+ALTER TABLE table_identifier { ALTER | CHANGE } [ COLUMN ] col_spec alterColumnAction
+{% endhighlight %}
+
+#### Parameters
+
+  table_identifier
+    Specifies a table name, which may be optionally qualified with a database name.
+    Syntax: [ database_name. ] table_name
+
+  COLUMN col_spec
+    Specifies the column to be altered or be changed.
+
+  alterColumnAction
+    Change the comment string.
+    Syntax: COMMENT STRING
+
+### ADD AND DROP PARTITION
+
+#### ADD PARTITION
+`ALTER TABLE ADD` statement adds partition to the partitioned table.
+
+##### Syntax
+{% highlight sql %}
+ALTER TABLE table_identifier ADD [IF NOT EXISTS]
+    ( partition_spec [ partition_spec ... ] )
+{% endhighlight %}
+
+##### Parameters
+
+  table_identifier
+    Specifies a table name, which may be optionally qualified with a database name.
+    Syntax: [ database_name. ] table_name
+
+  partition_spec
+    Partition to be added.
+    Syntax: PARTITION ( partition_col_name = partition_col_val [ , ... ] )
+
+#### DROP PARTITION
+`ALTER TABLE DROP` statement drops the partition of the table.
+
+##### Syntax
+{% highlight sql %}
+ALTER TABLE table_identifier DROP [ IF EXISTS ] partition_spec [PURGE]
+{% endhighlight
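For readers trying the statements documented above, here is a hedged end-to-end sketch driving them through `spark.sql` from Scala. The table and column names (`student`, `student_info`, `name`, `age`) are made up, and it assumes a Spark 3.0 session with the V1 session catalog.

```scala
import org.apache.spark.sql.SparkSession

object AlterTableExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("alter-table-demo")
      .master("local[*]")
      .getOrCreate()

    // A partitioned V1 datasource table to operate on.
    spark.sql("CREATE TABLE student (name STRING, rollno INT, age INT) USING parquet PARTITIONED BY (age)")

    // RENAME: change the table name.
    spark.sql("ALTER TABLE student RENAME TO student_info")

    // ALTER OR CHANGE COLUMN: for V1 tables only the comment can change.
    spark.sql("ALTER TABLE student_info ALTER COLUMN name COMMENT 'student name'")

    // ADD AND DROP PARTITION.
    spark.sql("ALTER TABLE student_info ADD IF NOT EXISTS PARTITION (age = 18)")
    spark.sql("ALTER TABLE student_info DROP IF EXISTS PARTITION (age = 18)")

    spark.stop()
  }
}
```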
[spark] branch branch-3.0 updated: [SPARK-31037][SQL][FOLLOW-UP] Replace legacy ReduceNumShufflePartitions with CoalesceShufflePartitions in comment
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 57bf23c  [SPARK-31037][SQL][FOLLOW-UP] Replace legacy ReduceNumShufflePartitions with CoalesceShufflePartitions in comment
57bf23c is described below

commit 57bf23c01b2cffe5011a9d15eb68eff5c28519f4
Author: yi.wu
AuthorDate: Tue Mar 10 11:09:36 2020 -0700

    [SPARK-31037][SQL][FOLLOW-UP] Replace legacy ReduceNumShufflePartitions with CoalesceShufflePartitions in comment

    ### What changes were proposed in this pull request?

    Replace legacy `ReduceNumShufflePartitions` with `CoalesceShufflePartitions` in comments.

    ### Why are the changes needed?

    Rule `ReduceNumShufflePartitions` has been renamed to `CoalesceShufflePartitions`, so we should update the related comments as well.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    N/A.

    Closes #27865 from Ngone51/spark_31037_followup.

    Authored-by: yi.wu
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 34be83e08b6f5313bdd9d165d3e203d06eff677b)
    Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 6 +++---
 .../apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index fc88a7f..c1486aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -97,12 +97,12 @@ case class AdaptiveSparkPlanExec(
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     ReuseAdaptiveSubquery(conf, context.subqueryCache),
     // Here the 'OptimizeSkewedJoin' rule should be executed
-    // before 'ReduceNumShufflePartitions', as the skewed partition handled
-    // in 'OptimizeSkewedJoin' rule, should be omitted in 'ReduceNumShufflePartitions'.
+    // before 'CoalesceShufflePartitions', as the skewed partition handled
+    // in 'OptimizeSkewedJoin' rule, should be omitted in 'CoalesceShufflePartitions'.
     OptimizeSkewedJoin(conf),
     CoalesceShufflePartitions(conf),
     // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices'
-    // in 'ReduceNumShufflePartitions' rule. So it must be after 'ReduceNumShufflePartitions' rule.
+    // in 'CoalesceShufflePartitions' rule. So it must be after 'CoalesceShufflePartitions' rule.
     OptimizeLocalShuffleReader(conf),
     ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
     CollapseCodegenStages(conf)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index c3bcce4..4387409 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -52,7 +52,7 @@ import org.apache.spark.sql.internal.SQLConf
  *   (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
  *
  * Note that, when this rule is enabled, it also coalesces non-skewed partitions like
- * `ReduceNumShufflePartitions` does.
+ * `CoalesceShufflePartitions` does.
  */
 case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
@@ -191,7 +191,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
     val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
     // This is used to delay the creation of non-skew partitions so that we can potentially
-    // coalesce them like `ReduceNumShufflePartitions` does.
+    // coalesce them like `CoalesceShufflePartitions` does.
     val nonSkewPartitionIndices = mutable.ArrayBuffer.empty[Int]
     val leftSkewDesc = new SkewDesc
     val rightSkewDesc = new SkewDesc
[spark] branch master updated (3bd6ebf -> 34be83e)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 3bd6ebf  [SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces
 add 34be83e  [SPARK-31037][SQL][FOLLOW-UP] Replace legacy ReduceNumShufflePartitions with CoalesceShufflePartitions in comment

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala | 6 +++---
 .../apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 558a82c  [SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces
558a82c is described below

commit 558a82cbd4ee6d463ed5c4460b327340511aa6d4
Author: Kent Yao
AuthorDate: Tue Mar 10 22:08:58 2020 +0800

    [SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces

    ### What changes were proposed in this pull request?

    Currently we parse intervals either from multi-unit strings or from date-time/year-month pattern strings. The former handles all whitespace; the latter does not, not even spaces.

    ### Why are the changes needed?

    Behavior consistency.

    ### Does this PR introduce any user-facing change?

    Yes, an interval in date-time/year-month form like

    ```
    select interval '\n-\t10\t 12:34:46.789\t' day to second
    -- !query 126 schema
    struct
    -- !query 126 output
    -10 days -12 hours -34 minutes -46.789 seconds
    ```

    is valid now.

    ### How was this patch tested?

    Added a UT.

    Closes #26815 from yaooqinn/SPARK-30189.

    Authored-by: Kent Yao
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 3bd6ebff81a46c3bf3664c4be1714c3002d92e85)
    Signed-off-by: Wenchen Fan
---
 .../spark/sql/catalyst/util/IntervalUtils.scala    |  5 +--
 .../catalyst/parser/ExpressionParserSuite.scala    |  5 ++-
 .../sql/catalyst/util/IntervalUtilsSuite.scala     | 14 +++
 .../test/resources/sql-tests/inputs/interval.sql   |  4 ++
 .../sql-tests/results/ansi/interval.sql.out        | 47 +-
 .../resources/sql-tests/results/interval.sql.out   | 47 +-
 6 files changed, 115 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
index 0a13ec8..ccf8c5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/IntervalUtils.scala
@@ -136,8 +136,7 @@ object IntervalUtils {
           s"Error parsing interval year-month string: ${e.getMessage}", e)
       }
     }
-    assert(input.length == input.trim.length)
-    input match {
+    input.trim match {
       case yearMonthPattern("-", yearStr, monthStr) =>
         negateExact(toInterval(yearStr, monthStr))
       case yearMonthPattern(_, yearStr, monthStr) =>
@@ -300,7 +299,7 @@ object IntervalUtils {
     val regexp = dayTimePattern.get(from -> to)
     require(regexp.isDefined, s"Cannot support (interval '$input' $from to $to) expression")
     val pattern = regexp.get.pattern
-    val m = pattern.matcher(input)
+    val m = pattern.matcher(input.trim)
     require(m.matches, s"Interval string must match day-time format of '$pattern': $input, " +
       s"$fallbackNotice")
     var micros: Long = 0L
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
index e8beb61..74fd48d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ExpressionParserSuite.scala
@@ -694,7 +694,7 @@ class ExpressionParserSuite extends AnalysisTest {
     intercept("interval 10 nanoseconds", "invalid unit 'nanoseconds'")

     // Year-Month intervals.
-    val yearMonthValues = Seq("123-10", "496-0", "-2-3", "-123-0")
+    val yearMonthValues = Seq("123-10", "496-0", "-2-3", "-123-0", "\t -1-2\t")
     yearMonthValues.foreach { value =>
       val result = Literal(IntervalUtils.fromYearMonthString(value))
       checkIntervals(s"'$value' year to month", result)
@@ -707,7 +707,8 @@ class ExpressionParserSuite extends AnalysisTest {
       "10 9:8:7.123456789",
       "1 0:0:0",
       "-1 0:0:0",
-      "1 0:0:1")
+      "1 0:0:1",
+      "\t 1 0:0:1 ")
     datTimeValues.foreach { value =>
       val result = Literal(IntervalUtils.fromDayTimeString(value))
       checkIntervals(s"'$value' day to second", result)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala
index 1628a61..3d9372c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/IntervalUtilsSuite.scala
@@ -137,6 +137,15 @@ class IntervalUtilsSuite extends SparkFunSuite with SQLHelper {
[spark] branch master updated (294f605 -> 3bd6ebf)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 294f605  [SPARK-31078][SQL] Respect aliases in output ordering
 add 3bd6ebf  [SPARK-30189][SQL] Interval from year-month/date-time string should handle whitespaces

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/util/IntervalUtils.scala    |  5 +--
 .../catalyst/parser/ExpressionParserSuite.scala    |  5 ++-
 .../sql/catalyst/util/IntervalUtilsSuite.scala     | 14 +++
 .../test/resources/sql-tests/inputs/interval.sql   |  4 ++
 .../sql-tests/results/ansi/interval.sql.out        | 47 +-
 .../resources/sql-tests/results/interval.sql.out   | 47 +-
 6 files changed, 115 insertions(+), 7 deletions(-)
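To see the behavior change from these two commits, a small hedged demo (assuming a Spark build containing this patch): the second statement embeds real tab and newline characters around a day-time interval string and, after this change, parses to the same interval as the first.

```scala
import org.apache.spark.sql.SparkSession

object IntervalWhitespaceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Plain day-time interval string.
    spark.sql("SELECT INTERVAL '-10 12:34:46.789' DAY TO SECOND").show(truncate = false)

    // Same value surrounded by whitespace ('\n' and '\t' become real
    // characters inside the SQL literal); valid only after this patch.
    spark.sql("SELECT INTERVAL '\n-\t10\t 12:34:46.789\t' DAY TO SECOND").show(truncate = false)

    spark.stop()
  }
}
```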
[spark] branch master updated (15df2a3f -> 294f605)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 15df2a3f  [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
 add 294f605   [SPARK-31078][SQL] Respect aliases in output ordering

No new revisions were added by this update.

Summary of changes:
 ...ning.scala => AliasAwareOutputExpression.scala} | 58 --
 .../execution/aggregate/HashAggregateExec.scala    |  4 +-
 .../execution/aggregate/SortAggregateExec.scala    |  8 +--
 .../sql/execution/basicPhysicalOperators.scala     |  9 ++--
 .../apache/spark/sql/execution/PlannerSuite.scala  | 19 +++
 .../spark/sql/sources/BucketedReadSuite.scala      | 12 +
 6 files changed, 87 insertions(+), 23 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/{AliasAwareOutputPartitioning.scala => AliasAwareOutputExpression.scala} (58%)
[spark] branch branch-3.0 updated: [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c238455  [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger
c238455 is described below

commit c2384558086b0386a20aa8098cdf7a4c823a5f04
Author: Eric Wu <492960...@qq.com>
AuthorDate: Tue Mar 10 19:08:59 2020 +0800

    [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger

    ### What changes were proposed in this pull request?

    RuleExecutor already supports metering for analyzer/optimizer rules. By surfacing this information in `PlanChangeLogger`, users get more information when debugging rule changes.

    This PR enhances `PlanChangeLogger` to display RuleExecutor metrics. This could be done by calling the existing APIs `resetMetrics` and `dumpTimeSpent`, but that might conflict with a user who is also collecting the total metrics of a SQL job. Thus `QueryExecutionMetrics` is introduced, as a snapshot of `QueryExecutionMetering`, to better support this feature.

    Information added to `PlanChangeLogger`:

    ```
    === Metrics of Executed Rules ===
    Total number of runs: 554
    Total time: 0.107756568 seconds
    Total number of effective runs: 11
    Total time of effective runs: 0.047615486 seconds
    ```

    ### Why are the changes needed?

    Provide a better plan-change debugging user experience.

    ### Does this PR introduce any user-facing change?

    Only adds more debugging info to `planChangeLog`; the default log level is TRACE.

    ### How was this patch tested?

    Updated existing tests to verify the new logs.

    Closes #27846 from Eric5553/ExplainRuleExecMetrics.

    Authored-by: Eric Wu <492960...@qq.com>
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 15df2a3f40c74cd3950cc48c95c330217e3ef401)
    Signed-off-by: Wenchen Fan
---
 .../catalyst/rules/QueryExecutionMetering.scala    | 27 ++
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 24 ++-
 .../catalyst/optimizer/OptimizerLoggingSuite.scala |  9 +++-
 3 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 875c46d..8efc359 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -37,6 +37,10 @@ case class QueryExecutionMetering() {
     timeEffectiveRunsMap.clear()
   }

+  def getMetrics(): QueryExecutionMetrics = {
+    QueryExecutionMetrics(totalTime, totalNumRuns, totalNumEffectiveRuns, totalEffectiveTime)
+  }
+
   def totalTime: Long = {
     timeMap.sum()
   }
@@ -45,6 +49,14 @@ case class QueryExecutionMetering() {
     numRunsMap.sum()
   }

+  def totalNumEffectiveRuns: Long = {
+    numEffectiveRunsMap.sum()
+  }
+
+  def totalEffectiveTime: Long = {
+    timeEffectiveRunsMap.sum()
+  }
+
   def incExecutionTimeBy(ruleName: String, delta: Long): Unit = {
     timeMap.addAndGet(ruleName, delta)
   }
@@ -95,3 +107,18 @@ case class QueryExecutionMetering() {
     """.stripMargin
   }
 }
+
+case class QueryExecutionMetrics(
+    time: Long,
+    numRuns: Long,
+    numEffectiveRuns: Long,
+    timeEffective: Long) {
+
+  def -(metrics: QueryExecutionMetrics): QueryExecutionMetrics = {
+    QueryExecutionMetrics(
+      this.time - metrics.time,
+      this.numRuns - metrics.numRuns,
+      this.numEffectiveRuns - metrics.numEffectiveRuns,
+      this.timeEffective - metrics.timeEffective)
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index da5242b..bff04d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -21,6 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
@@ -37,6 +38,10 @@ object RuleExecutor {
   def resetMetrics(): Unit = {
     queryExecutionMeter.resetMetrics()
   }
+
+  def getCurrentMetrics(): QueryExecutionMetrics = {
+    queryExecutionMeter.getMetrics()
+  }
 }

 abstract class
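The diff above adds snapshot-style metering. Below is a hedged sketch of the intended usage pattern, mirroring what `PlanChangeLogger` does: capture a snapshot before and after some rule-driven work and subtract the two to get the delta for just that span. The `RuleMetricsDelta.meter` wrapper is made up for illustration, and it assumes `RuleExecutor.getCurrentMetrics` is accessible from the caller's package.

```scala
import org.apache.spark.sql.catalyst.rules.RuleExecutor

object RuleMetricsDelta {
  // Runs `body`, then prints how many rule invocations it triggered and how
  // long they took, using the snapshot subtraction added in this commit.
  def meter[T](body: => T): T = {
    val before = RuleExecutor.getCurrentMetrics()
    val result = body
    val delta = RuleExecutor.getCurrentMetrics() - before
    println(s"rules run: ${delta.numRuns} (effective: ${delta.numEffectiveRuns}), " +
      s"time: ${delta.time} ns (effective: ${delta.timeEffective} ns)")
    result
  }
}
```

For example, wrapping a single query's planning, e.g. `RuleMetricsDelta.meter(spark.sql(query).queryExecution.optimizedPlan)`, reports the rule metrics for that query alone without resetting the global counters.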
[spark] branch master updated (8ee41f3 -> 15df2a3f)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 8ee41f3  [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module
 add 15df2a3f  [SPARK-31079][SQL] Logging QueryExecutionMetering in RuleExecutor logger

No new revisions were added by this update.

Summary of changes:
 .../catalyst/rules/QueryExecutionMetering.scala    | 27 ++
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 24 ++-
 .../catalyst/optimizer/OptimizerLoggingSuite.scala |  9 +++-
 3 files changed, 58 insertions(+), 2 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 69dcea2 [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module 69dcea2 is described below commit 69dcea284961668b28d702e90e9068a3b80cbc8a Author: beliefer AuthorDate: Tue Mar 10 18:04:09 2020 +0900 [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module ### What changes were proposed in this pull request? I found a lot of scattered configs in the `Streaming` module. I think we should arrange these configs in one unified place. ### Why are the changes needed? Arrange the scattered configs. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing UTs. Closes #27744 from beliefer/arrange-scattered-streaming-config. Authored-by: beliefer Signed-off-by: HyukjinKwon (cherry picked from commit 8ee41f3576689f3d164131d1e6041bd347394364) Signed-off-by: HyukjinKwon --- .../org/apache/spark/streaming/StreamingConf.scala | 161 + .../apache/spark/streaming/StreamingContext.scala | 3 +- .../apache/spark/streaming/dstream/DStream.scala | 3 +- .../spark/streaming/receiver/BlockGenerator.scala | 5 +- .../spark/streaming/receiver/RateLimiter.scala | 5 +- .../spark/streaming/scheduler/JobGenerator.scala | 6 +- .../spark/streaming/scheduler/JobScheduler.scala | 2 +- .../spark/streaming/scheduler/RateController.scala | 3 +- .../streaming/scheduler/rate/RateEstimator.scala | 11 +- .../ui/StreamingJobProgressListener.scala | 4 +- .../org/apache/spark/streaming/util/StateMap.scala | 4 +- .../spark/streaming/util/WriteAheadLogUtils.scala | 42 ++ .../streaming/ReceiverInputDStreamSuite.scala | 3 +- 13 files changed, 201 insertions(+), 51 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala new file mode 100644 index 000..71aefd6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.streaming + +import java.util.concurrent.TimeUnit + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.streaming.util.OpenHashMapBasedStateMap.DELTA_CHAIN_LENGTH_THRESHOLD + +object StreamingConf { + + private[streaming] val BACKPRESSURE_ENABLED = +ConfigBuilder("spark.streaming.backpressure.enabled") + .booleanConf + .createWithDefault(false) + + private[streaming] val RECEIVER_MAX_RATE = +ConfigBuilder("spark.streaming.receiver.maxRate") + .longConf + .createWithDefault(Long.MaxValue) + + private[streaming] val BACKPRESSURE_INITIAL_RATE = +ConfigBuilder("spark.streaming.backpressure.initialRate") + .fallbackConf(RECEIVER_MAX_RATE) + + private[streaming] val BLOCK_INTERVAL = +ConfigBuilder("spark.streaming.blockInterval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("200ms") + + private[streaming] val RECEIVER_WAL_ENABLE_CONF_KEY = +ConfigBuilder("spark.streaming.receiver.writeAheadLog.enable") + .booleanConf + .createWithDefault(false) + + private[streaming] val RECEIVER_WAL_CLASS_CONF_KEY = +ConfigBuilder("spark.streaming.receiver.writeAheadLog.class") + .stringConf + .createOptional + + private[streaming] val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY = +ConfigBuilder("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs") + .intConf + .createWithDefault(60) + + private[streaming] val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = +ConfigBuilder("spark.streaming.receiver.writeAheadLog.maxFailures") + .intConf + .createWithDefault(3) + + private[streaming] val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY = +ConfigBuilder("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite") + .booleanConf +
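The payoff of centralizing these entries is at the call sites. A sketch of the before/after pattern this enables (illustrative only; the exact call sites vary across the files touched above, and note that both `StreamingConf`'s entries and `SparkConf.get(ConfigEntry)` are Spark-internal, so this compiles only inside the `org.apache.spark.streaming` package):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingConf

val conf = new SparkConf()

// Before: a raw string key, with the default value repeated at every use site.
val enabledOld = conf.getBoolean("spark.streaming.backpressure.enabled", defaultValue = false)

// After: a typed ConfigEntry is the single source of truth for key, type and default.
val enabledNew = conf.get(StreamingConf.BACKPRESSURE_ENABLED)
```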
[spark] branch master updated (815c792 -> 8ee41f3)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from 815c792 [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source add 8ee41f3 [SPARK-30992][DSTREAMS] Arrange scattered config of streaming module No new revisions were added by this update. Summary of changes: .../org/apache/spark/streaming/StreamingConf.scala | 161 + .../apache/spark/streaming/StreamingContext.scala | 3 +- .../apache/spark/streaming/dstream/DStream.scala | 3 +- .../spark/streaming/receiver/BlockGenerator.scala | 5 +- .../spark/streaming/receiver/RateLimiter.scala | 5 +- .../spark/streaming/scheduler/JobGenerator.scala | 6 +- .../spark/streaming/scheduler/JobScheduler.scala | 2 +- .../spark/streaming/scheduler/RateController.scala | 3 +- .../streaming/scheduler/rate/RateEstimator.scala | 11 +- .../ui/StreamingJobProgressListener.scala | 4 +- .../org/apache/spark/streaming/util/StateMap.scala | 4 +- .../spark/streaming/util/WriteAheadLogUtils.scala | 42 ++ .../streaming/ReceiverInputDStreamSuite.scala | 3 +- 13 files changed, 201 insertions(+), 51 deletions(-) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/StreamingConf.scala
[spark] branch branch-3.0 updated: [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0985f13 [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source 0985f13 is described below commit 0985f13bc66a99319820d0d9ba5b3f2a254f61a4 Author: HyukjinKwon AuthorDate: Tue Mar 10 00:33:32 2020 -0700 [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source ### What changes were proposed in this pull request? This PR proposes two things: 1. Convert `null` to `string` type during schema inference of `schema_of_json`, as the JSON datasource does. This is also a bug fix, because `null` is not a proper DDL-formatted type string and the SQL parser is unable to recognise it as one. We should match the JSON datasource and return a string type, so that `schema_of_json` returns a proper DDL-formatted string. 2. Let `schema_of_json` respect the `dropFieldIfAllNull` option during schema inference. ### Why are the changes needed? To let `schema_of_json` return a proper DDL-formatted string, and respect the `dropFieldIfAllNull` option. ### Does this PR introduce any user-facing change? Yes, it does. ```scala import collection.JavaConverters._ import org.apache.spark.sql.functions._ spark.range(1).select(schema_of_json(lit("""{"id": ""}"""))).show() spark.range(1).select(schema_of_json(lit("""{"id": "a", "drop": {"drop": null}}"""), Map("dropFieldIfAllNull" -> "true").asJava)).show(false) ``` **Before:** ``` struct<id:null> struct<drop:struct<drop:null>,id:string> ``` **After:** ``` struct<id:string> struct<id:string> ``` ### How was this patch tested? Manually tested, and unit tests were added. Closes #27854 from HyukjinKwon/SPARK-31065. Authored-by: HyukjinKwon Signed-off-by: Dongjoon Hyun (cherry picked from commit 815c7929c290d6eed86dc5c924f9f7d48cff179d) Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/expressions/jsonExpressions.scala | 13 +++- .../spark/sql/catalyst/json/JsonInferSchema.scala | 13 .../org/apache/spark/sql/JsonFunctionsSuite.scala | 35 ++ 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 61afdb6..a63e541 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -773,7 +773,18 @@ case class SchemaOfJson( override def eval(v: InternalRow): Any = { val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => parser.nextToken() - jsonInferSchema.inferField(parser) + // To match with schema inference from JSON datasource.
+ jsonInferSchema.inferField(parser) match { +case st: StructType => + jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil)) +case at: ArrayType if at.elementType.isInstanceOf[StructType] => + jsonInferSchema +.canonicalizeType(at.elementType, jsonOptions) +.map(ArrayType(_, containsNull = at.containsNull)) +.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull)) +case other: DataType => + jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(StringType) + } } UTF8String.fromString(dt.catalogString) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 82dd6d0..3dd8694 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -92,12 +92,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { } json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult) -canonicalizeType(rootType, options) match { - case Some(st: StructType) => st - case _ => -// canonicalizeType erases all empty structs, including the only one we want to keep -StructType(Nil) -} +canonicalizeType(rootType, options) + .find(_.isInstanceOf[StructType]) + // canonicalizeType erases all empty structs, including the only one we want to keep + .getOrElse(StructType(Nil)).asInstanceOf[StructType] } /** @@ -198,7 +196,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { * Recursively canonicalizes inferred types, e.g., removes StructTypes with no fields, * drops NullTypes or converts them to
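A consequence worth noting: because the result is now always valid DDL, `schema_of_json` composes cleanly with `from_json`. A sketch of the round trip, assuming an active `SparkSession` named `spark`:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.functions._

val json = """{"id": ""}"""
// Before this fix the inferred schema string was "struct<id:null>", which the
// SQL parser rejects; after it, "struct<id:string>" parses and the round trip works.
val parsed = spark.range(1).select(
  from_json(lit(json), schema_of_json(lit(json)), Map.empty[String, String].asJava).as("parsed"))
parsed.printSchema()
```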
[spark] branch master updated: [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 815c792 [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source 815c792 is described below commit 815c7929c290d6eed86dc5c924f9f7d48cff179d Author: HyukjinKwon AuthorDate: Tue Mar 10 00:33:32 2020 -0700 [SPARK-31065][SQL] Match schema_of_json to the schema inference of JSON data source ### What changes were proposed in this pull request? This PR proposes two things: 1. Convert `null` to `string` type during schema inference of `schema_of_json`, as the JSON datasource does. This is also a bug fix, because `null` is not a proper DDL-formatted type string and the SQL parser is unable to recognise it as one. We should match the JSON datasource and return a string type, so that `schema_of_json` returns a proper DDL-formatted string. 2. Let `schema_of_json` respect the `dropFieldIfAllNull` option during schema inference. ### Why are the changes needed? To let `schema_of_json` return a proper DDL-formatted string, and respect the `dropFieldIfAllNull` option. ### Does this PR introduce any user-facing change? Yes, it does. ```scala import collection.JavaConverters._ import org.apache.spark.sql.functions._ spark.range(1).select(schema_of_json(lit("""{"id": ""}"""))).show() spark.range(1).select(schema_of_json(lit("""{"id": "a", "drop": {"drop": null}}"""), Map("dropFieldIfAllNull" -> "true").asJava)).show(false) ``` **Before:** ``` struct<id:null> struct<drop:struct<drop:null>,id:string> ``` **After:** ``` struct<id:string> struct<id:string> ``` ### How was this patch tested? Manually tested, and unit tests were added. Closes #27854 from HyukjinKwon/SPARK-31065. Authored-by: HyukjinKwon Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/expressions/jsonExpressions.scala | 13 +++- .../spark/sql/catalyst/json/JsonInferSchema.scala | 13 .../org/apache/spark/sql/JsonFunctionsSuite.scala | 36 ++ 3 files changed, 54 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index aa4b464..4c2a511 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -777,7 +777,18 @@ case class SchemaOfJson( override def eval(v: InternalRow): Any = { val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, json)) { parser => parser.nextToken() - jsonInferSchema.inferField(parser) + // To match with schema inference from JSON datasource.
+ jsonInferSchema.inferField(parser) match { +case st: StructType => + jsonInferSchema.canonicalizeType(st, jsonOptions).getOrElse(StructType(Nil)) +case at: ArrayType if at.elementType.isInstanceOf[StructType] => + jsonInferSchema +.canonicalizeType(at.elementType, jsonOptions) +.map(ArrayType(_, containsNull = at.containsNull)) +.getOrElse(ArrayType(StructType(Nil), containsNull = at.containsNull)) +case other: DataType => + jsonInferSchema.canonicalizeType(other, jsonOptions).getOrElse(StringType) + } } UTF8String.fromString(dt.catalogString) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 82dd6d0..3dd8694 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -92,12 +92,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { } json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult) -canonicalizeType(rootType, options) match { - case Some(st: StructType) => st - case _ => -// canonicalizeType erases all empty structs, including the only one we want to keep -StructType(Nil) -} +canonicalizeType(rootType, options) + .find(_.isInstanceOf[StructType]) + // canonicalizeType erases all empty structs, including the only one we want to keep + .getOrElse(StructType(Nil)).asInstanceOf[StructType] } /** @@ -198,7 +196,8 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable { * Recursively canonicalizes inferred types,
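For comparison, here is the read-path inference that `schema_of_json` is being matched against — a sketch assuming an active `SparkSession` named `spark`:

```scala
import spark.implicits._

val ds = Seq("""{"id": "a", "drop": {"drop": null}}""").toDS()

// The JSON datasource already honored dropFieldIfAllNull during read-time
// schema inference; this commit brings schema_of_json into agreement with it.
val inferred = spark.read.option("dropFieldIfAllNull", "true").json(ds).schema
println(inferred.catalogString) // struct<id:string>: the all-null "drop" field is gone
```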
[spark] branch branch-3.0 updated: [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE `CustomShuffleReaderExec`
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new 3fb450c [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE `CustomShuffleReaderExec` 3fb450c is described below commit 3fb450c0b7c4e415a29d51cd10e6be6ad8dff114 Author: maryannxue AuthorDate: Tue Mar 10 14:15:44 2020 +0800 [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE `CustomShuffleReaderExec` ### What changes were proposed in this pull request? This PR changes the type of `CustomShuffleReaderExec`'s `partitionSpecs` from `Array` to `Seq`, since `Array` compares references rather than values for equality, which could lead to a potential plan-reuse problem. ### Why are the changes needed? Unlike `Seq`, `Array` compares references rather than values for equality, which could lead to a potential plan-reuse problem. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Passes existing UTs. Closes #27857 from maryannxue/aqe-customreader-fix. Authored-by: maryannxue Signed-off-by: Wenchen Fan --- .../adaptive/CustomShuffleReaderExec.scala | 4 +-- .../adaptive/OptimizeLocalShuffleReader.scala | 10 +++--- .../execution/adaptive/OptimizeSkewedJoin.scala| 12 +++- .../adaptive/ShufflePartitionsCoalescer.scala | 6 ++-- .../ShufflePartitionsCoalescerSuite.scala | 36 +++--- 5 files changed, 33 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index be372bb..ba3f725 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExcha */ case class CustomShuffleReaderExec private( child: SparkPlan, -partitionSpecs: Array[ShufflePartitionSpec], +partitionSpecs: Seq[ShufflePartitionSpec], description: String) extends UnaryExecNode { override def output: Seq[Attribute] = child.output @@ -71,7 +71,7 @@ case class CustomShuffleReaderExec private( cachedShuffleRDD = child match { case stage: ShuffleQueryStageExec => new ShuffledRowRDD( -stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs) +stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray) case _ => throw new IllegalStateException("operating on canonicalization plan") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index e441763..fb6b40c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -77,21 +77,21 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { // partition start indices based on block size to avoid data skew.
private def getPartitionSpecs( shuffleStage: ShuffleQueryStageExec, - advisoryParallelism: Option[Int]): Array[ShufflePartitionSpec] = { + advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = { val shuffleDep = shuffleStage.shuffle.shuffleDependency val numReducers = shuffleDep.partitioner.numPartitions val expectedParallelism = advisoryParallelism.getOrElse(numReducers) val numMappers = shuffleDep.rdd.getNumPartitions val splitPoints = if (numMappers == 0) { - Array.empty + Seq.empty } else { - equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers)).toArray + equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers)) } (0 until numMappers).flatMap { mapIndex => (splitPoints :+ numReducers).sliding(2).map { -case Array(start, end) => PartialMapperPartitionSpec(mapIndex, start, end) +case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end) } -}.toArray +} } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 2e8adcf..c3bcce4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++
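The Scala semantics behind this fix are easy to demonstrate. A minimal, self-contained sketch of why an `Array` field breaks case-class equality — and therefore plan reuse, which compares canonicalized plan nodes for equality (the case classes below are hypothetical stand-ins for `CustomShuffleReaderExec` and its `partitionSpecs` field):

```scala
// Array inherits Java's reference equality; Seq compares element-wise.
val a1 = Array(1, 2, 3)
val a2 = Array(1, 2, 3)
println(a1 == a2)                      // false: distinct references
println(Seq(1, 2, 3) == Seq(1, 2, 3))  // true: structural equality

// A case class derives equals() from its fields, so an Array-typed field makes
// two logically identical instances compare unequal.
case class ReaderBefore(specs: Array[Int])
case class ReaderAfter(specs: Seq[Int])
println(ReaderBefore(Array(1)) == ReaderBefore(Array(1))) // false
println(ReaderAfter(Seq(1)) == ReaderAfter(Seq(1)))       // true
```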
[spark] branch master updated (a229943 -> de6d9e4)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git. from a229943 [SPARK-30902][SQL][FOLLOW-UP] Allow ReplaceTableAsStatement to have none provider add de6d9e4 [SPARK-31096][SQL] Replace `Array` with `Seq` in AQE `CustomShuffleReaderExec` No new revisions were added by this update. Summary of changes: .../adaptive/CustomShuffleReaderExec.scala | 4 +-- .../adaptive/OptimizeLocalShuffleReader.scala | 10 +++--- .../execution/adaptive/OptimizeSkewedJoin.scala| 12 +++- .../adaptive/ShufflePartitionsCoalescer.scala | 6 ++-- .../ShufflePartitionsCoalescerSuite.scala | 36 +++--- 5 files changed, 33 insertions(+), 35 deletions(-)