[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...
Github user yucai commented on a diff in the pull request:
https://github.com/apache/spark/pull/20303#discussion_r164122162

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.MapOutputStatistics
+import org.apache.spark.broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * In adaptive execution mode, an execution plan is divided into multiple QueryStages. Each
+ * QueryStage is a sub-tree that runs in a single stage.
+ */
+abstract class QueryStage extends UnaryExecNode {
+
+  var child: SparkPlan
+
+  // Ignore this wrapper for canonicalizing.
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Execute childStages and wait until all stages are completed. Use a thread pool to avoid
+   * blocking on one child stage.
+   */
+  def executeChildStages(): Unit = {
+    // Handle broadcast stages
+    val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
+      case bqs: BroadcastQueryStageInput => bqs.childStage
+    }
+    val broadcastFutures = broadcastQueryStages.map { queryStage =>
+      Future { queryStage.prepareBroadcast() }(QueryStage.executionContext)
+    }
+
+    // Submit shuffle stages
+    val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect {
+      case sqs: ShuffleQueryStageInput => sqs.childStage
+    }
+    val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
+      Future {
+        SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
+          queryStage.execute()
+        }
+      }(QueryStage.executionContext)
+    }
+
+    ThreadUtils.awaitResult(
+      Future.sequence(broadcastFutures)(implicitly, QueryStage.executionContext), Duration.Inf)
+    ThreadUtils.awaitResult(
+      Future.sequence(shuffleStageFutures)(implicitly, QueryStage.executionContext), Duration.Inf)
+  }
+
+  /**
+   * Before executing the plan in this query stage, we execute all child stages, optimize the plan
+   * in this stage and determine the reducer number based on the child stages' statistics. Finally
+   * we do a codegen for this query stage and update the UI with the new plan.
+   */
+  def prepareExecuteStage(): Unit = {
+    // 1. Execute childStages
+    executeChildStages()
+    // It is possible to optimize this stage's plan here based on the child stages' statistics.
+
+    // 2. Determine reducer number
+    val queryStageInputs: Seq[ShuffleQueryStageInput] = child.collect {
+      case input: ShuffleQueryStageInput => input
+    }
+    val childMapOutputStatistics = queryStageInputs.map(_.childStage.mapOutputStatistics)
+      .filter(_ != null).toArray
+    if (childMapOutputStatistics.length > 0) {
+      val exchangeCoordinator = new ExchangeCoordinator(
+        conf.targetPostShuffleInputSize,
+        conf.minNumPostShufflePartitions)
+
+      val partitionStartIndic
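For readers skimming the quoted diff: executeChildStages is ordinary scala.concurrent plumbing. Each child stage is submitted on a dedicated thread pool and the caller blocks until every Future completes, so one slow child stage cannot serialize the others. A minimal standalone sketch of the same pattern, using a bare ExecutorService and Await.result in place of Spark's internal ThreadUtils (the object name, stage names, and runStage body are illustrative, not part of the PR):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    object ChildStageDemo {
      // Dedicated pool, mirroring QueryStage.executionContext in the diff,
      // so stage submission never starves the caller's threads.
      private val ec: ExecutionContext =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

      // Stand-in for submitting and running one child stage.
      def runStage(name: String): String = {
        Thread.sleep(100)
        s"$name done"
      }

      def main(args: Array[String]): Unit = {
        val stages = Seq("broadcast-0", "shuffle-0", "shuffle-1")
        // Submit all child stages concurrently rather than one at a time.
        val futures = stages.map(n => Future(runStage(n))(ec))
        // Block until every child stage has finished, as awaitResult does above.
        val results = Await.result(Future.sequence(futures)(implicitly, ec), Duration.Inf)
        results.foreach(println)
      }
    }

Likewise, the reducer-number decision that prepareExecuteStage hands to ExchangeCoordinator amounts to packing contiguous post-shuffle partitions until a byte-size target is reached. A hand-rolled sketch of that packing idea — not the actual ExchangeCoordinator code; bytesByPartition and targetSize are illustrative inputs:

    // Given per-partition byte sizes (summed across all child shuffles),
    // return the start indices of the coalesced post-shuffle partitions.
    def estimatePartitionStartIndices(
        bytesByPartition: Array[Long],
        targetSize: Long): Array[Int] = {
      val starts = scala.collection.mutable.ArrayBuffer(0)
      var currentSize = 0L
      for (i <- bytesByPartition.indices) {
        // Start a new coalesced partition once adding the next map-output
        // partition would push the current one past the target input size.
        if (currentSize > 0 && currentSize + bytesByPartition(i) > targetSize) {
          starts += i
          currentSize = 0L
        }
        currentSize += bytesByPartition(i)
      }
      starts.toArray
    }

    // estimatePartitionStartIndices(Array(30L, 30L, 30L, 30L), 64L)
    //   => Array(0, 2): reducers read partition groups {0,1} and {2,3}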
[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...
Github user gczsjdy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20303#discussion_r164089610

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala ---
(quotes the same QueryStage.scala hunk shown in full above)
[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...
Github user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/20303#discussion_r164070918

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala ---
(quotes the same QueryStage.scala hunk shown in full above)
[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...
GitHub user carsonwang opened a pull request:

https://github.com/apache/spark/pull/20303
[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

## What changes were proposed in this pull request?

This is joint work with @yucai, @gczsjdy, @chenghao-intel and @xuanyuanking. We'd like to introduce a new approach to adaptive execution in Spark SQL. The idea is described at https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing

## How was this patch tested?

Updated ExchangeCoordinatorSuite. We also tested this with all queries in TPC-DS.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/carsonwang/spark AE_1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20303.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20303

----

commit 7e9e57859ae35f70b57ece2fd907464392d88c0f
Author: Carson Wang
Date:   2017-11-22T08:49:01Z

    Introduce QueryStage for adapative execution

commit 2aa185ddec4616b60e7596153c3dd62188e4d145
Author: Carson Wang
Date:   2017-12-12T02:19:53Z

    Add QueryPlan for ExecutedCommandExec

commit 7c3d182f6b6c21f437fbef1ea5ebb2843934dbc6
Author: Carson Wang
Date:   2017-12-13T05:31:52Z

    Add config spark.sql.adaptive.maxNumPostShufflePartitions

commit e9b3075c7799b8de6038cc6492cd9e54c12f108e
Author: Carson Wang
Date:   2017-12-13T07:40:08Z

    update commetns

commit 9666c5fa5b42ebabc4e2b099a1f545945e959e6f
Author: Carson Wang
Date:   2017-12-18T08:38:32Z

    Fix style

commit 9b29a3c5eb20b76c1ef7f897c07ccdd3a98a27b4
Author: Carson Wang
Date:   2017-12-26T02:22:45Z

    fix bug

commit e0b98fbed96c1c07ebf2a1a6846576f705cf2c24
Author: Carson Wang
Date:   2018-01-17T05:59:51Z

    update doc and style
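Given the commit list above, exercising this in a local test would presumably look something like the sketch below. spark.sql.adaptive.enabled is the existing adaptive-execution flag; spark.sql.adaptive.maxNumPostShufflePartitions is the new knob this PR's commits add, and the value used here is purely illustrative:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("AdaptiveExecutionDemo")
      .config("spark.sql.adaptive.enabled", "true")
      // Added by this PR per the commit list; 500 is an arbitrary example cap.
      .config("spark.sql.adaptive.maxNumPostShufflePartitions", "500")
      .getOrCreate()

    // A query with a shuffle, so the number of post-shuffle reducers can be
    // coalesced at runtime from the map output statistics.
    spark.range(0, 1000000).groupBy((col("id") % 10).as("bucket")).count().show()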