Repository: mesos Updated Branches: refs/heads/master 623e46791 -> 499173d39
Updated default executor tests. Reorganized so that objects are defined closer to their usage. Review: https://reviews.apache.org/r/57059 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/499173d3 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/499173d3 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/499173d3 Branch: refs/heads/master Commit: 499173d395db40e753e86ec5847b2e3944b87c35 Parents: f38a3f1 Author: Vinod Kone <vinodk...@gmail.com> Authored: Fri Feb 24 17:59:48 2017 -0800 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Tue Mar 7 16:06:24 2017 -0800 ---------------------------------------------------------------------- src/tests/default_executor_tests.cpp | 398 +++++++++++++----------------- 1 file changed, 177 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/499173d3/src/tests/default_executor_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/default_executor_tests.cpp b/src/tests/default_executor_tests.cpp index 88d29b5..a09ae11 100644 --- a/src/tests/default_executor_tests.cpp +++ b/src/tests/default_executor_tests.cpp @@ -105,19 +105,6 @@ TEST_P(DefaultExecutorTest, TaskRunning) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources resources = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -127,6 +114,8 @@ TEST_P(DefaultExecutorTest, TaskRunning) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -154,25 +143,26 @@ TEST_P(DefaultExecutorTest, TaskRunning) Call call; call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); mesos.send(call); } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); - Future<v1::scheduler::Event::Update> update; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&update)); - const v1::Offer& offer = offers->offers(0); const v1::AgentID agentId = offer.agent_id(); @@ -182,6 +172,10 @@ TEST_P(DefaultExecutorTest, TaskRunning) v1::TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(taskInfo); + Future<v1::scheduler::Event::Update> update; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&update)); + { Call call; call.mutable_framework_id()->CopyFrom(frameworkId); @@ -247,19 +241,6 @@ TEST_P(DefaultExecutorTest, KillTask) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources resources = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -269,6 +250,8 @@ TEST_P(DefaultExecutorTest, KillTask) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -295,32 +278,26 @@ TEST_P(DefaultExecutorTest, KillTask) Call call; call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); mesos.send(call); } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers1); EXPECT_NE(0, offers1->offers().size()); - Future<v1::scheduler::Event::Offers> offers2; - EXPECT_CALL(*scheduler, offers(_, _)) - .WillOnce(FutureArg<1>(&offers2)) - .WillRepeatedly(Return()); - - Future<v1::scheduler::Event::Update> runningUpdate1; - Future<v1::scheduler::Event::Update> runningUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate1)) - .WillOnce(FutureArg<1>(&runningUpdate2)); - const v1::Offer& offer1 = offers1->offers(0); const v1::AgentID agentId = offer1.agent_id(); @@ -336,6 +313,17 @@ TEST_P(DefaultExecutorTest, KillTask) const hashset<v1::TaskID> tasks1{taskInfo1.task_id(), taskInfo2.task_id()}; + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate1)) + .WillOnce(FutureArg<1>(&runningUpdate2)); + + Future<v1::scheduler::Event::Offers> offers2; + EXPECT_CALL(*scheduler, offers(_, _)) + .WillOnce(FutureArg<1>(&offers2)) + .WillRepeatedly(Return()); + { Call call; call.mutable_framework_id()->CopyFrom(frameworkId); @@ -375,19 +363,18 @@ TEST_P(DefaultExecutorTest, KillTask) ASSERT_EQ(tasks1, tasksRunning); AWAIT_READY(offers2); - const v1::Offer& offer2 = offers2->offers(0); - Future<v1::scheduler::Event::Update> runningUpdate3; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate3)); - v1::TaskInfo taskInfo3 = v1::createTask(agentId, resources, SLEEP_COMMAND(1000)); v1::TaskGroupInfo taskGroup2; taskGroup2.add_tasks()->CopyFrom(taskInfo3); + Future<v1::scheduler::Event::Update> runningUpdate3; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate3)); + // Launch the second task group. { Call call; @@ -543,19 +530,6 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources resources = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -564,6 +538,8 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -591,27 +567,26 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) Call call; call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); mesos.send(call); } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); - Future<v1::scheduler::Event::Update> runningUpdate1; - Future<v1::scheduler::Event::Update> runningUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate1)) - .WillOnce(FutureArg<1>(&runningUpdate2)); - const v1::Offer& offer = offers->offers(0); const v1::AgentID agentId = offer.agent_id(); @@ -627,6 +602,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) taskGroup.add_tasks()->CopyFrom(taskInfo1); taskGroup.add_tasks()->CopyFrom(taskInfo2); + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate1)) + .WillOnce(FutureArg<1>(&runningUpdate2)); + { Call call; call.mutable_framework_id()->CopyFrom(frameworkId); @@ -661,6 +642,12 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) ASSERT_EQ(tasks, tasksRunning); + Future<v1::scheduler::Event::Update> update1; + Future<v1::scheduler::Event::Update> update2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&update1)) + .WillOnce(FutureArg<1>(&update2)); + // Acknowledge the TASK_RUNNING updates to receive the next updates. { @@ -701,12 +688,6 @@ TEST_P(DefaultExecutorTest, KillTaskGroupOnTaskFailure) taskStates.insert({taskInfo1.task_id(), v1::TASK_FAILED}); taskStates.insert({taskInfo2.task_id(), v1::TASK_KILLED}); - Future<v1::scheduler::Event::Update> update1; - Future<v1::scheduler::Event::Update> update2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&update1)) - .WillOnce(FutureArg<1>(&update2)); - AWAIT_READY(update1); AWAIT_READY(update2); @@ -729,19 +710,6 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources resources = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -751,6 +719,8 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -778,25 +748,26 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) Call call; call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); mesos.send(call); } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); - Future<v1::scheduler::Event::Update> update; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&update)); - const v1::Offer& offer = offers->offers(0); const v1::AgentID agentId = offer.agent_id(); @@ -808,6 +779,10 @@ TEST_P(DefaultExecutorTest, TaskUsesExecutor) v1::TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(taskInfo); + Future<v1::scheduler::Event::Update> update; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&update)); + { Call call; call.mutable_framework_id()->CopyFrom(frameworkId); @@ -882,12 +857,6 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) AWAIT_READY(connected); AWAIT_READY(subscribed); - - AWAIT_READY(offers); - EXPECT_NE(0, offers->offers().size()); - - const v1::Offer& offer = offers->offers(0); - v1::FrameworkID frameworkId(subscribed->framework_id()); v1::ExecutorInfo executorInfo = v1::createExecutorInfo( @@ -896,9 +865,13 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) "cpus:0.1;mem:32;disk:32", v1::ExecutorInfo::DEFAULT); - // Update `executorInfo` with the subscribed `frameworkId`. executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + AWAIT_READY(offers); + EXPECT_NE(0, offers->offers().size()); + + const v1::Offer& offer = offers->offers(0); + v1::TaskInfo task1 = v1::createTask( offer.agent_id(), v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), @@ -909,6 +882,10 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(), v1::createCommandInfo(SLEEP_COMMAND(1000))); + v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( + executorInfo, + v1::createTaskGroupInfo({task1, task2})); + Future<Event::Update> updateRunning1; Future<Event::Update> updateRunning2; EXPECT_CALL(*scheduler, update(_, _)) @@ -923,10 +900,6 @@ TEST_P(DefaultExecutorTest, ROOT_ContainerStatusForTask) frameworkId, offer.agent_id()))); - v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( - executorInfo, - v1::createTaskGroupInfo({task1, task2})); - mesos.send(v1::createCallAccept(frameworkId, offer, {launchGroup})); AWAIT_READY(updateRunning1); @@ -959,19 +932,6 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, CommitSuicideOnTaskFailure) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources resources = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -980,6 +940,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, CommitSuicideOnTaskFailure) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -1007,27 +969,26 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, CommitSuicideOnTaskFailure) Call call; call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); mesos.send(call); } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); + executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); - Future<v1::scheduler::Event::Update> runningUpdate; - Future<v1::scheduler::Event::Update> failedUpdate; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate)) - .WillOnce(FutureArg<1>(&failedUpdate)); - const v1::Offer& offer = offers->offers(0); const v1::AgentID agentId = offer.agent_id(); @@ -1037,6 +998,12 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorTest, CommitSuicideOnTaskFailure) v1::TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(taskInfo1); + Future<v1::scheduler::Event::Update> runningUpdate; + Future<v1::scheduler::Event::Update> failedUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate)) + .WillOnce(FutureArg<1>(&failedUpdate)); + Future<v1::scheduler::Event::Failure> executorFailure; EXPECT_CALL(*scheduler, failure(_, _)) .WillOnce(FutureArg<1>(&executorFailure)); @@ -1103,19 +1070,6 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources resources = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(resources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -1124,6 +1078,8 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -1151,31 +1107,26 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) Call call; call.set_type(Call::SUBSCRIBE); Call::Subscribe* subscribe = call.mutable_subscribe(); - subscribe->mutable_framework_info()->CopyFrom(frameworkInfo); + subscribe->mutable_framework_info()->CopyFrom(v1::DEFAULT_FRAMEWORK_INFO); mesos.send(call); } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources resources = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(resources); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); - Future<v1::scheduler::Event::Update> runningUpdate1; - Future<v1::scheduler::Event::Update> runningUpdate2; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate1)) - .WillOnce(FutureArg<1>(&runningUpdate2)); - - Future<v1::scheduler::Event::Failure> executorFailure; - EXPECT_CALL(*scheduler, failure(_, _)) - .WillOnce(FutureArg<1>(&executorFailure)); - const v1::Offer& offer = offers->offers(0); const v1::AgentID agentId = offer.agent_id(); @@ -1193,6 +1144,16 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) const hashset<v1::TaskID> tasks{taskInfo1.task_id(), taskInfo2.task_id()}; + Future<v1::scheduler::Event::Update> runningUpdate1; + Future<v1::scheduler::Event::Update> runningUpdate2; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate1)) + .WillOnce(FutureArg<1>(&runningUpdate2)); + + Future<v1::scheduler::Event::Failure> executorFailure; + EXPECT_CALL(*scheduler, failure(_, _)) + .WillOnce(FutureArg<1>(&executorFailure)); + { Call call; call.mutable_framework_id()->CopyFrom(frameworkId); @@ -1306,27 +1267,9 @@ TEST_P(DefaultExecutorTest, CommitSuicideOnKillTask) // launched using reserved resources. TEST_P(DefaultExecutorTest, ReservedResources) { - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.set_role("role"); - Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources unreserved = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::Resources reserved = unreserved.flatten( - frameworkInfo.role(), - v1::createReservationInfo(frameworkInfo.principal())).get(); - - v1::ExecutorInfo executorInfo; - executorInfo.set_type(v1::ExecutorInfo::DEFAULT); - - executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); - executorInfo.mutable_resources()->CopyFrom(reserved); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -1335,6 +1278,8 @@ TEST_P(DefaultExecutorTest, ReservedResources) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(FutureSatisfy(&connected)); @@ -1346,6 +1291,9 @@ TEST_P(DefaultExecutorTest, ReservedResources) AWAIT_READY(connected); + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role("role"); + Future<v1::scheduler::Event::Subscribed> subscribed; EXPECT_CALL(*scheduler, subscribed(_, _)) .WillOnce(FutureArg<1>(&subscribed)); @@ -1367,19 +1315,25 @@ TEST_P(DefaultExecutorTest, ReservedResources) } AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources unreserved = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + // Launch the executor using reserved resources. + v1::Resources reserved = unreserved.flatten( + frameworkInfo.role(), + v1::createReservationInfo(frameworkInfo.principal())).get(); + + v1::ExecutorInfo executorInfo; + executorInfo.set_type(v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID); executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(reserved); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); - Future<v1::scheduler::Event::Update> runningUpdate; - EXPECT_CALL(*scheduler, update(_, _)) - .WillOnce(FutureArg<1>(&runningUpdate)); - const v1::Offer& offer = offers->offers(0); const v1::AgentID agentId = offer.agent_id(); @@ -1390,6 +1344,10 @@ TEST_P(DefaultExecutorTest, ReservedResources) v1::TaskGroupInfo taskGroup; taskGroup.add_tasks()->CopyFrom(taskInfo); + Future<v1::scheduler::Event::Update> runningUpdate; + EXPECT_CALL(*scheduler, update(_, _)) + .WillOnce(FutureArg<1>(&runningUpdate)); + { Call call; call.mutable_framework_id()->CopyFrom(frameworkId); @@ -1455,39 +1413,9 @@ INSTANTIATE_TEST_CASE_P( // reserved persistent resources which can be accessed by its tasks. TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) { - v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; - frameworkInfo.set_role(DEFAULT_TEST_ROLE); - Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); - - v1::Resources unreserved = - v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); - - v1::Resources reserved = unreserved.flatten( - frameworkInfo.role(), - v1::createReservationInfo(frameworkInfo.principal())).get(); - - v1::Resources volume = v1::createPersistentVolume( - Megabytes(1), - frameworkInfo.role(), - "id1", - "executor_volume_path", - frameworkInfo.principal(), - None(), - frameworkInfo.principal()); - - v1::ExecutorInfo executorInfo = v1::createExecutorInfo( - v1::DEFAULT_EXECUTOR_ID.value(), - None(), - None(), - v1::ExecutorInfo::DEFAULT); - - v1::Resources executorResources = reserved.apply(v1::CREATE(volume)).get(); - executorInfo.mutable_resources()->CopyFrom(executorResources); - // Disable AuthN on the agent. slave::Flags flags = CreateSlaveFlags(); flags.authenticate_http_readwrite = false; @@ -1497,6 +1425,11 @@ TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags); ASSERT_SOME(slave); + auto scheduler = std::make_shared<v1::MockHTTPScheduler>(); + + v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO; + frameworkInfo.set_role(DEFAULT_TEST_ROLE); + Future<Nothing> connected; EXPECT_CALL(*scheduler, connected(_)) .WillOnce(DoAll(v1::scheduler::SendSubscribe(frameworkInfo), @@ -1522,11 +1455,34 @@ TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) .WillRepeatedly(Return()); // Ignore heartbeats. AWAIT_READY(subscribed); - v1::FrameworkID frameworkId(subscribed->framework_id()); - // Update `executorInfo` with the subscribed `frameworkId`. + v1::Resources unreserved = + v1::Resources::parse("cpus:0.1;mem:32;disk:32").get(); + + v1::Resources reserved = unreserved.flatten( + frameworkInfo.role(), + v1::createReservationInfo(frameworkInfo.principal())).get(); + + v1::Resources volume = v1::createPersistentVolume( + Megabytes(1), + frameworkInfo.role(), + "id1", + "executor_volume_path", + frameworkInfo.principal(), + None(), + frameworkInfo.principal()); + + v1::Resources executorResources = reserved.apply(v1::CREATE(volume)).get(); + + v1::ExecutorInfo executorInfo = v1::createExecutorInfo( + v1::DEFAULT_EXECUTOR_ID.value(), + None(), + None(), + v1::ExecutorInfo::DEFAULT); + executorInfo.mutable_framework_id()->CopyFrom(frameworkId); + executorInfo.mutable_resources()->CopyFrom(executorResources); AWAIT_READY(offers); EXPECT_NE(0, offers->offers().size()); @@ -1557,6 +1513,12 @@ TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) sandboxPath->set_type(mesos::v1::Volume::Source::SandboxPath::PARENT); sandboxPath->set_path("executor_volume_path"); + v1::Offer::Operation reserve = v1::RESERVE(reserved); + v1::Offer::Operation create = v1::CREATE(volume); + v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( + executorInfo, + v1::createTaskGroupInfo({taskInfo})); + Future<Event::Update> updateRunning; Future<Event::Update> updateFinished; EXPECT_CALL(*scheduler, update(_, _)) @@ -1566,12 +1528,6 @@ TEST_P(PersistentVolumeDefaultExecutor, ROOT_PersistentResources) offer.agent_id()))) .WillOnce(FutureArg<1>(&updateFinished)); - v1::Offer::Operation reserve = v1::RESERVE(reserved); - v1::Offer::Operation create = v1::CREATE(volume); - v1::Offer::Operation launchGroup = v1::LAUNCH_GROUP( - executorInfo, - v1::createTaskGroupInfo({taskInfo})); - mesos.send(v1::createCallAccept( frameworkId, offer,