[spark] branch master updated (0bfcf9c -> c4257b1)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 0bfcf9c  [SPARK-28322][SQL] Add support to Decimal type for integral divide
     add c4257b1  [SPARK-28541][WEBUI] Document Storage page

No new revisions were added by this update.

Summary of changes:
 docs/img/webui-storage-detail.png | Bin 0 -> 190970 bytes
 docs/img/webui-storage-tab.png    | Bin 0 -> 78441 bytes
 docs/web-ui.md                    | 48 ++
 3 files changed, 48 insertions(+)
 create mode 100644 docs/img/webui-storage-detail.png
 create mode 100644 docs/img/webui-storage-tab.png

[spark] branch master updated (a50959a -> 0bfcf9c)
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from a50959a  [SPARK-27937][CORE] Revert partial logic for auto namespace discovery
     add 0bfcf9c  [SPARK-28322][SQL] Add support to Decimal type for integral divide

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/DecimalPrecision.scala   |  17 ++
 .../sql/catalyst/expressions/arithmetic.scala      |  51 +++-
 .../scala/org/apache/spark/sql/types/Decimal.scala |   6 +-
 .../expressions/ArithmeticExpressionSuite.scala    |  31 +++
 .../resources/sql-tests/inputs/operator-div.sql    |   9 +-
 .../resources/sql-tests/inputs/pgSQL/numeric.sql   |  10 +-
 .../sql-tests/results/operator-div.sql.out         |  90 ++-
 .../sql-tests/results/pgSQL/numeric.sql.out        | 274 -
 8 files changed, 334 insertions(+), 154 deletions(-)

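For context, "integral divide" in Spark SQL is exposed through the `div` operator (see the `operator-div.sql` tests touched above). The sketch below is a hypothetical illustration of what the change enables, not code from the patch; the exact result type of `div` on decimal operands is an assumption here.

```scala
// Hypothetical usage sketch: the `div` (integral division) operator applied to DECIMAL operands.
import org.apache.spark.sql.SparkSession

object DecimalIntegralDivide {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("decimal-div").getOrCreate()

    // Before this change, `div` accepted only integral types; with it, decimals are allowed too.
    // 7.5 div 2.0 should yield the integral part of the quotient, i.e. 3
    // (whether the result comes back as LONG or DECIMAL is an assumption, not taken from the diff).
    spark.sql("SELECT CAST(7.5 AS DECIMAL(10, 2)) div CAST(2.0 AS DECIMAL(10, 2)) AS q").show()

    spark.stop()
  }
}
```
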
[spark] branch master updated (39c1127 -> a50959a)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 39c1127  [SPARK-28753][SQL] Dynamically reuse subqueries in AQE
     add a50959a  [SPARK-27937][CORE] Revert partial logic for auto namespace discovery

No new revisions were added by this update.

Summary of changes:
 .../security/HadoopFSDelegationTokenProvider.scala |  30 ++
 .../HadoopFSDelegationTokenProviderSuite.scala     |  66 --
 docs/running-on-yarn.md                            |  11 ++--
 3 files changed, 9 insertions(+), 98 deletions(-)

[spark] branch master updated: [SPARK-28753][SQL] Dynamically reuse subqueries in AQE
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 39c1127  [SPARK-28753][SQL] Dynamically reuse subqueries in AQE
39c1127 is described below

commit 39c11273e0dfa51042d5aa9696263ab67d658d15
Author: maryannxue
AuthorDate: Tue Aug 20 19:58:29 2019 +0800

    [SPARK-28753][SQL] Dynamically reuse subqueries in AQE

    ### What changes were proposed in this pull request?
    This PR changes subquery reuse in Adaptive Query Execution from compile-time static reuse
    to execution-time dynamic reuse. This PR adds a `ReuseAdaptiveSubquery` rule that applies
    to a query stage after it is created and before it is executed. The new dynamic reuse
    enables subqueries to be reused across all different subquery levels.

    ### Why are the changes needed?
    This is an improvement to the current subquery reuse in Adaptive Query Execution, which
    allows subquery reuse to happen in a lazy fashion as well as at different subquery levels.

    ### Does this PR introduce any user-facing change?
    No.

    ### How was this patch tested?
    Passed existing tests.

    Closes #25471 from maryannxue/aqe-dynamic-sub-reuse.

    Authored-by: maryannxue
    Signed-off-by: Wenchen Fan
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 37 +++---
 .../adaptive/InsertAdaptiveSparkPlan.scala         | 57 +-
 .../adaptive/PlanAdaptiveSubqueries.scala          |  2 +-
 ...ubqueries.scala => ReuseAdaptiveSubquery.scala} | 27 ++
 4 files changed, 62 insertions(+), 61 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index e7bbbd7..4f13568 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -60,7 +60,8 @@ import org.apache.spark.util.ThreadUtils
 case class AdaptiveSparkPlanExec(
     initialPlan: SparkPlan,
     @transient session: SparkSession,
-    @transient subqueryMap: Map[Long, ExecSubqueryExpression],
+    @transient preprocessingRules: Seq[Rule[SparkPlan]],
+    @transient subqueryCache: TrieMap[SparkPlan, BaseSubqueryExec],
     @transient stageCache: TrieMap[SparkPlan, QueryStageExec],
     @transient queryExecution: QueryExecution) extends LeafExecNode {
@@ -73,24 +74,27 @@ case class AdaptiveSparkPlanExec(
     override protected def batches: Seq[Batch] = Seq()
   }

+  @transient private val ensureRequirements = EnsureRequirements(conf)
+
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    PlanAdaptiveSubqueries(subqueryMap),
-    EnsureRequirements(conf)
+  private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
+    ensureRequirements
   )

   // A list of physical optimizer rules to be applied to a new stage before its execution. These
   // optimizations should be stage-independent.
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    ReuseAdaptiveSubquery(conf, subqueryCache),
     ReduceNumShufflePartitions(conf),
     ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
       session.sessionState.columnarRules),
     CollapseCodegenStages(conf)
   )

-  @volatile private var currentPhysicalPlan = initialPlan
+  @volatile private var currentPhysicalPlan =
+    applyPhysicalRules(initialPlan, queryStagePreparationRules)

   private var isFinalPlan = false

@@ -205,6 +209,16 @@ case class AdaptiveSparkPlanExec(
       depth + 1, lastChildren :+ true, append, verbose, "", addSuffix = false, maxFields)
   }

+  override def hashCode(): Int = initialPlan.hashCode()
+
+  override def equals(obj: Any): Boolean = {
+    if (!obj.isInstanceOf[AdaptiveSparkPlanExec]) {
+      return false
+    }
+
+    this.initialPlan == obj.asInstanceOf[AdaptiveSparkPlanExec].initialPlan
+  }
+
   /**
    * This method is called recursively to traverse the plan tree bottom-up and create a new query
    * stage or try reusing an existing stage if the current node is an [[Exchange]] node and all of
@@ -356,7 +370,7 @@ case class AdaptiveSparkPlanExec(
     val optimized = optimizer.execute(logicalPlan)
     SparkSession.setActiveSession(session)
     val sparkPlan = session.sessionState.planner.plan(ReturnAnswer(optimized)).next()
-    val newPlan = applyPhysicalRules(sparkPlan, queryStagePreparationRules)
+    val

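The key mechanism visible in this diff is a shared `TrieMap` used as a subquery cache that is consulted when each query stage is optimized. Below is a simplified, self-contained sketch of that "register-or-reuse" pattern; the class and method names are illustrative stand-ins, not Spark's actual `ReuseAdaptiveSubquery` implementation.

```scala
// Simplified sketch of execution-time subquery reuse keyed by a canonicalized plan.
// `PhysicalSubquery`, `ReusedSubquery`, and `ReuseSubqueries` are illustrative, not Spark classes.
import scala.collection.concurrent.TrieMap

final case class PhysicalSubquery(canonicalized: String)
final case class ReusedSubquery(original: PhysicalSubquery)

class ReuseSubqueries(cache: TrieMap[String, PhysicalSubquery]) {
  // Applied lazily, when a stage is about to run: the first occurrence of a subquery registers
  // itself; any later, semantically identical subquery is replaced by a reference to it.
  def registerOrReuse(sub: PhysicalSubquery): Either[ReusedSubquery, PhysicalSubquery] =
    cache.putIfAbsent(sub.canonicalized, sub) match {
      case Some(existing) => Left(ReusedSubquery(existing)) // computed once, result shared
      case None           => Right(sub)                     // first occurrence: keep and execute
    }
}

object ReuseSubqueriesDemo extends App {
  val rule = new ReuseSubqueries(TrieMap.empty[String, PhysicalSubquery])
  val a = PhysicalSubquery("SELECT max(x) FROM t")
  val b = PhysicalSubquery("SELECT max(x) FROM t") // same canonical form, different instance
  println(rule.registerOrReuse(a)) // Right(...): first occurrence, would be executed
  println(rule.registerOrReuse(b)) // Left(...): reuses the registered copy of `a`
}
```

Because the cache is shared across stages and consulted at execution time rather than at planning time, reuse works across subquery nesting levels, which is the improvement the commit message describes.
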
[spark] branch master updated (b37c8d5 -> d045221)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from b37c8d5  [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
     add d045221  [SPARK-28635][SQL] create CatalogManager to track registered v2 catalogs

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalog/v2/CatalogManager.scala      | 100
 .../spark/sql/catalog/v2/LookupCatalog.scala       |  33 +---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   7 +-
 .../catalyst/catalog/v2/LookupCatalogSuite.scala   |  33 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala  |   6 -
 .../datasources/DataSourceResolution.scala         |  13 +-
 .../sql/internal/BaseSessionStateBuilder.scala     |   4 +-
 .../apache/spark/sql/internal/SessionState.scala   |   3 +
 .../execution/command/PlanResolutionSuite.scala    |  66 +---
 .../DataSourceV2DataFrameSessionCatalogSuite.scala |   8 +-
 .../sql/sources/v2/DataSourceV2SQLSuite.scala      | 172 ++---
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   4 +-
 12 files changed, 273 insertions(+), 176 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogManager.scala

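For readers unfamiliar with the feature: a "catalog manager" resolves registered v2 catalogs by name and keeps track of the instances it has loaded. The snippet below is a hypothetical, self-contained sketch of that idea only; the class and method names are not the actual `org.apache.spark.sql.catalog.v2.CatalogManager` API added by this commit.

```scala
// Hypothetical sketch: resolve catalogs by name, loading each one once and caching the instance.
import scala.collection.mutable

trait CatalogPlugin { def name: String }

class SimpleCatalogManager(load: String => Option[CatalogPlugin]) {
  private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

  // Look up a catalog by name; the loader runs only on the first request for that name.
  def catalog(name: String): CatalogPlugin = synchronized {
    catalogs.getOrElseUpdate(
      name,
      load(name).getOrElse(throw new IllegalArgumentException(s"Catalog '$name' is not registered")))
  }
}

object CatalogManagerDemo extends App {
  val manager = new SimpleCatalogManager(n =>
    if (n == "testcat") Some(new CatalogPlugin { val name = "testcat" }) else None)
  println(manager.catalog("testcat").name) // loaded once, then served from the cache
}
```
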
[spark] branch branch-2.4 updated: [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new aff5e2b  [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
aff5e2b is described below

commit aff5e2bdca501fc24fb7d56f966d933c96a37b5b
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Tue Aug 20 00:56:53 2019 -0700

    [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

    ## What changes were proposed in this pull request?

    This patch modifies the explanation of the guarantee for ForeachWriter, as Spark does not in
    fact guarantee the same output for `(partitionId, epochId)`. Refer to the description of
    [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.

    Spark itself still guarantees the same output for the same epochId (batch) if the
    preconditions are met: 1) the source always provides the same input records for the same
    offset request, and 2) the query is idempotent overall (non-deterministic calculations such
    as now() or random() can break this).

    Treating broken preconditions as an exceptional case (the preconditions were implicitly
    required even before), we can still describe the guarantee in terms of `epochId`, though it
    is harder to leverage: 1) the ForeachWriter would have to track whether all the partitions
    were written successfully for a given `epochId`, and 2) there is little chance to leverage
    the fact, as the chance for Spark to successfully write all partitions and fail to
    checkpoint the batch i [...]

    Credit to zsxwing for discovering the broken guarantee.

    ## How was this patch tested?

    This is just a documentation change, on both the javadoc and the guide doc.

    Closes #25407 from HeartSaVioR/SPARK-28650.

    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Shixiong Zhu
    (cherry picked from commit b37c8d5cea2e31e7821d848e42277f8fb7b68f30)
    Signed-off-by: Shixiong Zhu
---
 docs/structured-streaming-programming-guide.md           | 14 ++
 .../main/scala/org/apache/spark/sql/ForeachWriter.scala  | 13 +
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 2c3348a..fa5664d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1827,7 +1827,7 @@ Here are the details of all the sinks in Spark.
     Foreach Sink
     Append, Update, Complete
     None
-    Depends on ForeachWriter implementation
+    Yes (at-least-once)
     More details in the next section
@@ -2235,13 +2235,11 @@ When the streaming query is started, Spark calls the function or the object’s
 - The close() method (if it exists) is called if an open() method exists and returns successfully
   (irrespective of the return value), except if the JVM or Python process crashes in the middle.

-- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data
-  when failures cause reprocessing of some input data. This depends on the execution mode of the query.
-  If the streaming query is being executed in the micro-batch mode, then every partition represented
-  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data.
-  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit
-  data and achieve exactly-once guarantees. However, if the streaming query is being executed
-  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
+- **Note:** Spark does not guarantee same output for (partitionId, epochId), so deduplication
+  cannot be achieved with (partitionId, epochId). e.g. source provides different number of
+  partitions for some reasons, Spark optimization changes number of partitions, etc.
+  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
+  If you need deduplication on output, try out `foreachBatch` instead.

 Triggers
 The trigger settings of a streaming query defines the timing of streaming data processing, whether

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 52b8c83..5cf294e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -50,14 +50,11 @@ import org.apache.spark.annotation.InterfaceStability
 *
 * Important points to note:
 *
- * The `partitionId` and `epochId` can be used to deduplicate generated data when failures
- * cause reprocessing of some input data. This depends on the execution mode

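For context on the lifecycle the guide excerpt above is describing, here is a minimal `ForeachWriter` sketch. The class name and the console output are illustrative; the point is that, per this change, `(partitionId, epochId)` must not be treated as a deduplication key, so the sink has to tolerate replays.

```scala
// Minimal ForeachWriter sketch illustrating the open/process/close lifecycle discussed above.
import org.apache.spark.sql.{ForeachWriter, Row}

class ConsoleRowWriter extends ForeachWriter[Row] {
  // Called once per partition per epoch; returning false skips process() for this partition.
  override def open(partitionId: Long, epochId: Long): Boolean = true

  // The sink must tolerate replays: the same logical output may arrive again after a failure,
  // and (partitionId, epochId) is not guaranteed to map to the same data each time.
  override def process(value: Row): Unit = println(value)

  // Called if open() returned without throwing, irrespective of its return value,
  // unless the JVM or Python process crashed in the middle.
  override def close(errorOrNull: Throwable): Unit = ()
}
```

A writer like this is attached with `df.writeStream.foreach(new ConsoleRowWriter).start()`.
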
[spark] branch master updated: [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b37c8d5  [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter
b37c8d5 is described below

commit b37c8d5cea2e31e7821d848e42277f8fb7b68f30
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Tue Aug 20 00:56:53 2019 -0700

    [SPARK-28650][SS][DOC] Correct explanation of guarantee for ForeachWriter

    ## What changes were proposed in this pull request?

    This patch modifies the explanation of the guarantee for ForeachWriter, as Spark does not in
    fact guarantee the same output for `(partitionId, epochId)`. Refer to the description of
    [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.

    Spark itself still guarantees the same output for the same epochId (batch) if the
    preconditions are met: 1) the source always provides the same input records for the same
    offset request, and 2) the query is idempotent overall (non-deterministic calculations such
    as now() or random() can break this).

    Treating broken preconditions as an exceptional case (the preconditions were implicitly
    required even before), we can still describe the guarantee in terms of `epochId`, though it
    is harder to leverage: 1) the ForeachWriter would have to track whether all the partitions
    were written successfully for a given `epochId`, and 2) there is little chance to leverage
    the fact, as the chance for Spark to successfully write all partitions and fail to
    checkpoint the batch i [...]

    Credit to zsxwing for discovering the broken guarantee.

    ## How was this patch tested?

    This is just a documentation change, on both the javadoc and the guide doc.

    Closes #25407 from HeartSaVioR/SPARK-28650.

    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Shixiong Zhu
---
 docs/structured-streaming-programming-guide.md           | 14 ++
 .../main/scala/org/apache/spark/sql/ForeachWriter.scala  | 13 +
 2 files changed, 11 insertions(+), 16 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index e07a0e5..b0d3e16 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1843,7 +1843,7 @@ Here are the details of all the sinks in Spark.
     Foreach Sink
     Append, Update, Complete
     None
-    Depends on ForeachWriter implementation
+    Yes (at-least-once)
     More details in the next section
@@ -2251,13 +2251,11 @@ When the streaming query is started, Spark calls the function or the object’s
 - The close() method (if it exists) is called if an open() method exists and returns successfully
   (irrespective of the return value), except if the JVM or Python process crashes in the middle.

-- **Note:** The partitionId and epochId in the open() method can be used to deduplicate generated data
-  when failures cause reprocessing of some input data. This depends on the execution mode of the query.
-  If the streaming query is being executed in the micro-batch mode, then every partition represented
-  by a unique tuple (partition_id, epoch_id) is guaranteed to have the same data.
-  Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit
-  data and achieve exactly-once guarantees. However, if the streaming query is being executed
-  in the continuous mode, then this guarantee does not hold and therefore should not be used for deduplication.
+- **Note:** Spark does not guarantee same output for (partitionId, epochId), so deduplication
+  cannot be achieved with (partitionId, epochId). e.g. source provides different number of
+  partitions for some reasons, Spark optimization changes number of partitions, etc.
+  See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
+  If you need deduplication on output, try out `foreachBatch` instead.

 Triggers
 The trigger settings of a streaming query define the timing of streaming data processing, whether

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
index 5c0fe79..a0b0a34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ForeachWriter.scala
@@ -50,14 +50,11 @@ import org.apache.spark.annotation.Evolving
 *
 * Important points to note:
 *
- * The `partitionId` and `epochId` can be used to deduplicate generated data when failures
- * cause reprocessing of some input data. This depends on the execution mode of the query. If
- * the streaming query is being executed in the micro-batch mode, then every partition
- *

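The revised guide text points users toward `foreachBatch` for deduplication. The sketch below illustrates that alternative; the "rate" source, the checkpoint path, and the print-only sink body are assumptions for illustration, not part of this commit.

```scala
// Hedged sketch of the foreachBatch alternative suggested above: the batchId identifies a whole
// micro-batch, so a sink can use it to skip or overwrite replayed batches and stay idempotent.
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("foreach-batch-demo").getOrCreate()

    // A toy unbounded source; any streaming DataFrame behaves the same way here.
    val stream = spark.readStream.format("rate").load()

    // Using an explicit function value keeps the Scala overload resolution unambiguous.
    val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
      // A real sink would record batchId and make the write idempotent per batch.
      println(s"batch $batchId contained ${batch.count()} rows")

    val query = stream.writeStream
      .foreachBatch(writeBatch)
      .option("checkpointLocation", "/tmp/checkpoints/foreach-batch-demo") // assumed local path
      .start()

    query.awaitTermination()
  }
}
```
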
[spark] branch master updated (bc75ed6 -> 79464be)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from bc75ed6  [SPARK-28483][CORE] Fix canceling a spark job using barrier mode but barrier tasks blocking on BarrierTaskContext.barrier()
     add 79464be  [SPARK-28662][SQL] Create Hive Partitioned Table DDL should fail when partition column type missed

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 10 +-
 .../org/apache/spark/sql/hive/execution/HiveDDLSuite.scala    |  8
 2 files changed, 17 insertions(+), 1 deletion(-)

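For readers unfamiliar with Hive-style DDL: partition columns are declared with their own types in the `PARTITIONED BY` clause, and the change above makes Spark reject a declaration that omits the type. The sketch below is a hypothetical illustration (table names, error handling, and the exact error message are assumptions, not taken from the patch).

```scala
// Hedged illustration of the DDL scenario: a Hive partitioned table whose partition column
// has no declared type should now be rejected instead of being accepted silently.
import org.apache.spark.sql.SparkSession

object HivePartitionDdlExample {
  def main(args: Array[String]): Unit = {
    // Assumes a build with Hive support available.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hive-partition-ddl")
      .enableHiveSupport()
      .getOrCreate()

    // Hive-style DDL: the partition column carries its own type.
    spark.sql("CREATE TABLE sales (id INT) PARTITIONED BY (dt STRING) STORED AS PARQUET")

    // Omitting the partition column's type should now fail instead of creating a broken table.
    try {
      spark.sql("CREATE TABLE sales_bad (id INT) PARTITIONED BY (dt) STORED AS PARQUET")
    } catch {
      case e: Exception => println(s"rejected as expected: ${e.getMessage}")
    }

    spark.stop()
  }
}
```
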
[spark] branch master updated (0d3a783 -> bc75ed6)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 0d3a783  [SPARK-28699][CORE] Fix a corner case for aborting indeterminate stage
     add bc75ed6  [SPARK-28483][CORE] Fix canceling a spark job using barrier mode but barrier tasks blocking on BarrierTaskContext.barrier()

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/BarrierTaskContext.scala      | 21 -
 .../org/apache/spark/rpc/RpcEndpointRef.scala      | 29
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala   | 24 --
 .../scala/org/apache/spark/rpc/netty/Outbox.scala  | 12 -
 .../scala/org/apache/spark/util/ThreadUtils.scala  |  7 ++-
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   | 42 ++
 .../spark/scheduler/BarrierTaskContextSuite.scala  | 51 ++
 7 files changed, 178 insertions(+), 8 deletions(-)

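For context, `BarrierTaskContext.barrier()` is the global-sync point the fix is about: every task of a barrier stage blocks there until all of them arrive, and the bug concerned cancelling a job while tasks were blocked. A minimal barrier-mode sketch (local master, toy data, and doubled values are illustrative choices only):

```scala
// Minimal barrier-mode sketch: all tasks of the stage rendezvous in barrier() before continuing.
import org.apache.spark.{BarrierTaskContext, SparkConf, SparkContext}

object BarrierExample {
  def main(args: Array[String]): Unit = {
    // Barrier stages need at least as many slots as tasks; local[4] matches the 4 partitions.
    val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("barrier-demo"))

    val result = sc.parallelize(1 to 8, numSlices = 4)
      .barrier()                 // turn this stage into a barrier stage
      .mapPartitions { iter =>
        val ctx = BarrierTaskContext.get()
        ctx.barrier()            // all 4 tasks block here until every task has arrived
        iter.map(_ * 2)
      }
      .collect()

    println(result.mkString(", "))
    sc.stop()
  }
}
```

Cancelling the job while tasks sit inside `ctx.barrier()` is exactly the case SPARK-28483 addresses.
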