Repository: mesos Updated Branches: refs/heads/master 9303db67c -> d2b3d365c
Updated default executor to send TASK_KILLED updates. Just like the LAUNCH_GROUP implementation, this is a dummy implementation that sends the TASK_KILLED updates without doing any actual kills. Review: https://reviews.apache.org/r/52108 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2b3d365 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2b3d365 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2b3d365 Branch: refs/heads/master Commit: d2b3d365c9c121d955d99f9dcfda381de339c917 Parents: e28f197 Author: Vinod Kone <vinodk...@gmail.com> Authored: Tue Sep 20 18:11:21 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Tue Sep 20 20:27:11 2016 -0700 ---------------------------------------------------------------------- src/launcher/default_executor.cpp | 36 ++++++- src/tests/default_executor_tests.cpp | 170 ++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d2b3d365/src/launcher/default_executor.cpp ---------------------------------------------------------------------- diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp index ef63edc..369db3e 100644 --- a/src/launcher/default_executor.cpp +++ b/src/launcher/default_executor.cpp @@ -65,7 +65,8 @@ public: launched(false), frameworkInfo(None()), frameworkId(_frameworkId), - executorId(_executorId) {} + executorId(_executorId), + taskGroup(None()) {} virtual ~DefaultExecutor() = default; @@ -110,7 +111,7 @@ public: } case Event::KILL: { - // TODO(anand): Implement this. 
+ killTask(event.kill().task_id()); break; } @@ -206,21 +207,45 @@ protected: } launched = true; + taskGroup = _taskGroup; - foreach (const TaskInfo& task, _taskGroup.tasks()) { + foreach (const TaskInfo& task, taskGroup->tasks()) { tasks[task.task_id()] = task; } // Send a TASK_RUNNING status update followed immediately by a // TASK_FINISHED update. // - // TODO(anand): Eventually, we need to invoke the `LAUNCH_CONTAINER` + // TODO(anand): Eventually, we need to invoke the `LAUNCH_NESTED_CONTAINER` // call via the Agent API. - foreach (const TaskInfo& task, _taskGroup.tasks()) { + foreach (const TaskInfo& task, taskGroup->tasks()) { update(task.task_id(), TASK_RUNNING); } } + void killTask(const TaskID& taskId) + { + CHECK_EQ(SUBSCRIBED, state); + + cout << "Received kill for task '" << taskId << "'"; + + // Send TASK_KILLED updates for all tasks in the group. + // + // TODO(vinod): We need to send `KILL_NESTED_CONTAINER` call to the Agent + // API for each of the tasks and wait for `WAIT_NESTED_CONTAINER` responses. + + CHECK_SOME(taskGroup); // Should not get a `KILL` before `LAUNCH_GROUP`. + foreach (const TaskInfo& task, taskGroup->tasks()) { + update(task.task_id(), TASK_KILLED); + } + + // TODO(qianzhang): Remove this hack since the executor now receives + // acknowledgements for status updates. The executor can terminate + // after it receives an ACK for a terminal status update. + os::sleep(Seconds(1)); + terminate(self()); + } + private: void update( const TaskID& taskId, @@ -270,6 +295,7 @@ private: const FrameworkID frameworkId; const ExecutorID executorId; Owned<Mesos> mesos; + Option<TaskGroupInfo> taskGroup; LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates. LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks. 
}; http://git-wip-us.apache.org/repos/asf/mesos/blob/d2b3d365/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index 6a0b011..786fbe3 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -145,6 +145,176 @@ TEST_F(DefaultExecutorTest, TaskRunning) EXPECT_TRUE(update->status().healthy()); } + +// This test verifies that if the default executor is asked +// to kill a task from a task group, it kills all tasks in +// the group and sends TASK_KILLED updates for them. +TEST_F(DefaultExecutorTest, KillTask) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + auto scheduler = std::make_shared<MockV1HTTPScheduler>(); + + Resources resources = + Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + ExecutorInfo executorInfo; + executorInfo.set_type(ExecutorInfo::DEFAULT); + + executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + scheduler::TestV1Mesos mesos( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. 
+ + { + Call call; + call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo)); + + mesos.send(call); + } + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. + executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId)); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate1)) + .WillOnce(FutureArg<1>(&runningUpdate2)); + + const v1::Offer& offer = offers->offers(0); + const SlaveID slaveId = devolve(offer.agent_id()); + + v1::TaskInfo taskInfo1 = + evolve(createTask(slaveId, resources, "sleep 1000")); + + v1::TaskInfo taskInfo2 = + evolve(createTask(slaveId, resources, "sleep 1000")); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo1); + taskGroup.add_tasks()->CopyFrom(taskInfo2); + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); + + v1::Offer::Operation::LaunchGroup* launchGroup = + operation->mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo)); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + mesos.send(call); + } + + AWAIT_READY(runningUpdate1); + ASSERT_EQ(TASK_RUNNING, runningUpdate1->status().state()); + EXPECT_EQ(taskInfo1.task_id(), runningUpdate1->status().task_id()); + + AWAIT_READY(runningUpdate2); + ASSERT_EQ(TASK_RUNNING, runningUpdate2->status().state()); + EXPECT_EQ(taskInfo2.task_id(), 
runningUpdate2->status().task_id()); + + // Acknowledge the TASK_RUNNING updates to receive the next updates. + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom(taskInfo1.task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); + acknowledge->set_uuid(runningUpdate1->status().uuid()); + + mesos.send(call); + } + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACKNOWLEDGE); + + Call::Acknowledge* acknowledge = call.mutable_acknowledge(); + acknowledge->mutable_task_id()->CopyFrom(taskInfo2.task_id()); + acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id()); + acknowledge->set_uuid(runningUpdate2->status().uuid()); + + mesos.send(call); + } + + Future<v1::scheduler::Event::Update> killedUpdate1; + Future<v1::scheduler::Event::Update> killedUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&killedUpdate1)) + .WillOnce(FutureArg<1>(&killedUpdate2)); + + // Now kill one task in the task group. + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::KILL); + + Call::Kill* kill = call.mutable_kill(); + kill->mutable_task_id()->CopyFrom(taskInfo1.task_id()); + + mesos.send(call); + } + + // All the tasks in the task group should be killed. + + AWAIT_READY(killedUpdate1); + ASSERT_EQ(TASK_KILLED, killedUpdate1->status().state()); + EXPECT_EQ(taskInfo1.task_id(), killedUpdate1->status().task_id()); + + AWAIT_READY(killedUpdate2); + ASSERT_EQ(TASK_KILLED, killedUpdate2->status().state()); + EXPECT_EQ(taskInfo2.task_id(), killedUpdate2->status().task_id()); +} + } // namespace tests { } // namespace internal { } // namespace mesos {