[ 
https://issues.apache.org/jira/browse/MESOS-2863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742228#comment-14742228
 ] 

Vaibhav Khanduja commented on MESOS-2863:
-----------------------------------------

Here is the code diff, which is more of a corrective measure .. and not 
preventive  (which i would prefer).

diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 50b3c6e..8d1fcc1 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -80,6 +80,7 @@ public:
     : launched(false),
       killed(false),
       killedByHealthCheck(false),
+      state(TASK_STARTING),
       pid(-1),
       healthPid(-1),
       escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT),
@@ -111,13 +112,8 @@ public:
   void launchTask(ExecutorDriver* driver, const TaskInfo& task)
   {
     if (launched) {
-      TaskStatus status;
-      status.mutable_task_id()->MergeFrom(task.task_id());
-      status.set_state(TASK_FAILED);
-      status.set_message(
-          "Attempted to run multiple tasks using a \"command\" executor");
-
-      driver->sendStatusUpdate(status);
+      sendStatusUpdate(driver, TASK_FAILED, task.task_id(),
+        string("Attempted to run multiple tasks using a \"command\" 
executor"));
       return;
     }
 
@@ -281,11 +277,7 @@ public:
                    pid,
                    lambda::_1));
 
-    TaskStatus status;
-    status.mutable_task_id()->MergeFrom(task.task_id());
-    status.set_state(TASK_RUNNING);
-    driver->sendStatusUpdate(status);
-
+    sendStatusUpdate(driver, TASK_RUNNING, task.task_id(),string(""));
     launched = true;
   }
 
@@ -363,7 +355,7 @@ protected:
     status.set_healthy(healthy);
     status.set_state(TASK_RUNNING);
     driver.get()->sendStatusUpdate(status);
-
+      
     if (initiateTaskKill) {
       killedByHealthCheck = true;
       killTask(driver.get(), taskID);
@@ -372,54 +364,74 @@ protected:
 
 
 private:
+  void sendStatusUpdate(
+    ExecutorDriver* driver,
+    TaskState inState,
+    const TaskID& taskId,
+    const std::string& message)
+  {
+    // If last state has been TASK_FINISHED or TASK_KILLED,
+    // then do not send a different state again. Send out the
+    // last state.
+    // Check is new state is greater than last state, then
+    // no need to send a state. One logic would not to send
+    // a state, but then receiver with time out.
+    if ((inState < state) ||
+        (state == TASK_FINISHED) ||
+        (state == TASK_KILLED)) {
+      inState = state;
+    }
+    TaskStatus taskStatus;
+    taskStatus.mutable_task_id()->MergeFrom(taskId);
+    taskStatus.set_state(inState);
+    taskStatus.set_message(message);
+    if (killed && killedByHealthCheck) {
+      taskStatus.set_healthy(false);
+    }
+    driver->sendStatusUpdate(taskStatus);
+    // Maintain the last state sent.
+    state = inState;
+  }
+    
   void reaped(
       ExecutorDriver* driver,
       const TaskID& taskId,
       pid_t pid,
       const Future<Option<int> >& status_)
   {
-    TaskState state;
     string message;
+    TaskState lstate;
 
     Clock::cancel(escalationTimer);
 
     if (!status_.isReady()) {
-      state = TASK_FAILED;
+      lstate = TASK_FAILED;
       message =
         "Failed to get exit status for Command: " +
         (status_.isFailed() ? status_.failure() : "future discarded");
     } else if (status_.get().isNone()) {
-      state = TASK_FAILED;
+      lstate = TASK_FAILED;
       message = "Failed to get exit status for Command";
     } else {
       int status = status_.get().get();
       CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
 
       if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
-        state = TASK_FINISHED;
+        lstate = TASK_FINISHED;
       } else if (killed) {
         // Send TASK_KILLED if the task was killed as a result of
         // killTask() or shutdown().
-        state = TASK_KILLED;
+        lstate = TASK_KILLED;
       } else {
-        state = TASK_FAILED;
+        lstate = TASK_FAILED;
       }
 
       message = "Command " + WSTRINGIFY(status);
     }
 
     cout << message << " (pid: " << pid << ")" << endl;
-
-    TaskStatus taskStatus;
-    taskStatus.mutable_task_id()->MergeFrom(taskId);
-    taskStatus.set_state(state);
-    taskStatus.set_message(message);
-    if (killed && killedByHealthCheck) {
-      taskStatus.set_healthy(false);
-    }
-
-    driver->sendStatusUpdate(taskStatus);
-
+      
+    sendStatusUpdate(driver, lstate,taskId, message);
     // A hack for now ... but we need to wait until the status update
     // is sent to the slave before we shut ourselves down.
     os::sleep(Seconds(1));
@@ -494,6 +506,7 @@ private:
   bool launched;
   bool killed;
   bool killedByHealthCheck;
+  TaskState state;
   pid_t pid;
   pid_t healthPid;
   Duration escalationTimeout;


> Command executor can send TASK_KILLED after TASK_FINISHED
> ---------------------------------------------------------
>
>                 Key: MESOS-2863
>                 URL: https://issues.apache.org/jira/browse/MESOS-2863
>             Project: Mesos
>          Issue Type: Bug
>            Reporter: Vinod Kone
>            Assignee: Vaibhav Khanduja
>              Labels: newbie++
>
> Observed this while doing some tests in our test cluster.
> If the command executor gets a shutdown() (e.g., framework unregistered) 
> after sending TASK_FINISHED but before exiting (there is a forced sleep), it 
> could send a TASK_KILLED update to the slave.
> Ideally the command executor should not send multiple terminal updates.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to