This is an automated email from the ASF dual-hosted git repository.

josephwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 0ab27c674b74f62c061bdbd64db30ce0fde1040f
Author: Joseph Wu <josep...@apache.org>
AuthorDate: Tue May 14 14:36:40 2019 +0200

    Changed Agent GET_STATE for completed executor's tasks (Part 2).
    
    This is another code path where an executor can "complete" with
    a non-terminal task (based on last status update).  This happens when
    the Framework or Master TEARDOWN calls are made, for a framework.
    If the executor does not send a TASK_KILLED in time, the agent will
    still mark the executor as complete and kill it.
    
    Unlike the other code path, this one does not require an agent restart.
    The agent in this state will return a GET_STATE response where
    non-terminal tasks of a completed executor (of a completed framework)
    will appear under `launched_tasks`.  This may provide misleading
    information about the total number of tasks running.
    
    This commit adds extra logic to place these non-terminal tasks under
    the `terminated_tasks` category, and adds a regression test.
    
    Review: https://reviews.apache.org/r/70641
---
 src/slave/slave.cpp     |  13 +++++
 src/tests/api_tests.cpp | 123 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 136 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index e966f7c..30039b0 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -6670,6 +6670,19 @@ void Slave::executorTerminated(
           sendExecutorTerminatedStatusUpdate(
               taskId, termination, frameworkId, executor);
         }
+      } else {
+        // When the framework is TERMINATING, we cannot send status updates
+        // for "launched tasks", but these tasks no longer belong in the
+        // `launchedTasks` structure. These tasks will continue to show
+        // in the agent's state (as a completed executor) for some time
+        // after the framework/executor terminates.
+        foreachpair (
+            const TaskID& taskId,
+            Task* task,
+            utils::copy(executor->launchedTasks)) {
+          executor->launchedTasks.erase(taskId);
+          executor->terminatedTasks[taskId] = task;
+        }
       }
 
       // Only send ExitedExecutorMessage if it is not a Command (or
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 5aee49a..61d516c 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -6769,6 +6769,129 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
 }
 
 
+// Checks that the V1 GET_STATE API will correctly categorize a non-terminal
+// task as a completed task, if the task belongs to a completed executor.
+// This variant of the test introduces the non-terminal task via the
+// master TEARDOWN call. A framework TEARDOWN call has similar effects.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(
+    AgentAPITest, TeardownAndGetStateWithNonTerminalCompletedTask)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Remove this delay so that the agent will immediately kill any tasks.
+  slaveFlags.executor_shutdown_grace_period = Seconds(0);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> statusRunning;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning));
+
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<Nothing> executorShutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillOnce(FutureSatisfy(&executorShutdown));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+  const Offer& offer = offers.get()[0];
+
+  TaskInfo task = createTask(offer, "", DEFAULT_EXECUTOR_ID);
+
+  driver.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning->state());
+
+  ContentType contentType = GetParam();
+
+  // Emulate the checkpointed state of an executor where the following occurs:
+  //   1) The operator initiates a TEARDOWN on a running framework.
+  //   2) The master tells any affected agents to shutdown the framework.
+  //   3) The agent shuts down any executors, the executors exit before
+  //      all terminal status updates reach the agent.
+  //      This results in a completed executor, with non-terminal tasks.
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::TEARDOWN);
+
+    v1Call.mutable_teardown()
+      ->mutable_framework_id()->set_value(frameworkId->value());
+
+    Future<http::Response> response = http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, v1Call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  }
+
+  AWAIT_READY(executorShutdown);
+
+  driver.stop();
+  driver.join();
+
+  // Non-terminal tasks on completed executors should appear as terminated
+  // tasks, even if they do not have a terminal status update.
+  {
+    v1::agent::Call v1Call;
+    v1Call.set_type(v1::agent::Call::GET_STATE);
+
+    Future<v1::agent::Response> v1Response =
+      post(slave.get()->pid, v1Call, contentType);
+
+    AWAIT_READY(v1Response);
+    ASSERT_TRUE(v1Response->IsInitialized());
+    ASSERT_EQ(v1::agent::Response::GET_STATE, v1Response->type());
+
+    const v1::agent::Response::GetState& getState = v1Response->get_state();
+    EXPECT_TRUE(getState.get_frameworks().frameworks().empty());
+    EXPECT_EQ(1, getState.get_frameworks().completed_frameworks_size());
+    EXPECT_TRUE(getState.get_tasks().launched_tasks().empty());
+    ASSERT_EQ(1, getState.get_tasks().terminated_tasks_size());
+    EXPECT_TRUE(getState.get_executors().executors().empty());
+    EXPECT_EQ(1, getState.get_executors().completed_executors_size());
+
+    // The latest state of this terminated task will not be terminal,
+    // because the executor was not given the chance to send the update.
+    EXPECT_EQ(TASK_RUNNING, getState.get_tasks().terminated_tasks(0).state());
+  }
+}
+
+
 // This test verifies that launch nested container session fails when
 // attaching to the output of the container fails. Consequently, the
 // launched container should be destroyed.

Reply via email to