Repository: mesos
Updated Branches:
  refs/heads/master 9303db67c -> d2b3d365c


Updated default executor to send TASK_KILLED updates.

Just like the LAUNCH_GROUP implementation, this is a dummy implementation
that sends the TASK_KILLED updates without doing any actual kills.

Review: https://reviews.apache.org/r/52108


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2b3d365
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2b3d365
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2b3d365

Branch: refs/heads/master
Commit: d2b3d365c9c121d955d99f9dcfda381de339c917
Parents: e28f197
Author: Vinod Kone <vinodk...@gmail.com>
Authored: Tue Sep 20 18:11:21 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Tue Sep 20 20:27:11 2016 -0700

----------------------------------------------------------------------
 src/launcher/default_executor.cpp    |  36 ++++++-
 src/tests/default_executor_tests.cpp | 170 ++++++++++++++++++++++++++++++
 2 files changed, 201 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d2b3d365/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index ef63edc..369db3e 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -65,7 +65,8 @@ public:
       launched(false),
       frameworkInfo(None()),
       frameworkId(_frameworkId),
-      executorId(_executorId) {}
+      executorId(_executorId),
+      taskGroup(None()) {}
 
   virtual ~DefaultExecutor() = default;
 
@@ -110,7 +111,7 @@ public:
       }
 
       case Event::KILL: {
-        // TODO(anand): Implement this.
+        killTask(event.kill().task_id());
         break;
       }
 
@@ -206,21 +207,45 @@ protected:
     }
 
     launched = true;
+    taskGroup = _taskGroup;
 
-    foreach (const TaskInfo& task, _taskGroup.tasks()) {
+    foreach (const TaskInfo& task, taskGroup->tasks()) {
       tasks[task.task_id()] = task;
     }
 
     // Send a TASK_RUNNING status update followed immediately by a
     // TASK_FINISHED update.
     //
-    // TODO(anand): Eventually, we need to invoke the `LAUNCH_CONTAINER`
+    // TODO(anand): Eventually, we need to invoke the `LAUNCH_NESTED_CONTAINER`
     // call via the Agent API.
-    foreach (const TaskInfo& task, _taskGroup.tasks()) {
+    foreach (const TaskInfo& task, taskGroup->tasks()) {
       update(task.task_id(), TASK_RUNNING);
     }
   }
 
+  void killTask(const TaskID& taskId)
+  {
+    CHECK_EQ(SUBSCRIBED, state);
+
+    cout << "Received kill for task '" << taskId << "'";
+
+    // Send TASK_KILLED updates for all tasks in the group.
+    //
+    // TODO(vinod): We need to send `KILL_NESTED_CONTAINER` call to the Agent
+    // API for each of the tasks and wait for `WAIT_NESTED_CONTAINER` responses.
+
+    CHECK_SOME(taskGroup); // Should not get a `KILL` before `LAUNCH_GROUP`.
+    foreach (const TaskInfo& task, taskGroup->tasks()) {
+      update(task.task_id(), TASK_KILLED);
+    }
+
+    // TODO(qianzhang): Remove this hack since the executor now receives
+    // acknowledgements for status updates. The executor can terminate
+    // after it receives an ACK for a terminal status update.
+    os::sleep(Seconds(1));
+    terminate(self());
+  }
+
 private:
   void update(
       const TaskID& taskId,
@@ -270,6 +295,7 @@ private:
   const FrameworkID frameworkId;
   const ExecutorID executorId;
   Owned<Mesos> mesos;
+  Option<TaskGroupInfo> taskGroup;
   LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
   LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
 };

http://git-wip-us.apache.org/repos/asf/mesos/blob/d2b3d365/src/tests/default_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp
index 6a0b011..786fbe3 100644
--- a/src/tests/default_executor_tests.cpp
+++ b/src/tests/default_executor_tests.cpp
@@ -145,6 +145,176 @@ TEST_F(DefaultExecutorTest, TaskRunning)
   EXPECT_TRUE(update->status().healthy());
 }
 
+
+// This test verifies that if the default executor is asked
+// to kill a task from a task group, it kills all tasks in
+// the group and sends TASK_KILLED updates for them.
+TEST_F(DefaultExecutorTest, KillTask)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+
+  Resources resources =
+    Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+
+  ExecutorInfo executorInfo;
+  executorInfo.set_type(ExecutorInfo::DEFAULT);
+
+  executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+  ASSERT_SOME(slave);
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  scheduler::TestV1Mesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId));
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+
+  Future<v1::scheduler::Event::Update> runningUpdate1;
+  Future<v1::scheduler::Event::Update> runningUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&runningUpdate1))
+    .WillOnce(FutureArg<1>(&runningUpdate2));
+
+  const v1::Offer& offer = offers->offers(0);
+  const SlaveID slaveId = devolve(offer.agent_id());
+
+  v1::TaskInfo taskInfo1 =
+    evolve(createTask(slaveId, resources, "sleep 1000"));
+
+  v1::TaskInfo taskInfo2 =
+    evolve(createTask(slaveId, resources, "sleep 1000"));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo1);
+  taskGroup.add_tasks()->CopyFrom(taskInfo2);
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH_GROUP);
+
+    v1::Offer::Operation::LaunchGroup* launchGroup =
+      operation->mutable_launch_group();
+
+    launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo));
+    launchGroup->mutable_task_group()->CopyFrom(taskGroup);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(runningUpdate1);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state());
+  EXPECT_EQ(taskInfo1.task_id(), runningUpdate1->status().task_id());
+
+  AWAIT_READY(runningUpdate2);
+  ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state());
+  EXPECT_EQ(taskInfo2.task_id(), runningUpdate2->status().task_id());
+
+  // Acknowledge the TASK_RUNNING updates to receive the next updates.
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo1.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+    acknowledge->set_uuid(runningUpdate1->status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo2.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+    acknowledge->set_uuid(runningUpdate2->status().uuid());
+
+    mesos.send(call);
+  }
+
+  Future<v1::scheduler::Event::Update> killedUpdate1;
+  Future<v1::scheduler::Event::Update> killedUpdate2;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&killedUpdate1))
+    .WillOnce(FutureArg<1>(&killedUpdate2));
+
+  // Now kill one task in the task group.
+  {
+    Call call;
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.set_type(Call::KILL);
+
+    Call::Kill* kill = call.mutable_kill();
+    kill->mutable_task_id()->CopyFrom(taskInfo1.task_id());
+
+    mesos.send(call);
+  }
+
+  // All the tasks in the task group should be killed.
+
+  AWAIT_READY(killedUpdate1);
+  ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state());
+  EXPECT_EQ(taskInfo1.task_id(), killedUpdate1->status().task_id());
+
+  AWAIT_READY(killedUpdate2);
+  ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state());
+  EXPECT_EQ(taskInfo2.task_id(), killedUpdate2->status().task_id());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

Reply via email to