agrawaldevesh commented on a change in pull request #29788: URL: https://github.com/apache/spark/pull/29788#discussion_r491083572
########## File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ########## @@ -2368,7 +2368,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case ExecutorLost(execId, reason) => val workerHost = reason match { case ExecutorProcessLost(_, workerHost, _) => workerHost - case ExecutorDecommission(workerHost) => workerHost + case ExecutorDecommission(_, host) => host Review comment: See comment below for `ExecutorDecommission` ... Should this be changed to a: ``` case decom @ ExecutorDecommission => decom.workerHost // or decom.host ``` You don't need to add an extra '_' then. ########## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala ########## @@ -71,7 +71,8 @@ case class ExecutorProcessLost( * This is used by the task scheduler to remove state associated with the executor, but * not yet fail any tasks that were running in the executor before the executor is "fully" lost. * - * @param workerHost it is defined when the worker is decommissioned too + * @param reason the reason why the executor is decommissioned + * @param host it is defined when the host where the executor located is decommissioned too */ -private [spark] case class ExecutorDecommission(workerHost: Option[String] = None) - extends ExecutorLossReason("Executor decommission.") +private [spark] case class ExecutorDecommission(reason: String, host: Option[String] = None) Review comment: My scala knowledge is really really poor, but I would rather we make this be a non case class if you are planning to do this. Currently, I think the field "reason" is going to be duplicated in the base class ExecutorLossReason and the ExecutorDecommission. That's also the reason why you are pattern matching it above with an additional _ (for the `reason`) argument, when you really don't care about the `reason`. ########## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +private[spark] sealed trait ExecutorDecommissionReason { + val reason: String = "decommissioned" Review comment: I don't think the `reason` field is really needed anywhere, besides it being used for `toString` ? Should we just require overriding `toString` by marking `toString` abstract ? I don't think that child classes need to override both `toString` and `reason` : I would prefer we just override methods instead of fields. ########## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ########## @@ -970,6 +970,9 @@ private[spark] class TaskSchedulerImpl( logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.") case ExecutorKilled => logInfo(s"Executor $executorId on $hostPort killed by driver.") + case ExecutorDecommission(reason, _) => + // use logInfo instead of logError as the loss of decommissioned executor is what we expect + logInfo(s"Decommissioned executor $executorId on $hostPort shutdown: $reason") Review comment: instead of 'shutdown', should we say 'is finally lost' ? To be more accurate in this setting. +1 on this change to avoid log spam. ########## File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ########## @@ -991,7 +991,7 @@ private[spark] class TaskSetManager( for ((tid, info) <- taskInfos if info.running && info.executorId == execId) { val exitCausedByApp: Boolean = reason match { case exited: ExecutorExited => exited.exitCausedByApp - case ExecutorKilled | ExecutorDecommission(_) => false + case ExecutorKilled | ExecutorDecommission(_, _) => false Review comment: I am wondering if we should instead pattern match in a separate arm like: ``` _ @ ExecutorDecommission => false ``` To avoid having to change the case arms when we make changes to the structure definitions. ########## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +private[spark] sealed trait ExecutorDecommissionReason { Review comment: @holdenk, can you please provide an example of how having this as a sealed trait would limit the flexibility ? It is marked as a private[spark], so the resource manager specific scheduler backends, should be able to extend it ... no ? ########## File path: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala ########## @@ -191,6 +192,18 @@ private[spark] class KubernetesClusterSchedulerBackend( private class KubernetesDriverEndpoint extends DriverEndpoint { + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + case ExecutorDecommissioning(executorId) => Review comment: I didn't fully follow the need for distinction b/w the K8s case and the simple executor triggered case. I thought K8s only needs the SIGPWR based thing, and indeed ExecutorDecommissioning is only sent in response to a SIGPWR. So I am missing why we override `ExecutorDecommissioning` here and the motivation for `K8SDecommission`. ########## File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ########## @@ -88,44 +88,35 @@ private[spark] trait ExecutorAllocationClient { * Default implementation delegates to kill, scheduler must override * if it supports graceful decommissioning. * - * @param executorsAndDecomInfo identifiers of executors & decom info. + * @param executorsAndDecomReason identifiers of executors & decom reason. * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down * after these executors have been decommissioned. - * @param triggeredByExecutor whether the decommission is triggered at executor. * @return the ids of the executors acknowledged by the cluster manager to be removed. */ def decommissionExecutors( - executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], - adjustTargetNumExecutors: Boolean, - triggeredByExecutor: Boolean): Seq[String] = { - killExecutors(executorsAndDecomInfo.map(_._1), + executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)], Review comment: This is how it was earlier -- so we aren't changing the semantics save the renaming :-) And plus yes this can happen: Different executors on different hosts would have different ExecutorDecommissionReason/Info with different hosts potentially in them. This is simply a bulk api : Instead of making n calls we are folding them into one. ########## File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ########## @@ -17,7 +17,7 @@ package org.apache.spark -import org.apache.spark.scheduler.ExecutorDecommissionInfo +import org.apache.spark.scheduler.ExecutorDecommissionReason Review comment: +1, I would argue against un-necessary renaming even if it seems a bit "unnatural". It creates un-necessary diff noise. To me "Info" and "Reason" are both similar: They both portend "additional information". ########## File path: core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +private[spark] sealed trait ExecutorDecommissionReason { + val reason: String = "decommissioned" + override def toString: String = reason +} + +/** + * For the case where decommission is trigger because of executor dynamic allocation + */ +case class DynamicAllocationDecommission() extends ExecutorDecommissionReason { + override val reason: String = "decommissioned by dynamic allocation" +} + +/** + * For the case where decommission is triggered at executor fist. + */ +class ExecutorTriggeredDecommission extends ExecutorDecommissionReason + +/** + * For the Kubernetes workloads + */ +case class K8SDecommission() extends ExecutorTriggeredDecommission + +/** + * For the Standalone workloads. + * @param workerHost When workerHost is defined, it means the worker has been decommissioned too. + * Used to infer if the shuffle data might be lost even if the external shuffle + * service is enabled. + */ +case class StandaloneDecommission(workerHost: Option[String] = None) + extends ExecutorDecommissionReason { + override val reason: String = if (workerHost.isDefined) { + s"Worker ${workerHost.get} decommissioned" + } else { + "decommissioned" + } +} + +/** Review comment: Can you move this test only class somewhere in the test only package ? See TestResourceIDs as an example. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org