This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 1cf4e1b  [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped
1cf4e1b is described below

commit 1cf4e1bd023b51603d2d7ad2f0ce936c5d7e999d
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Fri Feb 19 09:36:07 2021 -0800

    [SPARK-34469][K8S] Ignore RegisterExecutor when SparkContext is stopped

    ### What changes were proposed in this pull request?

    This PR aims to make `KubernetesClusterSchedulerBackend` ignore `RegisterExecutor` message when `SparkContext` is already stopped.

    ### Why are the changes needed?

    If `SparkDriver` is terminated, the executors will be removed by K8s automatically, so there is no need to register executors that connect after `SparkContext` has stopped.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the newly added test case.

    Closes #31587 from dongjoon-hyun/SPARK-34469.

    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../cluster/k8s/KubernetesClusterSchedulerBackend.scala          | 9 ++++++++-
 .../cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala     | 9 ++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 78862bc..c35a434 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -29,9 +29,10 @@ import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.resource.ResourceProfile
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
 import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
@@ -214,6 +215,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint extends DriverEndpoint {
+    private def ignoreRegisterExecutorAtStoppedContext: PartialFunction[Any, Unit] = {
+      case _: RegisterExecutor if sc.isStopped => // No-op
+    }
+
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] =
+      ignoreRegisterExecutorAtStoppedContext.orElse(super.receiveAndReply(context))
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the Kubernetes API events to
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b0dd40d..861d41c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RemoveExecutor, StopDriver}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor, StopDriver}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
 
@@ -199,4 +199,11 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     // Verify the last operation of `schedulerBackendUnderTest.stop`.
     verify(kubernetesClient).close()
   }
+
+  test("SPARK-34469: Ignore RegisterExecutor when SparkContext is stopped") {
+    when(sc.isStopped).thenReturn(true)
+    val endpoint = schedulerBackendUnderTest.createDriverEndpoint()
+    endpoint.receiveAndReply(null).apply(
+      RegisterExecutor("1", null, "host1", 1, Map.empty, Map.empty, Map.empty, 0))
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org