This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ffcd757  [SPARK-32905][CORE][YARN] ApplicationMaster fails to receive UpdateDelegationTokens message
ffcd757 is described below

commit ffcd757cf54583dfd589b42e87897fe8a255077f
Author: Kent Yao <yaooq...@hotmail.com>
AuthorDate: Fri Sep 18 07:41:21 2020 +0000

[SPARK-32905][CORE][YARN] ApplicationMaster fails to receive UpdateDelegationTokens message

### What changes were proposed in this pull request?

With a long-running application in kerberized mode, the `AMEndpoint` handles the `UpdateDelegationTokens` message incorrectly: it is a `OneWayMessage`, so it should be handled in the `receive` function rather than in `receiveAndReply`.

```java
20-09-15 18:53:01 INFO yarn.YarnAllocator: Received 22 containers from YARN, launching executors on 0 of them.
20-09-16 12:52:28 ERROR netty.Inbox: Ignoring error
org.apache.spark.SparkException: NettyRpcEndpointRef(spark-client://YarnAM) does not implement 'receive'
	at org.apache.spark.rpc.RpcEndpoint$$anonfun$receive$1.applyOrElse(RpcEndpoint.scala:70)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20-09-17 06:52:28 ERROR netty.Inbox: Ignoring error
org.apache.spark.SparkException: NettyRpcEndpointRef(spark-client://YarnAM) does not implement 'receive'
	at org.apache.spark.rpc.RpcEndpoint$$anonfun$receive$1.applyOrElse(RpcEndpoint.scala:70)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

### Why are the changes needed?

Bug fix. Without a working token refresher, long-running applications may eventually fail in a kerberized cluster.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passing Jenkins and manual verification.

I am running the sub-module `kyuubi-spark-sql-engine` of https://github.com/yaooqinn/kyuubi

The simplest way to reproduce the bug and verify this fix is to follow these steps:

#### 1. build the `kyuubi-spark-sql-engine` module

```
mvn clean package -pl :kyuubi-spark-sql-engine
```

#### 2. configure Spark with Kerberos settings for your secured cluster

#### 3. start it in the background

```
nohup bin/spark-submit --class org.apache.kyuubi.engine.spark.SparkSQLEngine ../kyuubi-spark-sql-engine-1.0.0-SNAPSHOT.jar > kyuubi.log &
```

#### 4. check the AM log

- SUCCESS: the log shows "Updating delegation tokens ..."
- FAILURE: the log shows "Inbox: Ignoring error ...... does not implement 'receive'"

Closes #29777 from yaooqinn/SPARK-32905.
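
To illustrate why the handler has to live in `receive`, here is a minimal, self-contained sketch of the dispatch rule the description relies on. It is a toy model, not Spark's RPC implementation: `ToyEndpoint`, `BeforeFix`, `AfterFix`, and `sendOneWay` are hypothetical names, and the two message case classes are simplified stand-ins. It assumes only that one-way delivery consults `receive` and never `receiveAndReply`, and that the default `receive` rejects every message, which is consistent with the "does not implement 'receive'" error in the AM log.

```scala
import scala.util.Try

// Toy model only: these types are simplified stand-ins written for
// illustration and are not Spark's actual RPC classes.
object OneWayDispatchSketch {
  final case class UpdateDelegationTokens(tokens: Array[Byte])
  final case class RequestExecutors(total: Int)

  trait ToyEndpoint {
    // Handler for one-way messages; the default rejects everything.
    def receive: PartialFunction[Any, Unit] = {
      case _ => throw new IllegalStateException("endpoint does not implement 'receive'")
    }

    // Handler for messages whose sender is waiting for a reply.
    def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
      case _ => throw new IllegalStateException("endpoint does not implement 'receiveAndReply'")
    }
  }

  // Shape of the endpoint before the fix: the token update is matched
  // only on the request-reply path, so one-way delivery never reaches it.
  class BeforeFix extends ToyEndpoint {
    var tokensSeen = 0
    override def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
      case RequestExecutors(_) => reply(true)
      case UpdateDelegationTokens(_) => tokensSeen += 1
    }
  }

  // Shape of the endpoint after the fix: the token update is matched in `receive`.
  class AfterFix extends ToyEndpoint {
    var tokensSeen = 0
    override def receive: PartialFunction[Any, Unit] = {
      case UpdateDelegationTokens(_) => tokensSeen += 1
    }
    override def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
      case RequestExecutors(_) => reply(true)
    }
  }

  // One-way delivery (assumption of this sketch): only `receive` is consulted.
  def sendOneWay(endpoint: ToyEndpoint, msg: Any): Try[Unit] =
    Try(endpoint.receive(msg))

  def main(args: Array[String]): Unit = {
    val msg = UpdateDelegationTokens(Array[Byte](1, 2, 3))
    val before = new BeforeFix
    val after = new AfterFix
    println(s"before fix delivered: ${sendOneWay(before, msg).isSuccess}")
    println(s"after fix delivered:  ${sendOneWay(after, msg).isSuccess}")
    println(s"tokens seen after fix: ${after.tokensSeen}")
  }
}
```

In this model the `BeforeFix` endpoint fails every one-way token update even though it contains a matching `case`, because that `case` sits on a path the one-way delivery never takes.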

Authored-by: Kent Yao <yaooq...@hotmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
(cherry picked from commit 9e9d4b6994a29fb139fd50d24b5418a900c7f072)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 862acd8..99efa5c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -775,6 +775,11 @@ private[spark] class ApplicationMaster(
       driver.send(RegisterClusterManager(self))
     }
 
+    override def receive: PartialFunction[Any, Unit] = {
+      case UpdateDelegationTokens(tokens) =>
+        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+    }
+
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case r: RequestExecutors =>
         Option(allocator) match {
@@ -806,9 +811,6 @@ private[spark] class ApplicationMaster(
           case None =>
             logWarning("Container allocator is not ready to find executor loss reasons yet.")
         }
-
-      case UpdateDelegationTokens(tokens) =>
-        SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org