[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...

2018-01-26 Thread yucai
Github user yucai commented on a diff in the pull request:

https://github.com/apache/spark/pull/20303#discussion_r164122162
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala ---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.MapOutputStatistics
+import org.apache.spark.broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * In adaptive execution mode, an execution plan is divided into multiple QueryStages. Each
+ * QueryStage is a sub-tree that runs in a single stage.
+ */
+abstract class QueryStage extends UnaryExecNode {
+
+  var child: SparkPlan
+
+  // Ignore this wrapper for canonicalizing.
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
+  override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  /**
+   * Execute childStages and wait until all stages are completed. Use a thread pool to avoid
+   * blocking on one child stage.
+   */
+  def executeChildStages(): Unit = {
+    // Handle broadcast stages
+    val broadcastQueryStages: Seq[BroadcastQueryStage] = child.collect {
+      case bqs: BroadcastQueryStageInput => bqs.childStage
+    }
+    val broadcastFutures = broadcastQueryStages.map { queryStage =>
+      Future { queryStage.prepareBroadcast() }(QueryStage.executionContext)
+    }
+
+    // Submit shuffle stages
+    val executionId = sqlContext.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val shuffleQueryStages: Seq[ShuffleQueryStage] = child.collect {
+      case sqs: ShuffleQueryStageInput => sqs.childStage
+    }
+    val shuffleStageFutures = shuffleQueryStages.map { queryStage =>
+      Future {
+        SQLExecution.withExecutionId(sqlContext.sparkContext, executionId) {
+          queryStage.execute()
+        }
+      }(QueryStage.executionContext)
+    }
+
+    ThreadUtils.awaitResult(
+      Future.sequence(broadcastFutures)(implicitly, QueryStage.executionContext), Duration.Inf)
+    ThreadUtils.awaitResult(
+      Future.sequence(shuffleStageFutures)(implicitly, QueryStage.executionContext), Duration.Inf)
+  }
+
+  /**
+   * Before executing the plan in this query stage, we execute all child stages, optimize the plan
+   * in this stage and determine the reducer number based on the child stages' statistics. Finally
+   * we do a codegen for this query stage and update the UI with the new plan.
+   */
+  def prepareExecuteStage(): Unit = {
+    // 1. Execute childStages
+    executeChildStages()
+    // It is possible to optimize this stage's plan here based on the child stages' statistics.
+
+    // 2. Determine reducer number
+    val queryStageInputs: Seq[ShuffleQueryStageInput] = child.collect {
+      case input: ShuffleQueryStageInput => input
+    }
+    val childMapOutputStatistics = queryStageInputs.map(_.childStage.mapOutputStatistics)
+      .filter(_ != null).toArray
+    if (childMapOutputStatistics.length > 0) {
+      val exchangeCoordinator = new ExchangeCoordinator(
+        conf.targetPostShuffleInputSize,
+        conf.minNumPostShufflePartitions)
+
+      val partitionStartIndic
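
Note: the futures above run on QueryStage.executionContext, which lives in a companion object outside this excerpt. A minimal sketch of what such a companion could look like, assuming a daemon cached thread pool built via ThreadUtils (the pool name and thread cap are illustrative assumptions, not taken from the patch):

    package org.apache.spark.sql.execution.adaptive

    import scala.concurrent.ExecutionContext

    import org.apache.spark.util.ThreadUtils

    object QueryStage {
      // Daemon threads so outstanding stage submissions never keep the JVM alive;
      // a cached pool so waiting on one child stage does not block submitting others.
      private[execution] val executionContext = ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool("adaptive-query-stage", 16))
    }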

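The excerpt is cut off at `val partitionStartIndic...`; per the step-2 comment above, the remainder derives post-shuffle partition start indices from the collected map output statistics. An illustrative, standalone sketch of that kind of size-based coalescing, using a hypothetical helper rather than the PR's actual ExchangeCoordinator code:

    // Merge adjacent map-output partitions until each merged (post-shuffle) partition
    // holds roughly targetSize bytes; return the start index of each merged partition.
    // bytesByPartition: one Array[Long] per child shuffle (bytes per reduce partition),
    // all of the same length, mirroring MapOutputStatistics.bytesByPartitionId above.
    def estimateStartIndices(bytesByPartition: Seq[Array[Long]], targetSize: Long): Array[Int] = {
      val numPartitions = bytesByPartition.head.length
      // Total bytes written for each reduce-side partition across all child shuffles.
      val totalBytes = Array.tabulate(numPartitions)(i => bytesByPartition.map(_(i)).sum)
      val startIndices = scala.collection.mutable.ArrayBuffer(0)
      var currentSize = 0L
      for (i <- 0 until numPartitions) {
        if (currentSize > 0 && currentSize + totalBytes(i) > targetSize) {
          startIndices += i
          currentSize = 0L
        }
        currentSize += totalBytes(i)
      }
      startIndices.toArray
    }

    // Example: per-partition totals of 40, 160, 20, 20 bytes with a 100-byte target
    // yield Array(0, 1, 2), i.e. merged partitions [0], [1], [2, 3].
    // estimateStartIndices(Seq(Array(20L, 80L, 10L, 10L), Array(20L, 80L, 10L, 10L)), 100L)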
[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...

2018-01-26 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20303#discussion_r164089610
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala ---

[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...

2018-01-26 Thread wangyum
Github user wangyum commented on a diff in the pull request:

https://github.com/apache/spark/pull/20303#discussion_r164070918
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStage.scala ---

[GitHub] spark pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive ...

2018-01-17 Thread carsonwang
GitHub user carsonwang opened a pull request:

https://github.com/apache/spark/pull/20303

[SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

## What changes were proposed in this pull request?
This is joint work with @yucai, @gczsjdy, @chenghao-intel, and @xuanyuanking.

We'd like to introduce a new approach to do adaptive execution in Spark SQL. The idea is described at https://docs.google.com/document/d/1mpVjvQZRAkD-Ggy6-hcjXtBPiQoVbZGe3dLnAKgtJ4k/edit?usp=sharing

## How was this patch tested?
Updated ExchangeCoordinatorSuite.
We also tested this with all queries in TPC-DS. 
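
For anyone who wants to try the patch locally, a hypothetical spark-shell snippet; the config names are the ones referenced in this PR (spark.sql.adaptive.enabled, plus spark.sql.adaptive.maxNumPostShufflePartitions added by one of the commits below), while the values and the table are placeholders only:

    // Hypothetical usage sketch; values and table name are placeholders.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.maxNumPostShufflePartitions", "500")

    // Any shuffle-heavy query should exercise the new QueryStage path.
    spark.sql("SELECT c1, count(*) FROM t GROUP BY c1").show()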


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/carsonwang/spark AE_1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20303.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20303


commit 7e9e57859ae35f70b57ece2fd907464392d88c0f
Author: Carson Wang 
Date:   2017-11-22T08:49:01Z

Introduce QueryStage for adapative execution

commit 2aa185ddec4616b60e7596153c3dd62188e4d145
Author: Carson Wang 
Date:   2017-12-12T02:19:53Z

Add QueryPlan for ExecutedCommandExec

commit 7c3d182f6b6c21f437fbef1ea5ebb2843934dbc6
Author: Carson Wang 
Date:   2017-12-13T05:31:52Z

Add config spark.sql.adaptive.maxNumPostShufflePartitions

commit e9b3075c7799b8de6038cc6492cd9e54c12f108e
Author: Carson Wang 
Date:   2017-12-13T07:40:08Z

update commetns

commit 9666c5fa5b42ebabc4e2b099a1f545945e959e6f
Author: Carson Wang 
Date:   2017-12-18T08:38:32Z

Fix style

commit 9b29a3c5eb20b76c1ef7f897c07ccdd3a98a27b4
Author: Carson Wang 
Date:   2017-12-26T02:22:45Z

fix bug

commit e0b98fbed96c1c07ebf2a1a6846576f705cf2c24
Author: Carson Wang 
Date:   2018-01-17T05:59:51Z

update doc and style




---
