Changed master to send TASK_UNKNOWN during reconciliation. Previously, the master would send TASK_LOST in response to explicit reconciliation requests for (a) unknown tasks at registered slaves and (b) tasks at unknown slaves (neither registered nor unreachable). The master will now send TASK_UNKNOWN in these situations if the framework has the PARTITION_AWARE capability; frameworks without that capability continue to receive TASK_LOST, for backward compatibility.
Review: https://reviews.apache.org/r/52693/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97bd957f Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97bd957f Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97bd957f Branch: refs/heads/master Commit: 97bd957ffc45b3484dd82321c5e7fe7f02f9d79c Parents: 5082181 Author: Neil Conway <neil.con...@gmail.com> Authored: Wed Oct 19 16:32:14 2016 -0700 Committer: Vinod Kone <vinodk...@gmail.com> Committed: Wed Oct 19 16:32:14 2016 -0700 ---------------------------------------------------------------------- src/master/master.cpp | 36 ++++++++++------ src/tests/partition_tests.cpp | 14 +++---- src/tests/reconciliation_tests.cpp | 73 ++++++++++++++++++++++++++++++--- 3 files changed, 99 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/97bd957f/src/master/master.cpp ---------------------------------------------------------------------- diff --git a/src/master/master.cpp b/src/master/master.cpp index 2fc41f5..bf6bb1a 100644 --- a/src/master/master.cpp +++ b/src/master/master.cpp @@ -6248,17 +6248,13 @@ void Master::_reconcileTasks( // Explicit reconciliation occurs for the following cases: // (1) Task is known, but pending: TASK_STAGING. // (2) Task is known: send the latest state. - // (3) Task is unknown, slave is registered: TASK_LOST. + // (3) Task is unknown, slave is registered: TASK_UNKNOWN. // (4) Task is unknown, slave is transitioning: no-op. // (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE. - // (6) Task is unknown, slave is unknown: TASK_LOST. + // (6) Task is unknown, slave is unknown: TASK_UNKNOWN. // - // When using a non-strict registry, case (6) may result in - // a TASK_LOST for a task that may later be non-terminal. This - // is better than no reply at all because the framework can take - // action for TASK_LOST. 
Later, if the task is running, the - // framework can discover it with implicit reconciliation and will - // be able to kill it. + // For cases (3), (5), and (6), TASK_LOST is sent instead if the + // framework has not opted-in to the PARTITION_AWARE capability. foreach (const TaskStatus& status, statuses) { Option<SlaveID> slaveId = None(); if (status.has_slave_id()) { @@ -6304,12 +6300,20 @@ void Master::_reconcileTasks( None(), protobuf::getTaskContainerStatus(*task)); } else if (slaveId.isSome() && slaves.registered.contains(slaveId.get())) { - // (3) Task is unknown, slave is registered: TASK_LOST. + // (3) Task is unknown, slave is registered: TASK_UNKNOWN. If + // the framework does not have the PARTITION_AWARE capability, + // send TASK_LOST for backward compatibility. + TaskState taskState = TASK_UNKNOWN; + if (!protobuf::frameworkHasCapability( + framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + update = protobuf::createStatusUpdate( framework->id(), slaveId.get(), status.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_MASTER, None(), "Reconciliation: Task is unknown to the agent", @@ -6347,12 +6351,20 @@ void Master::_reconcileTasks( None(), unreachableTime); } else { - // (6) Task is unknown, slave is unknown: TASK_LOST. + // (6) Task is unknown, slave is unknown: TASK_UNKNOWN. If the + // framework does not have the PARTITION_AWARE capability, send + // TASK_LOST for backward compatibility. 
+ TaskState taskState = TASK_UNKNOWN; + if (!protobuf::frameworkHasCapability( + framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) { + taskState = TASK_LOST; + } + update = protobuf::createStatusUpdate( framework->id(), slaveId, status.task_id(), - TASK_LOST, + taskState, TaskStatus::SOURCE_MASTER, None(), "Reconciliation: Task is unknown", http://git-wip-us.apache.org/repos/asf/mesos/blob/97bd957f/src/tests/partition_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp index 12fe859..5a0d4bd 100644 --- a/src/tests/partition_tests.cpp +++ b/src/tests/partition_tests.cpp @@ -1603,7 +1603,7 @@ TEST_F(PartitionTest, RegistryGcByCount) driver.reconcileTasks({status2}); AWAIT_READY(reconcileUpdate2); - EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state()); + EXPECT_EQ(TASK_UNKNOWN, reconcileUpdate2.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason()); EXPECT_FALSE(reconcileUpdate2.get().has_unreachable_time()); @@ -1746,7 +1746,7 @@ TEST_F(PartitionTest, RegistryGcByCountManySlaves) driver.reconcileTasks({status2}); AWAIT_READY(reconcileUpdate2); - EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state()); + EXPECT_EQ(TASK_UNKNOWN, reconcileUpdate2.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason()); EXPECT_FALSE(reconcileUpdate2.get().has_unreachable_time()); @@ -1965,7 +1965,7 @@ TEST_F(PartitionTest, RegistryGcByAge) driver.reconcileTasks({status3}); AWAIT_READY(reconcileUpdate3); - EXPECT_EQ(TASK_LOST, reconcileUpdate3.get().state()); + EXPECT_EQ(TASK_UNKNOWN, reconcileUpdate3.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate3.get().reason()); EXPECT_FALSE(reconcileUpdate3.get().has_unreachable_time()); @@ -2004,7 +2004,7 @@ TEST_F(PartitionTest, RegistryGcByAge) driver.reconcileTasks({status5}); AWAIT_READY(reconcileUpdate5); - EXPECT_EQ(TASK_LOST, 
reconcileUpdate5.get().state()); + EXPECT_EQ(TASK_UNKNOWN, reconcileUpdate5.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate5.get().reason()); EXPECT_FALSE(reconcileUpdate5.get().has_unreachable_time()); @@ -2022,7 +2022,7 @@ TEST_F(PartitionTest, RegistryGcByAge) // configure GC to only keep a single agent. Concurrently with GC // running, we arrange for one of those agents to reregister with the // master. -TEST_F(PartitionTest, RegistryGcRace2) +TEST_F(PartitionTest, RegistryGcRace) { master::Flags masterFlags = CreateMasterFlags(); masterFlags.registry_max_agent_count = 1; @@ -2251,7 +2251,7 @@ TEST_F(PartitionTest, RegistryGcRace2) driver.reconcileTasks({status1}); AWAIT_READY(reconcileUpdate1); - EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state()); + EXPECT_EQ(TASK_UNKNOWN, reconcileUpdate1.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason()); EXPECT_FALSE(reconcileUpdate1.get().has_unreachable_time()); @@ -2267,7 +2267,7 @@ TEST_F(PartitionTest, RegistryGcRace2) driver.reconcileTasks({status2}); AWAIT_READY(reconcileUpdate2); - EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state()); + EXPECT_EQ(TASK_UNKNOWN, reconcileUpdate2.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason()); EXPECT_FALSE(reconcileUpdate2.get().has_unreachable_time()); http://git-wip-us.apache.org/repos/asf/mesos/blob/97bd957f/src/tests/reconciliation_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/reconciliation_tests.cpp b/src/tests/reconciliation_tests.cpp index d2bba48..71073a0 100644 --- a/src/tests/reconciliation_tests.cpp +++ b/src/tests/reconciliation_tests.cpp @@ -230,8 +230,8 @@ TEST_F(ReconciliationTest, TaskStateMatch) // This test verifies that reconciliation of a task that belongs to an -// unknown slave results in TASK_LOST, even if the framework has -// enabled the PARTITION_AWARE capability. 
+// unknown slave results in TASK_UNKNOWN if the framework has enabled +// the PARTITION_AWARE capability. TEST_F(ReconciliationTest, UnknownSlave) { Try<Owned<cluster::Master>> master = StartMaster(); @@ -266,9 +266,9 @@ TEST_F(ReconciliationTest, UnknownSlave) driver.reconcileTasks({status}); - // Framework should receive TASK_LOST because the slave is unknown. + // Framework should receive TASK_UNKNOWN because the slave is unknown. AWAIT_READY(update); - EXPECT_EQ(TASK_LOST, update.get().state()); + EXPECT_EQ(TASK_UNKNOWN, update.get().state()); EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason()); EXPECT_FALSE(update.get().has_unreachable_time()); @@ -278,7 +278,8 @@ TEST_F(ReconciliationTest, UnknownSlave) // This test verifies that reconciliation of an unknown task that -// belongs to a known slave results in TASK_LOST. +// belongs to a known slave results in TASK_LOST if the framework is +// not partition-aware. TEST_F(ReconciliationTest, UnknownTask) { Try<Owned<cluster::Master>> master = StartMaster(); @@ -334,6 +335,68 @@ TEST_F(ReconciliationTest, UnknownTask) } +// This test verifies that reconciliation of an unknown task that +// belongs to a known slave results in TASK_UNKNOWN if the framework +// is partition-aware. +TEST_F(ReconciliationTest, UnknownTaskPartitionAware) +{ + Try<Owned<cluster::Master>> master = StartMaster(); + ASSERT_SOME(master); + + Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + + Owned<MasterDetector> detector = master.get()->createDetector(); + Try<Owned<cluster::Slave>> slave = StartSlave(detector.get()); + ASSERT_SOME(slave); + + // Wait for the slave to register and get the slave id. 
+ AWAIT_READY(slaveRegisteredMessage); + const SlaveID slaveId = slaveRegisteredMessage.get().slave_id(); + + FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO; + frameworkInfo.add_capabilities()->set_type( + FrameworkInfo::Capability::PARTITION_AWARE); + + MockScheduler sched; + MesosSchedulerDriver driver( + &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL); + + Future<FrameworkID> frameworkId; + EXPECT_CALL(sched, registered(&driver, _, _)) + .WillOnce(FutureArg<1>(&frameworkId)); + + EXPECT_CALL(sched, resourceOffers(&driver, _)) + .WillRepeatedly(Return()); // Ignore offers. + + driver.start(); + + // Wait until the framework is registered. + AWAIT_READY(frameworkId); + + Future<TaskStatus> update; + EXPECT_CALL(sched, statusUpdate(&driver, _)) + .WillOnce(FutureArg<1>(&update)); + + // Create a task status with a random task id. + TaskStatus status; + status.mutable_task_id()->set_value(UUID::random().toString()); + status.mutable_slave_id()->CopyFrom(slaveId); + status.set_state(TASK_STAGING); // Dummy value. + + driver.reconcileTasks({status}); + + // Framework should receive TASK_UNKNOWN for an unknown task. + AWAIT_READY(update); + EXPECT_EQ(TASK_UNKNOWN, update.get().state()); + EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, update.get().reason()); + EXPECT_FALSE(update.get().has_unreachable_time()); + + driver.stop(); + driver.join(); +} + + // This test verifies that the killTask request of an unknown task // results in reconciliation. In this case, the task is unknown // and there are no transitional slaves.