[spark] branch branch-3.0 updated: [SPARK-31472][CORE][3.0] Make sure Barrier Task always return messages or exception with abortableRpcFuture check

2020-04-23 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new 1b221f3  [SPARK-31472][CORE][3.0] Make sure Barrier Task always return 
messages or exception with abortableRpcFuture check
1b221f3 is described below

commit 1b221f35abd1657a3ecd49335118bfd5dcb811ee
Author: yi.wu 
AuthorDate: Thu Apr 23 14:43:27 2020 +

[SPARK-31472][CORE][3.0] Make sure Barrier Task always return messages or 
exception with abortableRpcFuture check

### What changes were proposed in this pull request?

Rewrite the periodic check logic of `abortableRpcFuture` to make sure 
that the barrier task always returns either the desired messages or the 
expected exception.

This PR also simplifies things a bit around `AbortableRpcFuture`.

### Why are the changes needed?

Currently, the periodic check logic of `abortableRpcFuture` is done as 
follows:

```scala
...
var messages: Array[String] = null

while (!abortableRpcFuture.toFuture.isCompleted) {
   messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 1.second)
   ...
}
return messages
```
It's possible that `abortableRpcFuture` completes before the next invocation 
of `messages = ...`. In this case, the task may return null messages, or 
complete successfully when it should throw an exception (e.g. a 
`SparkException` from `BarrierCoordinator`).
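
The rewrite's goal of always surfacing either the messages or the failure can be sketched race-free with plain Scala futures (a minimal sketch, not Spark's actual `AbortableRpcFuture` API; `awaitWithPeriodicCheck` and its parameters are hypothetical names): await first, and treat the timeout as the trigger for the periodic check, so there is no separate `isCompleted` check for a completion to slip past.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration._

// Await-first loop: a completion can no longer slip between an
// `isCompleted` check and the await, because there is no separate
// check. The await either yields the result, rethrows the future's
// failure, or times out, and only the timeout path runs the periodic
// check (e.g. "is the task killed?") before waiting again.
def awaitWithPeriodicCheck[T](future: Future[T], interval: FiniteDuration)(
    periodicCheck: () => Unit): T = {
  while (true) {
    try {
      return Await.result(future, interval)
    } catch {
      case _: TimeoutException =>
        periodicCheck() // future not done yet; check, then wait again
    }
  }
  throw new IllegalStateException("unreachable")
}
```

If the future failed, `Await.result` rethrows that failure, so the caller always gets the messages or the exception, never a stale null.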

And here's a flaky test caused by this bug:

```
[info] BarrierTaskContextSuite:
[info] - share messages with allGather() call *** FAILED *** (18 seconds, 705 milliseconds)
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(0, 2) finished unsuccessfully.
[info] java.lang.NullPointerException
[info]  at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
[info]  at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
[info]  at scala.collection.IndexedSeqOptimized.toList(IndexedSeqOptimized.scala:285)
[info]  at scala.collection.IndexedSeqOptimized.toList$(IndexedSeqOptimized.scala:284)
[info]  at scala.collection.mutable.ArrayOps$ofRef.toList(ArrayOps.scala:198)
[info]  at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$4(BarrierTaskContextSuite.scala:68)
...
```

The test exception can be reproduced by changing the line `messages = ...` 
to the following:

```scala
messages = ThreadUtils.awaitResult(abortableRpcFuture.toFuture, 10.micros)
Thread.sleep(5000)
```
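
The same race can be shown self-contained with plain Scala futures (a sketch, no Spark classes; `promise`, `messages`, and the timings here are illustrative): the future fails while the loop body is busy, the next `isCompleted` check sees `true`, and the loop exits with the failure swallowed.

```scala
import scala.concurrent.{Await, Promise, TimeoutException}
import scala.concurrent.duration._

// Stand-in for the buggy check-then-await loop: the future fails while
// the loop body is busy in the `Thread.sleep` below. The next
// `isCompleted` check then sees `true` and the loop exits with
// `messages` still null, instead of rethrowing the failure.
val promise = Promise[Array[String]]()
new Thread(() => {
  Thread.sleep(100)
  promise.failure(new RuntimeException("barrier failed"))
}).start()

var messages: Array[String] = null
while (!promise.future.isCompleted) {
  try {
    messages = Await.result(promise.future, 10.millis)
  } catch {
    case _: TimeoutException => () // mirrors the 1-second retry in the real loop
  }
  Thread.sleep(500) // simulates work between checks; the failure lands here
}
// No exception was thrown and messages is still null.
assert(messages == null)
```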

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually tested and updated some unit tests.

Closes #28312 from Ngone51/cherry-pick-31472.

Authored-by: yi.wu 
Signed-off-by: Wenchen Fan 
---
 .../org/apache/spark/BarrierTaskContext.scala  | 30 ++
 .../org/apache/spark/rpc/RpcEndpointRef.scala  | 10 +++-
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala   | 12 +
 .../scala/org/apache/spark/util/ThreadUtils.scala  |  5 ++--
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   | 12 -
 5 files changed, 32 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index 06f8024..4d76548 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -20,9 +20,9 @@ package org.apache.spark
 import java.util.{Properties, Timer, TimerTask}
 
 import scala.collection.JavaConverters._
-import scala.concurrent.TimeoutException
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.util.{Failure, Success => ScalaSuccess, Try}
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.executor.TaskMetrics
@@ -85,28 +85,26 @@ class BarrierTaskContext private[spark] (
 // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework.
 timeout = new RpcTimeout(365.days, "barrierTimeout"))
 
-  // messages which consist of all barrier tasks' messages
-  var messages: Array[String] = null
   // Wait the RPC future to be completed, but every 1 second it will jump out waiting
   // and check whether current spark task is killed. If killed, then throw
   // a `TaskKilledException`, otherwise continue wait RPC until it completes.
-  try {
-    while (!abortableRpcFuture.toFuture.isCompleted) {
+
+  while
