[spark] branch master updated: [SPARK-31475][SQL] Broadcast stage in AQE did not timeout
This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 44d370d [SPARK-31475][SQL] Broadcast stage in AQE did not timeout 44d370d is described below commit 44d370dd4501f0a4abb7194f7cff0d346aac0992 Author: Maryann Xue AuthorDate: Mon Apr 20 11:55:48 2020 -0700 [SPARK-31475][SQL] Broadcast stage in AQE did not timeout ### What changes were proposed in this pull request? This PR adds a timeout for the Future of a BroadcastQueryStageExec to make sure it can have the same timeout behavior as a non-AQE broadcast exchange. ### Why are the changes needed? This is to make the broadcast timeout behavior in AQE consistent with that in non-AQE. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added UT. Closes #28250 from maryannxue/aqe-broadcast-timeout. Authored-by: Maryann Xue Signed-off-by: gatorsmile --- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../sql/execution/adaptive/QueryStageExec.scala| 35 ++ .../execution/exchange/BroadcastExchangeExec.scala | 8 ++--- .../sql/execution/joins/BroadcastJoinSuite.scala | 23 -- 4 files changed, 56 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 3ac4ea5..f819937 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -547,7 +547,7 @@ case class AdaptiveSparkPlanExec( } object AdaptiveSparkPlanExec { - private val executionContext = ExecutionContext.fromExecutorService( + private[adaptive] val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16)) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index beaa972..f414f85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.adaptive -import scala.concurrent.Future +import java.util.concurrent.TimeUnit -import org.apache.spark.{FutureAction, MapOutputStatistics} +import scala.concurrent.{Future, Promise} + +import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -28,6 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ThreadUtils /** * A query stage is an independent subgraph of the query plan. Query stage materializes its output @@ -100,8 +104,8 @@ abstract class QueryStageExec extends LeafExecNode { override def executeTail(n: Int): Array[InternalRow] = plan.executeTail(n) override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator() - override def doPrepare(): Unit = plan.prepare() - override def doExecute(): RDD[InternalRow] = plan.execute() + protected override def doPrepare(): Unit = plan.prepare() + protected override def doExecute(): RDD[InternalRow] = plan.execute() override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast() override def doCanonicalize(): SparkPlan = plan.canonicalized @@ -187,8 +191,24 @@ case class BroadcastQueryStageExec( throw new IllegalStateException("wrong plan for broadcast stage:\n " + plan.treeString) } + @transient private lazy val materializeWithTimeout = { +val broadcastFuture = broadcast.completionFuture +val timeout = SQLConf.get.broadcastTimeout +val promise = Promise[Any]() +val fail = BroadcastQueryStageExec.scheduledExecutor.schedule(new Runnable() { + override def run(): Unit = { +promise.tryFailure(new SparkException(s"Could not execute broadcast in $timeout secs. " + + s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " + + s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1")) + } +}, timeout, TimeUnit.SECONDS) +broadcastFuture.onCo
[spark] branch master updated: [SPARK-31475][SQL] Broadcast stage in AQE did not timeout
This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 44d370d [SPARK-31475][SQL] Broadcast stage in AQE did not timeout 44d370d is described below commit 44d370dd4501f0a4abb7194f7cff0d346aac0992 Author: Maryann Xue AuthorDate: Mon Apr 20 11:55:48 2020 -0700 [SPARK-31475][SQL] Broadcast stage in AQE did not timeout ### What changes were proposed in this pull request? This PR adds a timeout for the Future of a BroadcastQueryStageExec to make sure it can have the same timeout behavior as a non-AQE broadcast exchange. ### Why are the changes needed? This is to make the broadcast timeout behavior in AQE consistent with that in non-AQE. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Added UT. Closes #28250 from maryannxue/aqe-broadcast-timeout. Authored-by: Maryann Xue Signed-off-by: gatorsmile --- .../execution/adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../sql/execution/adaptive/QueryStageExec.scala| 35 ++ .../execution/exchange/BroadcastExchangeExec.scala | 8 ++--- .../sql/execution/joins/BroadcastJoinSuite.scala | 23 -- 4 files changed, 56 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 3ac4ea5..f819937 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -547,7 +547,7 @@ case class AdaptiveSparkPlanExec( } object AdaptiveSparkPlanExec { - private val executionContext = ExecutionContext.fromExecutorService( + private[adaptive] val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16)) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index beaa972..f414f85 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.adaptive -import scala.concurrent.Future +import java.util.concurrent.TimeUnit -import org.apache.spark.{FutureAction, MapOutputStatistics} +import scala.concurrent.{Future, Promise} + +import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -28,6 +30,8 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.exchange._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ThreadUtils /** * A query stage is an independent subgraph of the query plan. Query stage materializes its output @@ -100,8 +104,8 @@ abstract class QueryStageExec extends LeafExecNode { override def executeTail(n: Int): Array[InternalRow] = plan.executeTail(n) override def executeToIterator(): Iterator[InternalRow] = plan.executeToIterator() - override def doPrepare(): Unit = plan.prepare() - override def doExecute(): RDD[InternalRow] = plan.execute() + protected override def doPrepare(): Unit = plan.prepare() + protected override def doExecute(): RDD[InternalRow] = plan.execute() override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast() override def doCanonicalize(): SparkPlan = plan.canonicalized @@ -187,8 +191,24 @@ case class BroadcastQueryStageExec( throw new IllegalStateException("wrong plan for broadcast stage:\n " + plan.treeString) } + @transient private lazy val materializeWithTimeout = { +val broadcastFuture = broadcast.completionFuture +val timeout = SQLConf.get.broadcastTimeout +val promise = Promise[Any]() +val fail = BroadcastQueryStageExec.scheduledExecutor.schedule(new Runnable() { + override def run(): Unit = { +promise.tryFailure(new SparkException(s"Could not execute broadcast in $timeout secs. " + + s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " + + s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1")) + } +}, timeout, TimeUnit.SECONDS) +broadcastFuture.onCo