[spark] branch branch-3.0 updated: [SPARK-31472][CORE][3.0] Make sure Barrier Task always return messages or exception with abortableRpcFuture check
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1b221f3  [SPARK-31472][CORE][3.0] Make sure Barrier Task always return messages or exception with abortableRpcFuture check
1b221f3 is described below

commit 1b221f35abd1657a3ecd49335118bfd5dcb811ee
Author: yi.wu
AuthorDate: Thu Apr 23 14:43:27 2020 +

    [SPARK-31472][CORE][3.0] Make sure Barrier Task always return messages or exception with abortableRpcFuture check

    ### What changes were proposed in this pull request?

    Rewrite the periodic check logic of `abortableRpcFuture` to make sure that a barrier task always returns either the desired messages or the expected exception. This PR also simplifies `AbortableRpcFuture` a bit.

    ### Why are the changes needed?

    Currently, the periodic check of `abortableRpcFuture` is done as follows:

    ```scala
    ...
    var messages: Array[String] = null

    while (!abortableRpcFuture.toFuture.isCompleted) {
      messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
      ...
    }
    return messages
    ```

    It's possible that `abortableRpcFuture` completes before the next invocation of `messages = ...`. In that case, the task may return null messages, or execute successfully when it should throw an exception (e.g. a `SparkException` from `BarrierCoordinator`).

    Here's a flaky test caused by this bug:

    ```
    [info] BarrierTaskContextSuite:
    [info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds)
    [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully.
    [info] java.lang.NullPointerException
    [info] 	at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
    [info] 	at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
    [info] 	at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
    [info] 	at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
    [info] 	at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
    [info] 	at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
    ...
    ```

    The test exception can be reproduced by changing the line `messages = ...` to the following:

    ```scala
    messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 10.micros)
    Thread.sleep(5000)
    ```

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Manually tested and updated some unit tests.

    Closes #28312 from Ngone51/cherry-pick-31472.

    Authored-by: yi.wu
    Signed-off-by: Wenchen Fan
---
 .../org/apache/spark/BarrierTaskContext.scala      | 30 ++
 .../org/apache/spark/rpc/RpcEndpointRef.scala      | 10 +++-
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala   | 12 +
 .../scala/org/apache/spark/util/ThreadUtils.scala  |  5 ++--
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   | 12 -
 5 files changed, 32 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 06f8024..4d76548 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -20,9 +20,9 @@ package org.apache.spark
 import java.util.{Properties, Timer, TimerTask}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.{Failure, Success => ScalaSuccess, Try}
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
@@ -85,28 +85,26 @@ class BarrierTaskContext private[spark] (
       // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
       timeout = new RpcTimeout(365.days, "barrierTimeout"))
 
-    // messages which consist of all barrier tasks' messages
-    var messages: Array[String] = null
     // Wait the RPC future to be completed, but every 1 second it will jump out waiting
     // and check whether current spark task is killed. If killed, then throw
     // a `TaskKilledException`, otherwise continue wait RPC until it completes.
-    try {
-      while (!abortableRpcFuture.toFuture.isCompleted) {
+
+    while
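The race described above can be sketched with plain Scala futures: guarding the await with `isCompleted` leaves a window in which the future can complete after the last successful await, so the loop exits with a stale (possibly null) result and swallows any failure. A minimal sketch of the fixed shape, which always acts on the completed value of the await itself, is below. This is an illustration under assumed names (`waitForMessages`, `BarrierWaitSketch` are hypothetical), not the actual Spark code from the diff:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object BarrierWaitSketch {
  // Illustrative stand-in for waiting on the barrier RPC future.
  def waitForMessages(future: Future[Array[String]]): Array[String] = {
    while (true) {
      try {
        // Always take the outcome from the await itself; if the future has
        // already completed, Await.ready returns immediately with its value.
        val completed = Await.ready(future, 1.second)
        completed.value.get match {
          case Success(messages) => return messages
          case Failure(e)        => throw e // surface e.g. a coordinator failure
        }
      } catch {
        case _: TimeoutException =>
          // Not completed within this interval; the real code uses this point
          // to check whether the task was killed before waiting again.
      }
    }
    throw new IllegalStateException("unreachable")
  }

  def main(args: Array[String]): Unit = {
    val p = Promise[Array[String]]()
    p.success(Array("task-0", "task-1"))
    println(BarrierWaitSketch.waitForMessages(p.future).mkString(","))
  }
}
```

Because the result is pattern-matched out of the completed `Try`, a failed future always raises its exception instead of letting the loop exit with whatever the last assignment happened to leave behind.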