This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 18892687ffd98be35fc0f2012df5aae9c99a034e
Author:     Benjamin Bannier <bbann...@apache.org>
AuthorDate: Mon Jul 15 10:26:10 2019 -0700

    Adjusted task status updates during draining.

    When a task is reported as killed to the agent during active agent
    draining, we now decorate the reported status with
    `REASON_AGENT_DRAINING` unconditionally. If the draining marks the
    agent as gone via the `mark_gone` draining flag, we additionally
    report `TASK_GONE_BY_OPERATOR` instead of the original state.

    This patch leaves some ambiguity about what triggered a kill since
    the agent-executor protocol does not transport reasons; instead, the
    reason is inferred only after the killed task has been observed. This
    should usually be fine: due to the inherent race between e.g. a
    user-triggered and a drain-triggered kill, a user cannot distinguish
    racy reasons.

    Review: https://reviews.apache.org/r/70936/
---
 src/slave/slave.cpp       | 34 ++++++++++++++++++++++++++++++++++
 src/tests/slave_tests.cpp | 44 +++++++++++++++++++++++++++++++++++---------
 2 files changed, 69 insertions(+), 9 deletions(-)
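In short, the agent-side change amounts to a small decision on each incoming status update: only kill-related states are touched, the reason is always overwritten, and the state is additionally rewritten when the drain marks the agent gone. A minimal sketch of that decision as a free-standing helper follows; the helper name and signature are illustrative, the actual change is inlined in `Slave::statusUpdate` as shown in the diff below.

    // Illustrative distillation of the inline logic added to
    // `Slave::statusUpdate`; the free-standing helper is made up, the
    // proto fields and enum values are taken from the diff below.
    #include <mesos/mesos.hpp>
    #include <stout/option.hpp>

    using namespace mesos;

    void decorateForDraining(
        const Option<DrainConfig>& drainConfig,
        TaskStatus* status)
    {
      // Only decorate while the agent is draining, and only kill states.
      if (drainConfig.isNone() ||
          (status->state() != TASK_KILLING &&
           status->state() != TASK_KILLED)) {
        return;
      }

      // Always overwrite the reason so frameworks get a consistent signal
      // that the task went away during draining.
      status->set_reason(TaskStatus::REASON_SLAVE_DRAINING);

      // A drain that also marks the agent gone reports the stronger state.
      if (drainConfig->mark_gone()) {
        status->set_state(TASK_GONE_BY_OPERATOR);
      }
    }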
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3878ab8..37385bd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5698,6 +5698,40 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
   update.mutable_status()->set_source(
       pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
 
+  // If the agent is draining we provide additional
+  // information for KILLING or KILLED states.
+  if (drainConfig.isSome()) {
+    switch (update.status().state()) {
+      case TASK_STAGING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+      case TASK_FAILED:
+      case TASK_FINISHED:
+      case TASK_ERROR:
+      case TASK_LOST:
+      case TASK_DROPPED:
+      case TASK_UNREACHABLE:
+      case TASK_GONE:
+      case TASK_GONE_BY_OPERATOR:
+      case TASK_UNKNOWN: {
+        break;
+      }
+      case TASK_KILLING:
+      case TASK_KILLED: {
+        // We unconditionally overwrite any previous reason to provide a
+        // consistent signal that this task went away during draining.
+        update.mutable_status()->set_reason(TaskStatus::REASON_SLAVE_DRAINING);
+
+        // If the draining marks the agent as gone report tasks as
+        // gone by operator.
+        if (drainConfig->mark_gone()) {
+          update.mutable_status()->set_state(TASK_GONE_BY_OPERATOR);
+        }
+        break;
+      }
+    }
+  }
+
   // Set TaskStatus.executor_id if not already set; overwrite existing
   // value if already set.
   if (update.has_executor_id()) {
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5f8e53c..95f7780 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -11989,7 +11989,9 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12108,7 +12110,9 @@ TEST_F(SlaveTest, DrainAgentKillsQueuedTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12213,7 +12217,9 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12228,6 +12234,10 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   slave::Flags slaveFlags = CreateSlaveFlags();
 
+  // Make the executor reregistration timeout less than the agent's
+  // registration backoff factor to avoid resent status updates.
+  slaveFlags.executor_reregistration_timeout = Milliseconds(2);
+
   ExecutorID executorId = DEFAULT_EXECUTOR_ID;
   MockExecutor exec(executorId);
   TestContainerizer containerizer(&exec);
@@ -12253,7 +12263,9 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
   MesosSchedulerDriver driver(
       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(sched, registered(_, _, _));
+  Future<Nothing> frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureSatisfy(&frameworkId));
 
   Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
@@ -12262,6 +12274,8 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   driver.start();
 
+  AWAIT_READY(frameworkId);
+
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->empty());
 
@@ -12280,8 +12294,10 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
   DurationInfo maxGracePeriod;
   maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
 
+  // We do not mark the agent as gone in contrast to some other tests here to
+  // validate that we observe `TASK_KILLED` instead of `TASK_GONE_BY_OPERATOR`.
   DrainConfig drainConfig;
-  drainConfig.set_mark_gone(true);
+  drainConfig.set_mark_gone(false);
   drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
 
   DrainSlaveMessage drainSlaveMessage;
@@ -12317,20 +12333,30 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   // Once the agent has finished recovering executors it should send
   // another task kill request to the executor.
-  Future<Nothing> killTask2;
   EXPECT_CALL(exec, killTask(_, _))
-    .WillOnce(FutureSatisfy(&killTask2));
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
 
   // Restart the agent.
   slave.get()->terminate();
+
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled))
+    .WillRepeatedly(Return()); // Ignore resent updates.
+
   slave = StartSlave(&detector, &containerizer, slaveFlags);
 
   AWAIT_READY(reregistered);
 
-  // Advance the clock to finish the executor reregistration phase.
+  // Advance the clock to finish the executor and agent reregistration phases.
   Clock::advance(slaveFlags.executor_reregistration_timeout);
+  Clock::settle();
 
-  AWAIT_READY(killTask2);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled->state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_DRAINING, statusKilled->reason());
 }
 
 
 } // namespace tests {
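For a framework consuming these updates, the observable contract exercised by the tests above is: a drain-triggered kill always carries the draining reason, and the state becomes `TASK_GONE_BY_OPERATOR` only when the drain was started with `mark_gone`. A rough sketch of a framework-side check, assuming the v0 C++ API; the helper name is made up, only the proto fields and enum values come from Mesos.

    #include <mesos/mesos.hpp>

    // Hypothetical helper a framework might call from its status update
    // callback to detect tasks that were killed because their agent is
    // being drained.
    bool killedByDraining(const mesos::TaskStatus& status)
    {
      // After this patch a drain-triggered kill always carries
      // REASON_SLAVE_DRAINING (surfaced as REASON_AGENT_DRAINING in the
      // v1 API). The state is TASK_KILLING/TASK_KILLED for a plain drain
      // and TASK_GONE_BY_OPERATOR when the drain also marks the agent gone.
      return status.reason() == mesos::TaskStatus::REASON_SLAVE_DRAINING &&
             (status.state() == mesos::TASK_KILLING ||
              status.state() == mesos::TASK_KILLED ||
              status.state() == mesos::TASK_GONE_BY_OPERATOR);
    }

A framework could use such a check to reschedule the task on another agent immediately instead of treating the kill as user-initiated; as the commit message notes, the reason is inferred on the agent, so a racing user kill may also be reported this way.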