[ https://issues.apache.org/jira/browse/MESOS-2863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14742228#comment-14742228 ]
Vaibhav Khanduja commented on MESOS-2863: ----------------------------------------- Here is the code diff, which is more of a corrective measure than a preventive one (a preventive fix is what I would prefer). diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp index 50b3c6e..8d1fcc1 100644 --- a/src/launcher/executor.cpp +++ b/src/launcher/executor.cpp @@ -80,6 +80,7 @@ public: : launched(false), killed(false), killedByHealthCheck(false), + state(TASK_STARTING), pid(-1), healthPid(-1), escalationTimeout(slave::EXECUTOR_SIGNAL_ESCALATION_TIMEOUT), @@ -111,13 +112,8 @@ public: void launchTask(ExecutorDriver* driver, const TaskInfo& task) { if (launched) { - TaskStatus status; - status.mutable_task_id()->MergeFrom(task.task_id()); - status.set_state(TASK_FAILED); - status.set_message( - "Attempted to run multiple tasks using a \"command\" executor"); - - driver->sendStatusUpdate(status); + sendStatusUpdate(driver, TASK_FAILED, task.task_id(), + string("Attempted to run multiple tasks using a \"command\" executor")); return; } @@ -281,11 +277,7 @@ public: pid, lambda::_1)); - TaskStatus status; - status.mutable_task_id()->MergeFrom(task.task_id()); - status.set_state(TASK_RUNNING); - driver->sendStatusUpdate(status); - + sendStatusUpdate(driver, TASK_RUNNING, task.task_id(),string("")); launched = true; } @@ -363,7 +355,7 @@ protected: status.set_healthy(healthy); status.set_state(TASK_RUNNING); driver.get()->sendStatusUpdate(status); - + if (initiateTaskKill) { killedByHealthCheck = true; killTask(driver.get(), taskID); @@ -372,54 +364,74 @@ protected: private: + void sendStatusUpdate( + ExecutorDriver* driver, + TaskState inState, + const TaskID& taskId, + const std::string& message) + { + // If last state has been TASK_FINISHED or TASK_KILLED, + // then do not send a different state again. Send out the + // last state. + // If the new state is lower than the last state, there is + // no need to send the new state; resend the last state instead.
One alternative would be not to send + // any state at all, but then the receiver would time out. + if ((inState < state) || + (state == TASK_FINISHED) || + (state == TASK_KILLED)) { + inState = state; + } + TaskStatus taskStatus; + taskStatus.mutable_task_id()->MergeFrom(taskId); + taskStatus.set_state(inState); + taskStatus.set_message(message); + if (killed && killedByHealthCheck) { + taskStatus.set_healthy(false); + } + driver->sendStatusUpdate(taskStatus); + // Maintain the last state sent. + state = inState; + } + void reaped( ExecutorDriver* driver, const TaskID& taskId, pid_t pid, const Future<Option<int> >& status_) { - TaskState state; string message; + TaskState lstate; Clock::cancel(escalationTimer); if (!status_.isReady()) { - state = TASK_FAILED; + lstate = TASK_FAILED; message = "Failed to get exit status for Command: " + (status_.isFailed() ? status_.failure() : "future discarded"); } else if (status_.get().isNone()) { - state = TASK_FAILED; + lstate = TASK_FAILED; message = "Failed to get exit status for Command"; } else { int status = status_.get().get(); CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status; if (WIFEXITED(status) && WEXITSTATUS(status) == 0) { - state = TASK_FINISHED; + lstate = TASK_FINISHED; } else if (killed) { // Send TASK_KILLED if the task was killed as a result of // killTask() or shutdown(). - state = TASK_KILLED; + lstate = TASK_KILLED; } else { - state = TASK_FAILED; + lstate = TASK_FAILED; } message = "Command " + WSTRINGIFY(status); } cout << message << " (pid: " << pid << ")" << endl; - - TaskStatus taskStatus; - taskStatus.mutable_task_id()->MergeFrom(taskId); - taskStatus.set_state(state); - taskStatus.set_message(message); - if (killed && killedByHealthCheck) { - taskStatus.set_healthy(false); - } - - driver->sendStatusUpdate(taskStatus); - + + sendStatusUpdate(driver, lstate,taskId, message); // A hack for now ... but we need to wait until the status update // is sent to the slave before we shut ourselves down.
os::sleep(Seconds(1)); @@ -494,6 +506,7 @@ private: bool launched; bool killed; bool killedByHealthCheck; + TaskState state; pid_t pid; pid_t healthPid; Duration escalationTimeout; > Command executor can send TASK_KILLED after TASK_FINISHED > --------------------------------------------------------- > > Key: MESOS-2863 > URL: https://issues.apache.org/jira/browse/MESOS-2863 > Project: Mesos > Issue Type: Bug > Reporter: Vinod Kone > Assignee: Vaibhav Khanduja > Labels: newbie++ > > Observed this while doing some tests in our test cluster. > If the command executor gets a shutdown() (e.g., framework unregistered) > after sending TASK_FINISHED but before exiting (there is a forced sleep), it > could send a TASK_KILLED update to the slave. > Ideally the command executor should not send multiple terminal updates. -- This message was sent by Atlassian JIRA (v6.3.4#6332)