This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 33864a8 [SPARK-27496][CORE] Fatal errors should also be sent back to the sender 33864a8 is described below commit 33864a8eb0e939b0edc90be30a0c48409eb8312a Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Sun Apr 21 17:00:07 2019 -0700 [SPARK-27496][CORE] Fatal errors should also be sent back to the sender ## What changes were proposed in this pull request? When a fatal error (such as StackOverflowError) is thrown from "receiveAndReply", we should try our best to notify the sender. Otherwise, the sender will hang until timeout. In addition, when a MessageLoop is dying unexpectedly, it should resubmit a new one so that the Dispatcher keeps working. ## How was this patch tested? New unit tests. Closes #24396 from zsxwing/SPARK-27496. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit 009059e3c261a73d605bc49aee4aecb0eb0e8267) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/rpc/netty/Dispatcher.scala | 10 +++- .../scala/org/apache/spark/rpc/netty/Inbox.scala | 2 +- .../apache/spark/rpc/netty/NettyRpcEnvSuite.scala | 53 +++++++++++++++++++++- 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala index 904c4d0..f261635 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala @@ -224,7 +224,15 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte } } } catch { - case ie: InterruptedException => // exit + case _: InterruptedException => // exit + case t: Throwable => + try { + // Re-submit a MessageLoop so that Dispatcher will still work if + // UncaughtExceptionHandler decides to not kill JVM.
+ threadpool.execute(new MessageLoop) + } finally { + throw t + } } } } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index d32eba6..44d2622 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -106,7 +106,7 @@ private[netty] class Inbox( throw new SparkException(s"Unsupported message $message from ${_sender}") }) } catch { - case NonFatal(e) => + case e: Throwable => context.sendFailure(e) // Throw the exception -- this exception will be caught by the safelyCall function. // The endpoint's onError function will be called. diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala index f9481f8..59b4b70 100644 --- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala @@ -17,13 +17,20 @@ package org.apache.spark.rpc.netty +import java.util.concurrent.ExecutionException + +import scala.concurrent.duration._ + +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} import org.scalatest.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.network.client.TransportClient import org.apache.spark.rpc._ -class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar { +class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar with TimeLimits { + + private implicit val signaler: Signaler = ThreadSignaler override def createRpcEnv( conf: SparkConf, @@ -84,4 +91,48 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar { msg3, RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv))) } + + test("StackOverflowError should be sent back and Dispatcher should survive") { + val numUsableCores = 2 + val conf = new SparkConf + val config = RpcEnvConfig( + conf, + "test", + 
"localhost", + "localhost", + 0, + new SecurityManager(conf), + numUsableCores, + clientMode = false) + val anotherEnv = new NettyRpcEnvFactory().create(config) + anotherEnv.setupEndpoint("StackOverflowError", new RpcEndpoint { + override val rpcEnv = anotherEnv + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + // scalastyle:off throwerror + case msg: String => throw new StackOverflowError + // scalastyle:on throwerror + case num: Int => context.reply(num) + } + }) + + val rpcEndpointRef = env.setupEndpointRef(anotherEnv.address, "StackOverflowError") + try { + // Send `numUsableCores` messages to trigger `numUsableCores` `StackOverflowError`s + for (_ <- 0 until numUsableCores) { + val e = intercept[SparkException] { + rpcEndpointRef.askSync[String]("hello") + } + // The root cause `e.getCause.getCause` because it is boxed by Scala Promise. + assert(e.getCause.isInstanceOf[ExecutionException]) + assert(e.getCause.getCause.isInstanceOf[StackOverflowError]) + } + failAfter(10.seconds) { + assert(rpcEndpointRef.askSync[Int](100) === 100) + } + } finally { + anotherEnv.shutdown() + anotherEnv.awaitTermination() + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org