Changed master to send TASK_DROPPED for task launch errors.

When a task launch fails due to a transient error (e.g., insufficient
available resources at an agent), the master previously sent a
TASK_LOST update to the framework. For PARTITION_AWARE frameworks, we
now send TASK_DROPPED instead.
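For readers less familiar with the capability mechanics, here is a
minimal sketch (not part of this commit) of how a framework opts into
the new behavior via the C++ scheduler driver: it adds the
PARTITION_AWARE capability to its FrameworkInfo and handles
TASK_DROPPED in statusUpdate(). The scheduler class name, master
address, and retry comments are illustrative assumptions, not code
from this patch:

#include <string>
#include <vector>

#include <mesos/mesos.hpp>
#include <mesos/scheduler.hpp>

using namespace mesos;

class ExampleScheduler : public Scheduler
{
public:
  void registered(
      SchedulerDriver*, const FrameworkID&, const MasterInfo&) override {}
  void reregistered(SchedulerDriver*, const MasterInfo&) override {}
  void disconnected(SchedulerDriver*) override {}

  void resourceOffers(
      SchedulerDriver* driver, const std::vector<Offer>& offers) override
  {
    // A real framework would launch tasks here; this sketch declines.
    for (const Offer& offer : offers) {
      driver->declineOffer(offer.id());
    }
  }

  void offerRescinded(SchedulerDriver*, const OfferID&) override {}

  void statusUpdate(SchedulerDriver*, const TaskStatus& status) override
  {
    if (status.state() == TASK_DROPPED) {
      // Only sent to PARTITION_AWARE frameworks: the launch failed
      // before the task ever ran (e.g., invalid offers, or the agent
      // was removed or disconnected), so re-launching is typically safe.
    } else if (status.state() == TASK_LOST) {
      // What a framework without the capability receives instead.
    }
  }

  void frameworkMessage(
      SchedulerDriver*,
      const ExecutorID&,
      const SlaveID&,
      const std::string&) override {}
  void slaveLost(SchedulerDriver*, const SlaveID&) override {}
  void executorLost(
      SchedulerDriver*, const ExecutorID&, const SlaveID&, int) override {}
  void error(SchedulerDriver*, const std::string&) override {}
};


int main()
{
  FrameworkInfo framework;
  framework.set_user("");  // Have Mesos fill in the current user.
  framework.set_name("Example partition-aware framework");

  // Opting in: without this capability the master keeps sending
  // TASK_LOST for the launch failures covered by this commit.
  framework.add_capabilities()->set_type(
      FrameworkInfo::Capability::PARTITION_AWARE);

  ExampleScheduler scheduler;
  MesosSchedulerDriver driver(&scheduler, framework, "127.0.0.1:5050");

  return driver.run() == DRIVER_STOPPED ? 0 : 1;
}

Frameworks that do not advertise the capability continue to receive
TASK_LOST, which is what the *Lost variants of the tests below verify.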
Review: https://reviews.apache.org/r/52659/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4eac0b06
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4eac0b06
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4eac0b06

Branch: refs/heads/master
Commit: 4eac0b0663de87c0fdde6f9e8c42566f99a3dfaf
Parents: f8a0c28
Author: Neil Conway <neil.con...@gmail.com>
Authored: Wed Oct 19 16:32:02 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Wed Oct 19 16:32:02 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp                    |  38 ++++-
 src/tests/master_authorization_tests.cpp | 126 +++++++++++++--
 src/tests/master_tests.cpp               | 224 ++++++++++++++++++++++++--
 3 files changed, 358 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 324391a..2fc41f5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3498,7 +3498,9 @@ void Master::accept(
     }
   }

-  // If invalid, send TASK_LOST for the launch attempts.
+  // If invalid, send TASK_DROPPED for the launch attempts. If the
+  // framework is not partition-aware, send TASK_LOST instead.
+  //
   // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
   // consistently handle message dropping. It would be ideal if the
   // 'drop' overload can handle both resource recovery and lost task
@@ -3507,6 +3509,12 @@
     LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
                 << "': " << error.get().message;

+    TaskState newTaskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+        framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      newTaskState = TASK_LOST;
+    }
+
     foreach (const Offer::Operation& operation, accept.operations()) {
       if (operation.type() != Offer::Operation::LAUNCH &&
           operation.type() != Offer::Operation::LAUNCH_GROUP) {
@@ -3527,16 +3535,21 @@
             framework->id(),
             task.slave_id(),
             task.task_id(),
-            TASK_LOST,
+            newTaskState,
             TaskStatus::SOURCE_MASTER,
             None(),
             "Task launched with invalid offers: " + error.get().message,
             TaskStatus::REASON_INVALID_OFFERS);

-        metrics->tasks_lost++;
+        if (protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+          metrics->tasks_dropped++;
+        } else {
+          metrics->tasks_lost++;
+        }

         metrics->incrementTasksStates(
-            TASK_LOST,
+            newTaskState,
             TaskStatus::SOURCE_MASTER,
             TaskStatus::REASON_INVALID_OFFERS);

@@ -3702,6 +3715,12 @@ void Master::_accept(
   Slave* slave = slaves.registered.get(slaveId);

   if (slave == nullptr || !slave->connected) {
+    TaskState newTaskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+        framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      newTaskState = TASK_LOST;
+    }
+
     foreach (const Offer::Operation& operation, accept.operations()) {
       if (operation.type() != Offer::Operation::LAUNCH &&
           operation.type() != Offer::Operation::LAUNCH_GROUP) {
@@ -3734,16 +3753,21 @@
           framework->id(),
           task.slave_id(),
           task.task_id(),
-          TASK_LOST,
+          newTaskState,
           TaskStatus::SOURCE_MASTER,
           None(),
           slave == nullptr ?
"Agent removed" : "Agent disconnected", reason); - metrics->tasks_lost++; + if (protobuf::frameworkHasCapability( + framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) { + metrics->tasks_dropped++; + } else { + metrics->tasks_lost++; + } metrics->incrementTasksStates( - TASK_LOST, + newTaskState, TaskStatus::SOURCE_MASTER, reason); http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/tests/master_authorization_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp index a53e270..001d4b3 100644 --- a/src/tests/master_authorization_tests.cpp +++ b/src/tests/master_authorization_tests.cpp @@ -567,8 +567,9 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup) // This test verifies that a slave removal that comes before -// '_accept()' is called results in TASK_LOST. -TEST_F(MasterAuthorizationTest, SlaveRemoved) +// '_accept()' is called results in TASK_LOST for a framework that is +// not partition-aware. +TEST_F(MasterAuthorizationTest, SlaveRemovedLost) { MockAuthorizer authorizer; Try<Owned<cluster::Master>> master = StartMaster(&authorizer); @@ -646,9 +647,7 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved) // Check metrics. JSON::Object stats = Metrics(); - EXPECT_EQ(1u, stats.values.count("master/tasks_lost")); - EXPECT_EQ(1u, stats.values.count( - "master/task_lost/source_master/reason_slave_removed")); + EXPECT_EQ(0u, stats.values["master/tasks_dropped"]); EXPECT_EQ(1u, stats.values["master/tasks_lost"]); EXPECT_EQ( 1u, stats.values["master/task_lost/source_master/reason_slave_removed"]); @@ -669,9 +668,117 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved) } +// This test verifies that a slave removal that comes before +// '_accept()' is called results in TASK_DROPPED for a framework that +// is partition-aware. +TEST_F(MasterAuthorizationTest, SlaveRemovedDropped) +{ + MockAuthorizer authorizer; + Try<Owned<cluster::Master>> master = StartMaster(&authorizer); + ASSERT_SOME(master); + + MockExecutor exec(DEFAULT_EXECUTOR_ID); + TestContainerizer containerizer(&exec); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer); + ASSERT_SOME(slave); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + EXPECT_CALL(sched, registered(&driver, _, _)) + .Times(1); + + Future<vector<Offer>> offers; + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillOnce(FutureArg<1>(&offers)) + .WillRepeatedly(Return()); // Ignore subsequent offers. + + driver.start(); + + AWAIT_READY(offers); + EXPECT_NE(0u, offers.get().size()); + + TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID); + + // Return a pending future from authorizer. + Future<Nothing> authorize; + Promise<bool> promise; + EXPECT_CALL(authorizer, authorized(_)) + .WillOnce(DoAll(FutureSatisfy(&authorize), + Return(promise.future()))); + + driver.launchTasks(offers.get()[0].id(), {task}); + + // Wait until authorization is in progress. 
+  AWAIT_READY(authorize);
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  // Stop the slave with explicit shutdown as otherwise with
+  // checkpointing the master will wait for the slave to reconnect.
+  slave.get()->shutdown();
+  slave->reset();
+
+  AWAIT_READY(slaveLost);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // Framework should get a TASK_DROPPED.
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_DROPPED, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_MASTER, status.get().source());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status.get().reason());
+
+  // No task launch should happen resulting in all resources being
+  // returned to the allocator.
+  AWAIT_READY(recoverResources);
+
+  // Check metrics.
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
+  EXPECT_EQ(
+      1u,
+      stats.values["master/task_dropped/source_master/reason_slave_removed"]);
+
+  // Make sure the task is not known to master anymore.
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
+
+  driver.reconcileTasks({});
+
+  // We settle the clock here to ensure any updates sent by the master
+  // are received. There shouldn't be any updates in this case.
+  Clock::pause();
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that a slave disconnection that comes before
-// '_launchTasks()' is called results in TASK_LOST.
-TEST_F(MasterAuthorizationTest, SlaveDisconnected)
+// '_launchTasks()' is called results in TASK_LOST for a framework
+// that is not partition-aware.
+TEST_F(MasterAuthorizationTest, SlaveDisconnectedLost)
 {
   MockAuthorizer authorizer;
   Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -750,11 +857,8 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected)

   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
-  EXPECT_EQ(1u,
-            stats.values.count(
-                "master/task_lost/source_master/reason_slave_removed"));
   EXPECT_EQ(
       1u,
       stats.values["master/task_lost/source_master/reason_slave_removed"]);


http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index df492d3..b31502f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1488,8 +1488,10 @@ TEST_F(MasterTest, LaunchCombinedOfferTest)
 }


-// Test ensures offers for launchTasks cannot span multiple slaves.
-TEST_F(MasterTest, LaunchAcrossSlavesTest)
+// This test ensures that the offers provided to a single launchTasks
+// call cannot span multiple slaves. A non-partition-aware framework
+// should receive TASK_LOST.
+TEST_F(MasterTest, LaunchAcrossSlavesLost)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -1580,24 +1582,134 @@ TEST_F(MasterTest, LaunchAcrossSlavesTest)

   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
   EXPECT_EQ(
       1u,
+      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test ensures that the offers provided to a single launchTasks
+// call cannot span multiple slaves. A partition-aware framework
+// should receive TASK_DROPPED.
+TEST_F(MasterTest, LaunchAcrossSlavesDropped)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  // See LaunchCombinedOfferTest() for resource size motivation.
+  Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+  Resources twoSlaves = fullSlave + fullSlave;
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave1 =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave1);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  Resources resources1(offers1.get()[0].resources());
+  EXPECT_EQ(2, resources1.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources1.mem().get());
+
+  // Test that offers cannot span multiple slaves.
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Create new Flags as we require another work_dir for checkpoints.
+  slave::Flags flags2 = CreateSlaveFlags();
+  flags2.resources = Option<string>(stringify(fullSlave));
+
+  Try<Owned<cluster::Slave>> slave2 =
+    StartSlave(detector.get(), &containerizer, flags2);
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  Resources resources2(offers2.get()[0].resources());
+  EXPECT_EQ(2, resources2.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources2.mem().get());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers1.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(twoSlaves);
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers1.get()[0].id());
+  combinedOffers.push_back(offers2.get()[0].id());
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  driver.launchTasks(combinedOffers, {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_DROPPED, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
+
+  // The resources of the invalid offers should be recovered.
+  AWAIT_READY(recoverResources);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  // Check metrics.
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(1u, stats.values.count("master/tasks_dropped"));
+  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
+  EXPECT_EQ(
+      1u,
       stats.values.count(
-          "master/task_lost/source_master/reason_invalid_offers"));
+          "master/task_dropped/source_master/reason_invalid_offers"));
   EXPECT_EQ(
       1u,
-      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+      stats.values["master/task_dropped/source_master/reason_invalid_offers"]);

   driver.stop();
   driver.join();
 }


-// Test ensures that an offer cannot appear more than once in offers
-// for launchTasks.
-TEST_F(MasterTest, LaunchDuplicateOfferTest)
+// This test ensures that an offer cannot appear more than once in the
+// offers provided to a single launchTasks call. A non-partition-aware
+// framework should receive TASK_LOST.
+TEST_F(MasterTest, LaunchDuplicateOfferLost)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -1671,15 +1783,103 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest)

   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
   EXPECT_EQ(
       1u,
-      stats.values.count(
-          "master/task_lost/source_master/reason_invalid_offers"));
+      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test ensures that an offer cannot appear more than once in the
+// offers provided to a single launchTasks call. A partition-aware
+// framework should receive TASK_DROPPED.
+TEST_F(MasterTest, LaunchDuplicateOfferDropped)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  // See LaunchCombinedOfferTest() for resource size motivation.
+  Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Test that the same offer cannot be used more than once in a
+  // single launchTasks call.
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  Resources resources(offers.get()[0].resources());
+  EXPECT_EQ(2, resources.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources.mem().get());
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers.get()[0].id());
+  combinedOffers.push_back(offers.get()[0].id());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(fullSlave);
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  Future<TaskStatus> status;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  driver.launchTasks(combinedOffers, {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_DROPPED, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
+
+  // The resources of the invalid offers should be recovered.
+  AWAIT_READY(recoverResources);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  // Check metrics.
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(
       1u,
-      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+      stats.values["master/task_dropped/source_master/reason_invalid_offers"]);

   driver.stop();
   driver.join();