Changed master to send TASK_DROPPED for task launch errors.

When a task launch fails due to a transient error (e.g., insufficient
available resources at an agent), the master previously sent a TASK_LOST
update to the framework. For PARTITION_AWARE frameworks, the master now
sends TASK_DROPPED instead.
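
For reference, a framework opts into the new behavior through the
PARTITION_AWARE capability. A minimal sketch, mirroring the pattern used
by the new tests below (DEFAULT_FRAMEWORK_INFO, DEFAULT_CREDENTIAL, and
MockScheduler are Mesos test helpers):

  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
  frameworkInfo.add_capabilities()->set_type(
      FrameworkInfo::Capability::PARTITION_AWARE);

  MockScheduler sched;
  MesosSchedulerDriver driver(
      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);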

Review: https://reviews.apache.org/r/52659/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4eac0b06
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4eac0b06
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4eac0b06

Branch: refs/heads/master
Commit: 4eac0b0663de87c0fdde6f9e8c42566f99a3dfaf
Parents: f8a0c28
Author: Neil Conway <neil.con...@gmail.com>
Authored: Wed Oct 19 16:32:02 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Wed Oct 19 16:32:02 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp                    |  38 ++++-
 src/tests/master_authorization_tests.cpp | 126 +++++++++++++--
 src/tests/master_tests.cpp               | 224 ++++++++++++++++++++++++--
 3 files changed, 358 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 324391a..2fc41f5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3498,7 +3498,9 @@ void Master::accept(
     }
   }
 
-  // If invalid, send TASK_LOST for the launch attempts.
+  // If invalid, send TASK_DROPPED for the launch attempts. If the
+  // framework is not partition-aware, send TASK_LOST instead.
+  //
   // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
   // consistently handle message dropping. It would be ideal if the
   // 'drop' overload can handle both resource recovery and lost task
@@ -3507,6 +3509,12 @@ void Master::accept(
     LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
                  << "': " << error.get().message;
 
+    TaskState newTaskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      newTaskState = TASK_LOST;
+    }
+
     foreach (const Offer::Operation& operation, accept.operations()) {
       if (operation.type() != Offer::Operation::LAUNCH &&
           operation.type() != Offer::Operation::LAUNCH_GROUP) {
@@ -3527,16 +3535,21 @@ void Master::accept(
             framework->id(),
             task.slave_id(),
             task.task_id(),
-            TASK_LOST,
+            newTaskState,
             TaskStatus::SOURCE_MASTER,
             None(),
             "Task launched with invalid offers: " + error.get().message,
             TaskStatus::REASON_INVALID_OFFERS);
 
-        metrics->tasks_lost++;
+        if (protobuf::frameworkHasCapability(
+                framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+          metrics->tasks_dropped++;
+        } else {
+          metrics->tasks_lost++;
+        }
 
         metrics->incrementTasksStates(
-            TASK_LOST,
+            newTaskState,
             TaskStatus::SOURCE_MASTER,
             TaskStatus::REASON_INVALID_OFFERS);
 
@@ -3702,6 +3715,12 @@ void Master::_accept(
   Slave* slave = slaves.registered.get(slaveId);
 
   if (slave == nullptr || !slave->connected) {
+    TaskState newTaskState = TASK_DROPPED;
+    if (!protobuf::frameworkHasCapability(
+            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+      newTaskState = TASK_LOST;
+    }
+
     foreach (const Offer::Operation& operation, accept.operations()) {
       if (operation.type() != Offer::Operation::LAUNCH &&
           operation.type() != Offer::Operation::LAUNCH_GROUP) {
@@ -3734,16 +3753,21 @@ void Master::_accept(
             framework->id(),
             task.slave_id(),
             task.task_id(),
-            TASK_LOST,
+            newTaskState,
             TaskStatus::SOURCE_MASTER,
             None(),
             slave == nullptr ? "Agent removed" : "Agent disconnected",
             reason);
 
-        metrics->tasks_lost++;
+        if (protobuf::frameworkHasCapability(
+                framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+          metrics->tasks_dropped++;
+        } else {
+          metrics->tasks_lost++;
+        }
 
         metrics->incrementTasksStates(
-            TASK_LOST,
+            newTaskState,
             TaskStatus::SOURCE_MASTER,
             reason);
 

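The capability check above is repeated at each call site in
master.cpp. As a hypothetical refactoring (not part of this commit), the
pattern could be captured in a small helper:

  // Hypothetical helper: choose the status-update state that the
  // master sends when it must abandon a task launch attempt.
  static TaskState launchFailureState(const FrameworkInfo& frameworkInfo)
  {
    if (protobuf::frameworkHasCapability(
            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
      return TASK_DROPPED; // Partition-aware schedulers handle this state.
    }
    return TASK_LOST; // Legacy state for older frameworks.
  }
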
http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/tests/master_authorization_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_authorization_tests.cpp b/src/tests/master_authorization_tests.cpp
index a53e270..001d4b3 100644
--- a/src/tests/master_authorization_tests.cpp
+++ b/src/tests/master_authorization_tests.cpp
@@ -567,8 +567,9 @@ TEST_F(MasterAuthorizationTest, KillPendingTaskInTaskGroup)
 
 
 // This test verifies that a slave removal that comes before
-// '_accept()' is called results in TASK_LOST.
-TEST_F(MasterAuthorizationTest, SlaveRemoved)
+// '_accept()' is called results in TASK_LOST for a framework that is
+// not partition-aware.
+TEST_F(MasterAuthorizationTest, SlaveRemovedLost)
 {
   MockAuthorizer authorizer;
   Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -646,9 +647,7 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved)
 
   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
-  EXPECT_EQ(1u, stats.values.count(
-                    "master/task_lost/source_master/reason_slave_removed"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
   EXPECT_EQ(
       1u, stats.values["master/task_lost/source_master/reason_slave_removed"]);
@@ -669,9 +668,117 @@ TEST_F(MasterAuthorizationTest, SlaveRemoved)
 }
 
 
+// This test verifies that a slave removal that comes before
+// '_accept()' is called results in TASK_DROPPED for a framework that
+// is partition-aware.
+TEST_F(MasterAuthorizationTest, SlaveRemovedDropped)
+{
+  MockAuthorizer authorizer;
+  Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(1);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+  // Return a pending future from authorizer.
+  Future<Nothing> authorize;
+  Promise<bool> promise;
+  EXPECT_CALL(authorizer, authorized(_))
+    .WillOnce(DoAll(FutureSatisfy(&authorize),
+                    Return(promise.future())));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  // Wait until authorization is in progress.
+  AWAIT_READY(authorize);
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  // Stop the slave with an explicit shutdown; otherwise, with
+  // checkpointing enabled, the master will wait for it to reconnect.
+  slave.get()->shutdown();
+  slave->reset();
+
+  AWAIT_READY(slaveLost);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  // Now complete authorization.
+  promise.set(true);
+
+  // Framework should get a TASK_DROPPED.
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_DROPPED, status.get().state());
+  EXPECT_EQ(TaskStatus::SOURCE_MASTER, status.get().source());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, status.get().reason());
+
+  // No task launch should happen, so all offered resources should be
+  // returned to the allocator.
+  AWAIT_READY(recoverResources);
+
+  // Check metrics.
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
+  EXPECT_EQ(
+      1u,
+      stats.values["master/task_dropped/source_master/reason_slave_removed"]);
+
+  // Make sure the task is not known to master anymore.
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .Times(0);
+
+  driver.reconcileTasks({});
+
+  // We settle the clock here to ensure any updates sent by the master
+  // are received. There shouldn't be any updates in this case.
+  Clock::pause();
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
+
 // This test verifies that a slave disconnection that comes before
-// '_launchTasks()' is called results in TASK_LOST.
-TEST_F(MasterAuthorizationTest, SlaveDisconnected)
+// '_launchTasks()' is called results in TASK_LOST for a framework
+// that is not partition-aware.
+TEST_F(MasterAuthorizationTest, SlaveDisconnectedLost)
 {
   MockAuthorizer authorizer;
   Try<Owned<cluster::Master>> master = StartMaster(&authorizer);
@@ -750,11 +857,8 @@ TEST_F(MasterAuthorizationTest, SlaveDisconnected)
 
   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
-  EXPECT_EQ(1u,
-            stats.values.count(
-                "master/task_lost/source_master/reason_slave_removed"));
   EXPECT_EQ(
       1u,
       stats.values["master/task_lost/source_master/reason_slave_removed"]);

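On the scheduler side, TASK_DROPPED carries the same "launch never
happened" meaning for partition-aware frameworks that TASK_LOST does for
legacy ones. An illustrative handler sketch (relaunchTask() is a
hypothetical helper, not part of this commit):

  void statusUpdate(SchedulerDriver* driver, const TaskStatus& status)
  {
    if (status.state() == TASK_DROPPED || status.state() == TASK_LOST) {
      if (status.reason() == TaskStatus::REASON_INVALID_OFFERS ||
          status.reason() == TaskStatus::REASON_SLAVE_REMOVED) {
        // The task never ran, so it is safe to reschedule it on a
        // future offer.
        relaunchTask(status.task_id());
      }
    }
  }
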
http://git-wip-us.apache.org/repos/asf/mesos/blob/4eac0b06/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index df492d3..b31502f 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -1488,8 +1488,10 @@ TEST_F(MasterTest, LaunchCombinedOfferTest)
 }
 
 
-// Test ensures offers for launchTasks cannot span multiple slaves.
-TEST_F(MasterTest, LaunchAcrossSlavesTest)
+// This test ensures that the offers provided to a single launchTasks
+// call cannot span multiple slaves. A non-partition-aware framework
+// should receive TASK_LOST.
+TEST_F(MasterTest, LaunchAcrossSlavesLost)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -1580,24 +1582,134 @@ TEST_F(MasterTest, LaunchAcrossSlavesTest)
 
   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
   EXPECT_EQ(
       1u,
+      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test ensures that the offers provided to a single launchTasks
+// call cannot span multiple slaves. A partition-aware framework
+// should receive TASK_DROPPED.
+TEST_F(MasterTest, LaunchAcrossSlavesDropped)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  // See LaunchCombinedOfferTest() for resource size motivation.
+  Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+  Resources twoSlaves = fullSlave + fullSlave;
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave1 =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave1);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+  EXPECT_NE(0u, offers1.get().size());
+  Resources resources1(offers1.get()[0].resources());
+  EXPECT_EQ(2, resources1.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources1.mem().get());
+
+  // Test that offers cannot span multiple slaves.
+  Future<vector<Offer>> offers2;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  // Create new Flags as we require another work_dir for checkpoints.
+  slave::Flags flags2 = CreateSlaveFlags();
+  flags2.resources = Option<string>(stringify(fullSlave));
+
+  Try<Owned<cluster::Slave>> slave2 =
+    StartSlave(detector.get(), &containerizer, flags2);
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(offers2);
+  EXPECT_NE(0u, offers2.get().size());
+  Resources resources2(offers2.get()[0].resources());
+  EXPECT_EQ(2, resources2.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources2.mem().get());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers1.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(twoSlaves);
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers1.get()[0].id());
+  combinedOffers.push_back(offers2.get()[0].id());
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  driver.launchTasks(combinedOffers, {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_DROPPED, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
+
+  // The resources of the invalid offers should be recovered.
+  AWAIT_READY(recoverResources);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  // Check metrics.
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(1u, stats.values.count("master/tasks_dropped"));
+  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
+  EXPECT_EQ(
+      1u,
       stats.values.count(
-          "master/task_lost/source_master/reason_invalid_offers"));
+          "master/task_dropped/source_master/reason_invalid_offers"));
   EXPECT_EQ(
       1u,
-      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+      stats.values["master/task_dropped/source_master/reason_invalid_offers"]);
 
   driver.stop();
   driver.join();
 }
 
 
-// Test ensures that an offer cannot appear more than once in offers
-// for launchTasks.
-TEST_F(MasterTest, LaunchDuplicateOfferTest)
+// This test ensures that an offer cannot appear more than once in the
+// offers provided to a single launchTasks call. A non-partition-aware
+// framework should receive TASK_LOST.
+TEST_F(MasterTest, LaunchDuplicateOfferLost)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -1671,15 +1783,103 @@ TEST_F(MasterTest, LaunchDuplicateOfferTest)
 
   // Check metrics.
   JSON::Object stats = Metrics();
-  EXPECT_EQ(1u, stats.values.count("master/tasks_lost"));
+  EXPECT_EQ(0u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(1u, stats.values["master/tasks_lost"]);
   EXPECT_EQ(
       1u,
-      stats.values.count(
-          "master/task_lost/source_master/reason_invalid_offers"));
+      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+
+  driver.stop();
+  driver.join();
+}
+
+
+// This test ensures that an offer cannot appear more than once in the
+// offers provided to a single launchTasks call. A partition-aware
+// framework should receive TASK_DROPPED.
+TEST_F(MasterTest, LaunchDuplicateOfferDropped)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  // See LaunchCombinedOfferTest() for resource size motivation.
+  Resources fullSlave = Resources::parse("cpus:2;mem:1024").get();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.resources = Option<string>(stringify(fullSlave));
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Test that the same offer cannot be used more than once.
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+  Resources resources(offers.get()[0].resources());
+  EXPECT_EQ(2, resources.cpus().get());
+  EXPECT_EQ(Megabytes(1024), resources.mem().get());
+
+  vector<OfferID> combinedOffers;
+  combinedOffers.push_back(offers.get()[0].id());
+  combinedOffers.push_back(offers.get()[0].id());
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(fullSlave);
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  Future<TaskStatus> status;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Future<Nothing> recoverResources =
+    FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+  driver.launchTasks(combinedOffers, {task});
+
+  AWAIT_READY(status);
+  EXPECT_EQ(TASK_DROPPED, status.get().state());
+  EXPECT_EQ(TaskStatus::REASON_INVALID_OFFERS, status.get().reason());
+
+  // The resources of the invalid offers should be recovered.
+  AWAIT_READY(recoverResources);
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  // Check metrics.
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0u, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1u, stats.values["master/tasks_dropped"]);
   EXPECT_EQ(
       1u,
-      stats.values["master/task_lost/source_master/reason_invalid_offers"]);
+      stats.values["master/task_dropped/source_master/reason_invalid_offers"]);
 
   driver.stop();
   driver.join();
