agrawaldevesh commented on a change in pull request #29788:
URL: https://github.com/apache/spark/pull/29788#discussion_r491083572



##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -2368,7 +2368,7 @@ private[scheduler] class 
DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorLost(execId, reason) =>
       val workerHost = reason match {
         case ExecutorProcessLost(_, workerHost, _) => workerHost
-        case ExecutorDecommission(workerHost) => workerHost
+        case ExecutorDecommission(_, host) => host

Review comment:
       See comment below for `ExecutorDecommission` ... Should this be changed 
to a:
   
   ```
   case decom @ ExecutorDecommission => decom.workerHost // or decom.host
   ```
   
   You don't need to add an extra '_' then.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
##########
@@ -71,7 +71,8 @@ case class ExecutorProcessLost(
  * This is used by the task scheduler to remove state associated with the 
executor, but
  * not yet fail any tasks that were running in the executor before the 
executor is "fully" lost.
  *
- * @param workerHost it is defined when the worker is decommissioned too
+ * @param reason the reason why the executor is decommissioned
+ * @param host it is defined when the host where the executor located is 
decommissioned too
  */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = 
None)
- extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(reason: String, host: 
Option[String] = None)

Review comment:
       My scala knowledge is really really poor, but I would rather we make 
this be a non case class if you are planning to do this. Currently, I think the 
field "reason" is going to be duplicated in the base class ExecutorLossReason  
and the ExecutorDecommission. 
   
   That's also the reason why you are pattern matching it above with an 
additional _ (for the `reason`) argument, when you really don't care about the 
`reason`. 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+private[spark] sealed trait ExecutorDecommissionReason {
+  val reason: String = "decommissioned"

Review comment:
       I don't think the `reason` field is really needed anywhere, besides it 
being used for `toString` ? Should we just require overriding `toString` by 
marking `toString` abstract ? I don't think that child classes need to override 
both `toString` and `reason` : I would prefer we just override methods instead 
of fields.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
##########
@@ -970,6 +970,9 @@ private[spark] class TaskSchedulerImpl(
       logDebug(s"Executor $executorId on $hostPort lost, but reason not yet 
known.")
     case ExecutorKilled =>
       logInfo(s"Executor $executorId on $hostPort killed by driver.")
+    case ExecutorDecommission(reason, _) =>
+      // use logInfo instead of logError as the loss of decommissioned 
executor is what we expect
+      logInfo(s"Decommissioned executor $executorId on $hostPort shutdown: 
$reason")

Review comment:
       instead of 'shutdown', should we say 'is finally lost' ? To be more 
accurate in this setting.
   
   +1 on this change to avoid log spam.

##########
File path: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
##########
@@ -991,7 +991,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == 
execId) {
       val exitCausedByApp: Boolean = reason match {
         case exited: ExecutorExited => exited.exitCausedByApp
-        case ExecutorKilled | ExecutorDecommission(_) => false
+        case ExecutorKilled | ExecutorDecommission(_, _) => false

Review comment:
       I am wondering if we should instead pattern match in a separate arm 
like: 
   
   ```
   _ @ ExecutorDecommission => false
   ```
   
   To avoid having to change the case arms when we make changes to the 
structure definitions.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+private[spark] sealed trait ExecutorDecommissionReason {

Review comment:
       @holdenk, can you please provide an example of how having this as a 
sealed trait would limit the flexibility ? 
   
   It is marked as a private[spark], so the resource manager specific scheduler 
backends, should be able to extend it ... no ?

##########
File path: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
##########
@@ -191,6 +192,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private class KubernetesDriverEndpoint extends DriverEndpoint {
 
+    override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] = {
+      case ExecutorDecommissioning(executorId) =>

Review comment:
       I didn't fully follow the need for distinction b/w the K8s case and the 
simple executor triggered case. 
   
   I thought K8s only needs the SIGPWR based thing, and indeed 
ExecutorDecommissioning is only sent in response to a SIGPWR.
   
   So I am missing why we override `ExecutorDecommissioning` here and the 
motivation for `K8SDecommission`.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -88,44 +88,35 @@ private[spark] trait ExecutorAllocationClient {
    * Default implementation delegates to kill, scheduler must override
    * if it supports graceful decommissioning.
    *
-   * @param executorsAndDecomInfo identifiers of executors & decom info.
+   * @param executorsAndDecomReason identifiers of executors & decom reason.
    * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
    *                                 after these executors have been 
decommissioned.
-   * @param triggeredByExecutor whether the decommission is triggered at 
executor.
    * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
    */
   def decommissionExecutors(
-      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
-      adjustTargetNumExecutors: Boolean,
-      triggeredByExecutor: Boolean): Seq[String] = {
-    killExecutors(executorsAndDecomInfo.map(_._1),
+      executorsAndDecomReason: Array[(String, ExecutorDecommissionReason)],

Review comment:
       This is how it was earlier -- so we aren't changing the semantics save 
the renaming :-) And plus yes this can happen: Different executors on different 
hosts would have different ExecutorDecommissionReason/Info with different hosts 
potentially in them. 
   
   This is simply a bulk api : Instead of making n calls we are folding them 
into one.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.apache.spark.scheduler.ExecutorDecommissionInfo
+import org.apache.spark.scheduler.ExecutorDecommissionReason

Review comment:
       +1, I would argue against un-necessary renaming even if it seems a bit 
"unnatural". It creates un-necessary diff noise. 
   
   To me "Info" and "Reason" are both similar: They both portend "additional 
information".

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionReason.scala
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+private[spark] sealed trait ExecutorDecommissionReason {
+  val reason: String = "decommissioned"
+  override def toString: String = reason
+}
+
+/**
+ * For the case where decommission is trigger because of executor dynamic 
allocation
+ */
+case class DynamicAllocationDecommission() extends ExecutorDecommissionReason {
+  override val reason: String = "decommissioned by dynamic allocation"
+}
+
+/**
+ * For the case where decommission is triggered at executor fist.
+ */
+class ExecutorTriggeredDecommission extends ExecutorDecommissionReason
+
+/**
+ * For the Kubernetes workloads
+ */
+case class K8SDecommission() extends ExecutorTriggeredDecommission
+
+/**
+ * For the Standalone workloads.
+ * @param workerHost When workerHost is defined, it means the worker has been 
decommissioned too.
+ *                   Used to infer if the shuffle data might be lost even if 
the external shuffle
+ *                   service is enabled.
+ */
+case class StandaloneDecommission(workerHost: Option[String] = None)
+  extends ExecutorDecommissionReason {
+  override val reason: String = if (workerHost.isDefined) {
+    s"Worker ${workerHost.get} decommissioned"
+  } else {
+    "decommissioned"
+  }
+}
+
+/**

Review comment:
       Can you move this test only class somewhere in the test only package ? 
   
   See TestResourceIDs as an example.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to