Filled in missing executor info in tasks for `LAUNCH_GROUP`. This fixes the sandbox navigation error in the Web UI, which uses a task's executor ID to locate the corresponding sandbox directory. When a task's executor ID is empty, the Web UI falls back to using the task ID as the executor ID. This works for tasks launched by `CommandExecutor`, because in that case the executor ID equals the task ID. However, for tasks launched by `DefaultExecutor`, the executor ID is defined by the framework and may differ from the task ID. So we need to fill in the `ExecutorInfo` of each `TaskInfo` on `LAUNCH_GROUP` to prevent the Web UI from using an incorrect executor ID when searching for the sandbox directory.
Review: https://reviews.apache.org/r/52470/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d2da824c Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d2da824c Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d2da824c Branch: refs/heads/master Commit: d2da824cfbfc4242ea4962d763da9726faf7aaca Parents: ebe7ea5 Author: haosdent huang <haosd...@gmail.com> Authored: Fri Oct 14 11:59:32 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Fri Oct 14 11:59:32 2016 -0700 ---------------------------------------------------------------------- include/mesos/mesos.proto | 5 +- include/mesos/v1/mesos.proto | 5 +- src/master/master.cpp | 25 +++++-- src/master/master.hpp | 2 +- src/master/validation.cpp | 15 +++- src/tests/default_executor_tests.cpp | 112 +++++++++++++++++++++++++++++ src/tests/master_validation_tests.cpp | 20 ++++-- 7 files changed, 166 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/include/mesos/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto index 05988d4..d071431 100644 --- a/include/mesos/mesos.proto +++ b/include/mesos/mesos.proto @@ -1436,8 +1436,9 @@ message TaskInfo { * allow the group to be launched "atomically". * * NOTES: - * 1) `TaskInfo.executor` must not be set. - * 2) `NetworkInfo` must not be set inside task's `ContainerInfo`. + * 1) `NetworkInfo` must not be set inside task's `ContainerInfo`. + * 2) `TaskInfo.executor` doesn't need to set. If set, it should match + * `LaunchGroup.executor`. 
*/ message TaskGroupInfo { repeated TaskInfo tasks = 1; http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/include/mesos/v1/mesos.proto ---------------------------------------------------------------------- diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto index 08a536c..74761a0 100644 --- a/include/mesos/v1/mesos.proto +++ b/include/mesos/v1/mesos.proto @@ -1435,8 +1435,9 @@ message TaskInfo { * allow the group to be launched "atomically". * * NOTES: - * 1) `TaskInfo.executor` must not be set. - * 2) `NetworkInfo` must not be set inside task's `ContainerInfo`. + * 1) `NetworkInfo` must not be set inside task's `ContainerInfo`. + * 2) `TaskInfo.executor` doesn't need to set. If set, it should match + * `LaunchGroup.executor`. */ message TaskGroupInfo { repeated TaskInfo tasks = 1; http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 7ef8987..3c6b18e 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -3405,13 +3405,15 @@ Resources Master::addTask( void Master::accept( Framework* framework, - const scheduler::Call::Accept& accept) + scheduler::Call::Accept accept) { CHECK_NOTNULL(framework); - foreach (const Offer::Operation& operation, accept.operations()) { - if (operation.type() == Offer::Operation::LAUNCH) { - if (operation.launch().task_infos().size() > 0) { + for (int i = 0; i < accept.operations_size(); ++i) { + Offer::Operation* operation = accept.mutable_operations(i); + + if (operation->type() == Offer::Operation::LAUNCH) { + if (operation->launch().task_infos().size() > 0) { ++metrics->messages_launch_tasks; } else { ++metrics->messages_decline_offers; @@ -3419,6 +3421,21 @@ void Master::accept( << " in ACCEPT call for framework " << framework->id() << " as the launch operation specified no tasks"; } + } else if (operation->type() == 
Offer::Operation::LAUNCH_GROUP) { + const ExecutorInfo& executor = operation->launch_group().executor(); + + TaskGroupInfo* taskGroup = + operation->mutable_launch_group()->mutable_task_group(); + + // Mutate `TaskInfo` to include `ExecutorInfo` to make it easy + // for operator API and WebUI to get access to the corresponding + // executor for tasks in the task group. + for (int j = 0; j < taskGroup->tasks().size(); ++j) { + TaskInfo* task = taskGroup->mutable_tasks(j); + if (!task->has_executor()) { + task->mutable_executor()->CopyFrom(executor); + } + } } // TODO(jieyu): Add metrics for non launch operations. http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/master/master.hpp ---------------------------------------------------------------------- diff --git a/src/master/master.hpp b/src/master/master.hpp index 43518b9..881f0d6 100644 --- a/src/master/master.hpp +++ b/src/master/master.hpp @@ -911,7 +911,7 @@ private: void accept( Framework* framework, - const scheduler::Call::Accept& accept); + scheduler::Call::Accept accept); void _accept( const FrameworkID& frameworkId, http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/master/validation.cpp ---------------------------------------------------------------------- diff --git a/src/master/validation.cpp b/src/master/validation.cpp index 480a94b..f690a9e 100644 --- a/src/master/validation.cpp +++ b/src/master/validation.cpp @@ -1011,8 +1011,8 @@ Option<Error> validateTask( // Now do `TaskGroup` specific validation. - if (task.has_executor()) { - return Error("'TaskInfo.executor' must not be set"); + if (!task.has_executor()) { + return Error("'TaskInfo.executor' must be set"); } if (task.has_container()) { @@ -1087,6 +1087,17 @@ Option<Error> validateExecutor( return Error("Docker ContainerInfo is not supported on the executor"); } + // Validate the `ExecutorInfo` in all tasks are same. 
+ + foreach (const TaskInfo& task, taskGroup.tasks()) { + if (task.has_executor() && task.executor() != executor) { + return Error( + "The `ExecutorInfo` of " + "task '" + stringify(task.task_id()) + "' is different from " + "executor '" + stringify(executor.executor_id()) + "'"); + } + } + const Resources& executorResources = executor.resources(); // Validate minimal cpus and memory resources of executor. http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index 9e0fd67..dc002c6 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -541,6 +541,118 @@ TEST_F(DefaultExecutorTest, KillTaskGroupOnTaskFailure) ASSERT_EQ(expectedTaskStates, taskStates); } + +// Verifies that a task in a task group with an executor is accepted +// during `TaskGroupInfo` validation. +TEST_P(DefaultExecutorTest, ROOT_TaskUsesExecutor) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + auto scheduler = std::make_shared<MockV1HTTPScheduler>(); + + Resources resources = + Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + + ExecutorInfo executorInfo; + executorInfo.set_type(ExecutorInfo::DEFAULT); + + executorInfo.mutable_executor_id()->CopyFrom(DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); + + // Disable AuthN on the agent. 
+ slave::Flags flags = CreateSlaveFlags(); + flags.authenticate_http_readwrite = false; + flags.containerizers = GetParam(); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); + ASSERT_SOME(slave); + + Future<Nothing> connected; + EXPECT_CALL(*scheduler, connected(_)) + .WillOnce(FutureSatisfy(&connected)); + + scheduler::TestV1Mesos mesos( + master.get()->pid, ContentType::PROTOBUF, scheduler); + + AWAIT_READY(connected); + + Future<v1::scheduler::Event::Subscribed> subscribed; + EXPECT_CALL(*scheduler, subscribed(_, _)) + .WillOnce(FutureArg<1>(&subscribed)); + + Future<v1::scheduler::Event::Offers> offers; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); + + EXPECT_CALL(*scheduler, heartbeat(_)) + .WillRepeatedly(Return()); // Ignore heartbeats. + + { + Call call; + call.set_type(Call::SUBSCRIBE); + Call::Subscribe* subscribe = call.mutable_subscribe(); + subscribe->mutable_framework_info()->CopyFrom(evolve(frameworkInfo)); + + mesos.send(call); + } + + AWAIT_READY(subscribed); + + v1::FrameworkID frameworkId(subscribed->framework_id()); + + // Update `executorInfo` with the subscribed `frameworkId`. 
+ executorInfo.mutable_framework_id()->CopyFrom(devolve(frameworkId)); + + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + Future<v1::scheduler::Event::Update> update; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&update)); + + const v1::Offer& offer = offers->offers(0); + const SlaveID slaveId = devolve(offer.agent_id()); + + v1::TaskInfo taskInfo = + evolve(createTask(slaveId, resources, "sleep 1000")); + + taskInfo.mutable_executor()->CopyFrom(evolve(executorInfo)); + + v1::TaskGroupInfo taskGroup; + taskGroup.add_tasks()->CopyFrom(taskInfo); + + { + Call call; + call.mutable_framework_id()->CopyFrom(frameworkId); + call.set_type(Call::ACCEPT); + + Call::Accept* accept = call.mutable_accept(); + accept->add_offer_ids()->CopyFrom(offer.id()); + + v1::Offer::Operation* operation = accept->add_operations(); + operation->set_type(v1::Offer::Operation::LAUNCH_GROUP); + + v1::Offer::Operation::LaunchGroup* launchGroup = + operation->mutable_launch_group(); + + launchGroup->mutable_executor()->CopyFrom(evolve(executorInfo)); + launchGroup->mutable_task_group()->CopyFrom(taskGroup); + + mesos.send(call); + } + + AWAIT_READY(update); + + ASSERT_EQ(TASK_RUNNING, update->status().state()); + EXPECT_EQ(taskInfo.task_id(), update->status().task_id()); + EXPECT_TRUE(update->status().has_timestamp()); +} + } // namespace tests { } // namespace internal { } // namespace mesos { http://git-wip-us.apache.org/repos/asf/mesos/blob/d2da824c/src/tests/master_validation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp index 0f8d33b..da43f99 100644 --- a/src/tests/master_validation_tests.cpp +++ b/src/tests/master_validation_tests.cpp @@ -2105,9 +2105,9 @@ TEST_F(TaskGroupValidationTest, TaskUsesNetworkInfo) } -// Ensures that a task in a task group with an executor +// Ensures that a task in a task group with a different 
executor // is rejected during `TaskGroupInfo` validation. -TEST_F(TaskGroupValidationTest, TaskUsesExecutor) +TEST_F(TaskGroupValidationTest, TaskUsesDifferentExecutor) { Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); @@ -2116,9 +2116,12 @@ TEST_F(TaskGroupValidationTest, TaskUsesExecutor) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); ASSERT_SOME(slave); + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.mutable_id()->set_value("Test_Framework"); + MockScheduler sched; MesosSchedulerDriver driver( - &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL); + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); EXPECT_CALL(sched, registered(&driver, _, _)) .Times(1); @@ -2137,23 +2140,25 @@ TEST_F(TaskGroupValidationTest, TaskUsesExecutor) Resources resources = Resources::parse("cpus:1;mem:512;disk:32").get(); ExecutorInfo executor(DEFAULT_EXECUTOR_INFO); + executor.mutable_framework_id()->CopyFrom(frameworkInfo.id()); executor.set_type(ExecutorInfo::CUSTOM); executor.mutable_resources()->CopyFrom(resources); - // Create an invalid task that has executor. + // Create an invalid task that has a different executor. TaskInfo task1; task1.set_name("1"); task1.mutable_task_id()->set_value("1"); task1.mutable_slave_id()->MergeFrom(offer.slave_id()); task1.mutable_resources()->MergeFrom(resources); task1.mutable_executor()->MergeFrom(executor); + task1.mutable_executor()->set_type(ExecutorInfo::DEFAULT); // Create a valid task. 
TaskInfo task2; task2.set_name("2"); task2.mutable_task_id()->set_value("2"); task2.mutable_slave_id()->MergeFrom(offer.slave_id()); - task1.mutable_resources()->MergeFrom(resources); + task2.mutable_resources()->MergeFrom(resources); TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(task1); @@ -2180,8 +2185,9 @@ TEST_F(TaskGroupValidationTest, TaskUsesExecutor) EXPECT_EQ(task1.task_id(), task1Status->task_id()); EXPECT_EQ(TASK_ERROR, task1Status->state()); EXPECT_EQ(TaskStatus::REASON_TASK_GROUP_INVALID, task1Status->reason()); - EXPECT_EQ("Task '1' is invalid: 'TaskInfo.executor' must not be set", - task1Status->message()); + EXPECT_EQ( + "The `ExecutorInfo` of task '1' is different from executor 'default'", + task1Status->message()); AWAIT_READY(task2Status); EXPECT_EQ(task2.task_id(), task2Status->task_id());