Added test case for registry GC race condition.

If GC runs concurrently with another operation that changes the
registry (e.g., the reregistration of an agent that the GC operation
is about to remove), GC might prune fewer entries from the registry
than expected.

Along the way, this commit updates the master's logging to emit
a warning in this situation. It also corrects an inaccuracy: we
should report the number of agents we actually pruned from the
registry, not the number we _attempted_ to reclaim.
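
To illustrate the pattern, here is a minimal, self-contained C++
sketch (illustrative only, not Mesos code: std::set stands in for the
master's `slaves.unreachable` map, plain strings stand in for
`SlaveID`, and stream output stands in for LOG):

#include <iostream>
#include <set>
#include <string>
#include <vector>

int main()
{
  // In-memory unreachable list, analogous to `slaves.unreachable`.
  std::set<std::string> unreachable = {"agent-1", "agent-2", "agent-3"};

  // GC has decided to prune these two agents from the registry.
  std::vector<std::string> toRemove = {"agent-1", "agent-2"};

  // Concurrently with GC, "agent-2" reregisters and is moved back
  // to the admitted list, i.e., removed from `unreachable`.
  unreachable.erase("agent-2");

  // Apply the in-memory update; count what was *actually* erased,
  // not what we attempted to reclaim.
  size_t numRemoved = 0;
  for (const std::string& agent : toRemove) {
    if (unreachable.erase(agent) == 0) {
      std::cerr << "Failed to garbage collect " << agent
                << " from the unreachable list" << std::endl;
      continue;
    }
    numRemoved++;
  }

  // Prints 1, not toRemove.size() == 2.
  std::cout << "Garbage collected " << numRemoved
            << " unreachable agents from the registry" << std::endl;

  return 0;
}

The change to `Master::_doRegistryGc()` below follows the same shape:
check membership before erasing, warn on a miss, and log the final
tally rather than `toRemove.size()`.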

Review: https://reviews.apache.org/r/51891/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/47bd3e45
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/47bd3e45
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/47bd3e45

Branch: refs/heads/master
Commit: 47bd3e458b8cfcfd1ca0114419e56069da15453f
Parents: 6172da9
Author: Neil Conway <neil.con...@gmail.com>
Authored: Mon Sep 19 15:48:54 2016 -0700
Committer: Vinod Kone <vinodk...@gmail.com>
Committed: Mon Sep 19 15:48:54 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp         |  28 +++-
 src/tests/partition_tests.cpp | 280 +++++++++++++++++++++++++++++++++++++
 2 files changed, 301 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/47bd3e45/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index beea5ff..e4499b5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1766,16 +1766,30 @@ void Master::_doRegistryGc(
   // `PruneUnreachable` registry operation should never fail.
   CHECK(registrarResult.get());
 
-  // TODO(neilc): Add a metric for # of agents discarded from the registry?
-  LOG(INFO) << "Garbage collected " << toRemove.size()
-            << " unreachable agents from the registry";
-
-  // Update in-memory state to be consistent with registry changes.
+  // Update in-memory state to be consistent with registry changes. If
+  // there was a concurrent registry operation that also modified the
+  // unreachable list (e.g., an agent in `toRemove` concurrently
+  // reregistered), entries in `toRemove` might not appear in
+  // `slaves.unreachable`.
+  //
+  // TODO(neilc): It would be nice to verify that the effect of these
+  // in-memory updates is equivalent to the changes made by the registry
+  // operation, but there isn't an easy way to do that.
+  size_t numRemoved = 0;
   foreach (const SlaveID& slave, toRemove) {
-    // NOTE: `slave` might not appear in `slaves.unreachable` if there
-    // have been concurrent changes.
+    if (!slaves.unreachable.contains(slave)) {
+      LOG(WARNING) << "Failed to garbage collect " << slave
+                   << " from the unreachable list";
+      continue;
+    }
+
     slaves.unreachable.erase(slave);
+    numRemoved++;
   }
+
+  // TODO(neilc): Add a metric for # of agents discarded from the registry?
+  LOG(INFO) << "Garbage collected " << numRemoved
+            << " unreachable agents from the registry";
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/47bd3e45/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index c860d2a..895a073 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -1881,6 +1881,286 @@ TEST_P(PartitionTest, RegistryGcByAge)
 }
 
 
+// This test checks what happens when these two operations race:
+// garbage collecting some slave IDs from the "unreachable" list in
+// the registry and moving a slave from the unreachable list back to
+// the admitted list. We add three agents to the unreachable list and
+// configure GC to only keep a single agent. Concurrently with GC
+// running, we arrange for one of those agents to reregister with the
+// master.
+TEST_P(PartitionTest, RegistryGcRace2)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry_strict = GetParam();
+  masterFlags.registry_max_agent_count = 1;
+
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage1 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector1 = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave1 = StartSlave(detector1.get());
+  ASSERT_SOME(slave1);
+
+  // Wait for the slave to register and get the slave id.
+  AWAIT_READY(slaveRegisteredMessage1);
+  SlaveID slaveId1 = slaveRegisteredMessage1.get().slave_id();
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<Nothing> resourceOffers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureSatisfy(&resourceOffers))
+    .WillRepeatedly(Return());
+
+  driver.start();
+
+  AWAIT_READY(resourceOffers);
+
+  // Induce a partition of the slave.
+  size_t pings1 = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings1++;
+    if (pings1 == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Future<Nothing> slaveLost1;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost1));
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  AWAIT_READY(slaveLost1);
+
+  // `slave1` will try to re-register below when we advance the clock.
+  // Prevent this by dropping all future messages from it.
+  DROP_MESSAGES(_, slave1.get()->pid, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  StandaloneMasterDetector detector2(master.get()->pid);
+
+  Try<Owned<cluster::Slave>> slave2 = StartSlave(&detector2);
+  ASSERT_SOME(slave2);
+
+  // Wait for the slave to register and get the slave id.
+  AWAIT_READY(slaveRegisteredMessage2);
+  SlaveID slaveId2 = slaveRegisteredMessage2.get().slave_id();
+
+  // Induce a partition of the slave.
+  size_t pings2 = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings2++;
+    if (pings2 == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Future<Nothing> slaveLost2;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost2));
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  AWAIT_READY(slaveLost2);
+
+  // `slave2` will try to re-register below when we advance the clock.
+  // Prevent this by dropping the next `ReregisterSlaveMessage` from it.
+  Future<Message> reregisterSlave2 =
+    DROP_MESSAGE(Eq(ReregisterSlaveMessage().GetTypeName()),
+                 slave2.get()->pid,
+                 master.get()->pid);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage3 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Owned<MasterDetector> detector3 = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> slave3 = StartSlave(detector3.get());
+  ASSERT_SOME(slave3);
+
+  // Wait for the slave to register and get the slave id.
+  AWAIT_READY(slaveRegisteredMessage3);
+  SlaveID slaveId3 = slaveRegisteredMessage3.get().slave_id();
+
+  // Induce a partition of the slave.
+  size_t pings3 = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings3++;
+    if (pings3 == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Future<Nothing> slaveLost3;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost3));
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  AWAIT_READY(slaveLost3);
+  AWAIT_READY(reregisterSlave2);
+
+  // `slave3` will try to re-register below when we advance the clock.
+  // Prevent this by dropping all future messages from it.
+  DROP_MESSAGES(_, slave3.get()->pid, _);
+
+  // Cause `slave2` to reregister with the master. We expect the
+  // master to update the registry to mark the slave as reachable; we
+  // intercept the registry operation.
+  Future<Owned<master::Operation>> markReachable;
+  Promise<bool> markReachableContinue;
+  EXPECT_CALL(*master.get()->registrar.get(), apply(_))
+    .WillOnce(DoAll(FutureArg<0>(&markReachable),
+                    Return(markReachableContinue.future())));
+
+  detector2.appoint(master.get()->pid);
+
+  AWAIT_READY(markReachable);
+  EXPECT_NE(
+      nullptr,
+      dynamic_cast<master::MarkSlaveReachable*>(
+          markReachable.get().get()));
+
+  // Trigger GC. Because GC has been configured to preserve a single
+  // unreachable slave (the slave marked unreachable most recently),
+  // this should result in attempting to prune `slave1` and `slave2`
+  // from the unreachable list. We intercept the registry operation to
+  // force the race condition with the reregistration of `slave2`.
+  Future<Owned<master::Operation>> pruneUnreachable;
+  Promise<bool> pruneUnreachableContinue;
+  EXPECT_CALL(*master.get()->registrar.get(), apply(_))
+    .WillOnce(DoAll(FutureArg<0>(&pruneUnreachable),
+                    Return(pruneUnreachableContinue.future())));
+
+  Clock::advance(masterFlags.registry_gc_interval);
+
+  AWAIT_READY(pruneUnreachable);
+  EXPECT_NE(
+      nullptr,
+      dynamic_cast<master::PruneUnreachable*>(
+          pruneUnreachable.get().get()));
+
+  // Apply the registry operation to mark the slave reachable, then
+  // pass the result back to the master to allow it to continue. We
+  // validate that `slave2` is reregistered successfully.
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(),
+                    master.get()->pid,
+                    slave2.get()->pid);
+
+  Future<bool> applyReachable =
+    master.get()->registrar->unmocked_apply(markReachable.get());
+
+  AWAIT_READY(applyReachable);
+  markReachableContinue.set(applyReachable.get());
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // Apply the registry operation to prune the unreachable list, then
+  // pass the result back to the master to allow it to continue.
+  Future<bool> applyPrune =
+    master.get()->registrar->unmocked_apply(pruneUnreachable.get());
+
+  AWAIT_READY(applyPrune);
+  pruneUnreachableContinue.set(applyPrune.get());
+
+  // We expect that `slave1` has been removed from the unreachable
+  // list, `slave2` is registered, and `slave3` is still in the
+  // unreachable list. We use reconciliation to verify this.
+
+  TaskStatus status1;
+  status1.mutable_task_id()->set_value(UUID::random().toString());
+  status1.mutable_slave_id()->CopyFrom(slaveId1);
+  status1.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1));
+
+  driver.reconcileTasks({status1});
+
+  AWAIT_READY(reconcileUpdate1);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate1.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate1.get().reason());
+  EXPECT_FALSE(reconcileUpdate1.get().has_unreachable_time());
+
+  TaskStatus status2;
+  status2.mutable_task_id()->set_value(UUID::random().toString());
+  status2.mutable_slave_id()->CopyFrom(slaveId2);
+  status2.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status2});
+
+  AWAIT_READY(reconcileUpdate2);
+  EXPECT_EQ(TASK_LOST, reconcileUpdate2.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate2.get().reason());
+  EXPECT_FALSE(reconcileUpdate2.get().has_unreachable_time());
+
+  TaskStatus status3;
+  status3.mutable_task_id()->set_value(UUID::random().toString());
+  status3.mutable_slave_id()->CopyFrom(slaveId3);
+  status3.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate3;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate3));
+
+  driver.reconcileTasks({status3});
+
+  AWAIT_READY(reconcileUpdate3);
+  EXPECT_EQ(TASK_UNREACHABLE, reconcileUpdate3.get().state());
+  EXPECT_EQ(TaskStatus::REASON_RECONCILIATION, reconcileUpdate3.get().reason());
+  EXPECT_TRUE(reconcileUpdate3.get().has_unreachable_time());
+
+  driver.stop();
+  driver.join();
+
+  Clock::resume();
+}
+
+
 // This test checks that the master behaves correctly if a slave fails
 // health checks twice. At present, this can only occur if the registry
 // operation to mark the slave unreachable takes so long that the
