This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 18892687ffd98be35fc0f2012df5aae9c99a034e
Author: Benjamin Bannier <bbann...@apache.org>
AuthorDate: Mon Jul 15 10:26:10 2019 -0700

    Adjusted task status updates during draining.
    
    When a task is reported as killed to the agent during active agent
    draining, we now unconditionally decorate the reported status with
    `REASON_AGENT_DRAINING`. If the draining marks the agent as gone via
    the `mark_gone` draining flag, we additionally report
    `TASK_GONE_BY_OPERATOR` instead of the original state.
    
    This patch leaves some ambiguity about what triggered the kill since
    the agent-executor protocol does not transport reasons; instead, the
    reason is inferred only after the killed task has been observed. This
    should usually be fine: due to the inherent race between, e.g., a
    user-triggered and a drain-triggered kill, a user cannot distinguish
    racy reasons anyway.
    
    Review: https://reviews.apache.org/r/70936/
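    
    For illustration, a framework's status-update handler might observe
    the decorated updates roughly as follows (a minimal sketch against
    the v1 scheduler API; `onUpdate` is a hypothetical helper, not part
    of this patch):
    
        #include <mesos/v1/mesos.hpp>
    
        // Hypothetical framework-side handler sketching how the
        // decorated updates surface: kills during draining carry
        // `REASON_AGENT_DRAINING`, and `mark_gone` draining additionally
        // rewrites the terminal state to `TASK_GONE_BY_OPERATOR`.
        void onUpdate(const mesos::v1::TaskStatus& status)
        {
          if (status.reason() ==
              mesos::v1::TaskStatus::REASON_AGENT_DRAINING) {
            if (status.state() == mesos::v1::TASK_GONE_BY_OPERATOR) {
              // Killed while draining an agent marked gone; the task
              // will not come back on this agent.
            } else if (status.state() == mesos::v1::TASK_KILLED ||
                       status.state() == mesos::v1::TASK_KILLING) {
              // Killed (or being killed) due to draining; the scheduler
              // may relaunch the task on another agent.
            }
          }
        }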
---
 src/slave/slave.cpp       | 34 ++++++++++++++++++++++++++++++++++
 src/tests/slave_tests.cpp | 44 +++++++++++++++++++++++++++++++++++---------
 2 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 3878ab8..37385bd 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -5698,6 +5698,40 @@ void Slave::statusUpdate(StatusUpdate update, const Option<UPID>& pid)
   update.mutable_status()->set_source(
       pid == UPID() ? TaskStatus::SOURCE_SLAVE : TaskStatus::SOURCE_EXECUTOR);
 
+  // If the agent is draining, we provide additional
+  // information for the KILLING or KILLED states.
+  if (drainConfig.isSome()) {
+    switch (update.status().state()) {
+      case TASK_STAGING:
+      case TASK_STARTING:
+      case TASK_RUNNING:
+      case TASK_FAILED:
+      case TASK_FINISHED:
+      case TASK_ERROR:
+      case TASK_LOST:
+      case TASK_DROPPED:
+      case TASK_UNREACHABLE:
+      case TASK_GONE:
+      case TASK_GONE_BY_OPERATOR:
+      case TASK_UNKNOWN: {
+        break;
+      }
+      case TASK_KILLING:
+      case TASK_KILLED: {
+        // We unconditionally overwrite any previous reason to provide a
+        // consistent signal that this task went away during draining.
+        update.mutable_status()->set_reason(TaskStatus::REASON_SLAVE_DRAINING);
+
+        // If the draining marks the agent as gone, report tasks
+        // as gone by operator.
+        if (drainConfig->mark_gone()) {
+          update.mutable_status()->set_state(TASK_GONE_BY_OPERATOR);
+        }
+        break;
+      }
+    }
+  }
+
   // Set TaskStatus.executor_id if not already set; overwrite existing
   // value if already set.
   if (update.has_executor_id()) {
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 5f8e53c..95f7780 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -11989,7 +11989,9 @@ TEST_F(SlaveTest, DrainAgentKillsRunningTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12108,7 +12110,9 @@ TEST_F(SlaveTest, DrainAgentKillsQueuedTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12213,7 +12217,9 @@ TEST_F(SlaveTest, DrainAgentKillsPendingTask)
 
   AWAIT_READY(killedUpdate);
 
-  EXPECT_EQ(v1::TASK_KILLED, killedUpdate->status().state());
+  EXPECT_EQ(v1::TASK_GONE_BY_OPERATOR, killedUpdate->status().state());
+  EXPECT_EQ(
+      v1::TaskStatus::REASON_AGENT_DRAINING, killedUpdate->status().reason());
 }
 
 
@@ -12228,6 +12234,10 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   slave::Flags slaveFlags = CreateSlaveFlags();
 
+  // Make the executor reregistration timeout less than the agent's
+  // registration backoff factor to avoid resent status updates.
+  slaveFlags.executor_reregistration_timeout = Milliseconds(2);
+
   ExecutorID executorId = DEFAULT_EXECUTOR_ID;
   MockExecutor exec(executorId);
   TestContainerizer containerizer(&exec);
@@ -12253,7 +12263,9 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
   MesosSchedulerDriver driver(
       &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  EXPECT_CALL(sched, registered(_, _, _));
+  Future<Nothing> frameworkRegistered;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureSatisfy(&frameworkRegistered));
 
   Future<vector<Offer>> offers;
   EXPECT_CALL(sched, resourceOffers(_, _))
@@ -12262,6 +12274,8 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   driver.start();
 
+  AWAIT_READY(frameworkRegistered);
+
   AWAIT_READY(offers);
   ASSERT_FALSE(offers->empty());
 
@@ -12280,8 +12294,10 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
   DurationInfo maxGracePeriod;
   maxGracePeriod.set_nanoseconds(GRACE_PERIOD_NANOS);
 
+  // In contrast to some other tests here we do not mark the agent as gone,
+  // to validate that we observe `TASK_KILLED`, not `TASK_GONE_BY_OPERATOR`.
   DrainConfig drainConfig;
-  drainConfig.set_mark_gone(true);
+  drainConfig.set_mark_gone(false);
   drainConfig.mutable_max_grace_period()->CopyFrom(maxGracePeriod);
 
   DrainSlaveMessage drainSlaveMessage;
@@ -12317,20 +12333,30 @@ TEST_F(SlaveTest, CheckpointedDrainInfo)
 
   // Once the agent has finished recovering executors it should send
   // another task kill request to the executor.
-  Future<Nothing> killTask2;
   EXPECT_CALL(exec, killTask(_, _))
-    .WillOnce(FutureSatisfy(&killTask2));
+    .WillOnce(SendStatusUpdateFromTaskID(TASK_KILLED));
 
   // Restart the agent.
   slave.get()->terminate();
+
+  Future<TaskStatus> statusKilled;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusKilled))
+    .WillRepeatedly(Return()); // Ignore resent updates.
+
   slave = StartSlave(&detector, &containerizer, slaveFlags);
 
   AWAIT_READY(reregistered);
 
-  // Advance the clock to finish the executor reregistration phase.
+  // Advance the clock to finish the executor and agent reregistration phases.
   Clock::advance(slaveFlags.executor_reregistration_timeout);
+  Clock::settle();
 
-  AWAIT_READY(killTask2);
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(statusKilled);
+  EXPECT_EQ(TASK_KILLED, statusKilled->state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_DRAINING, statusKilled->reason());
 }
 
 } // namespace tests {
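
For reference, the touched tests can be exercised with a gtest filter along
these lines (a sketch; the exact runner path depends on the build setup):

    ./bin/mesos-tests.sh \
      --gtest_filter="SlaveTest.DrainAgentKills*:SlaveTest.CheckpointedDrainInfo"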
