Added the registrar.

From: Benjamin Hindman <b...@berkeley.edu>
Review: https://reviews.apache.org/r/14383


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7edb0004
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7edb0004
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7edb0004

Branch: refs/heads/master
Commit: 7edb0004b974db8ac2c5e9be84e57e7349136de6
Parents: a8310ad
Author: Benjamin Mahler <bmah...@twitter.com>
Authored: Tue Oct 29 12:08:23 2013 -0700
Committer: Benjamin Mahler <bmah...@twitter.com>
Committed: Tue Oct 29 12:27:01 2013 -0700

----------------------------------------------------------------------
 src/Makefile.am               |   5 +-
 src/master/registrar.cpp      | 415 +++++++++++++++++++++++++++++++++++++
 src/master/registrar.hpp      |  53 +++++
 src/master/registry.proto     |   2 +-
 src/tests/registrar_tests.cpp | 154 ++++++++++++++
 src/tests/state_tests.cpp     | 148 ++++++-------
 6 files changed, 701 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 0b32d74..a11c76b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -167,6 +167,7 @@ libmesos_no_3rdparty_la_SOURCES =                           
        \
        master/master.cpp                                               \
        master/registry.hpp                                             \
        master/registry.proto                                           \
+       master/registrar.cpp                                            \
        slave/constants.cpp                                             \
        slave/gc.cpp                                                    \
        slave/monitor.cpp                                               \
@@ -223,6 +224,7 @@ libmesos_no_3rdparty_la_SOURCES += common/attributes.hpp    
        \
        master/allocator.hpp                                            \
        master/constants.hpp master/drf_sorter.hpp master/flags.hpp     \
        master/hierarchical_allocator_process.hpp                       \
+       master/registrar.hpp                                            \
        master/master.hpp master/sorter.hpp                             \
        messages/messages.hpp slave/constants.hpp                       \
        slave/flags.hpp slave/gc.hpp slave/monitor.hpp                  \
@@ -298,7 +300,7 @@ libmesos_no_3rdparty_la_LIBADD += libstate.la
 # The final result!
 lib_LTLIBRARIES += libmesos.la
 
-libmesos_la_SOURCES = $(MESOS_PROTO) # Part of the distribution.
+libmesos_la_SOURCES = $(MESOS_PROTO) # Include as part of the distribution.
 
 libmesos_la_LDFLAGS = -release $(PACKAGE_VERSION) -shared
 
@@ -781,6 +783,7 @@ mesos_tests_SOURCES =                       \
   tests/paths_tests.cpp                        \
   tests/protobuf_io_tests.cpp          \
   tests/reaper_tests.cpp               \
+  tests/registrar_tests.cpp            \
   tests/resource_offers_tests.cpp      \
   tests/resources_tests.cpp            \
   tests/sasl_tests.cpp                 \

http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/master/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.cpp b/src/master/registrar.cpp
new file mode 100644
index 0000000..42fe30e
--- /dev/null
+++ b/src/master/registrar.cpp
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <deque>
+#include <string>
+
+#include <process/defer.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/process.hpp>
+
+#include <stout/lambda.hpp>
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+
+#include "common/type_utils.hpp"
+
+#include "master/registrar.hpp"
+#include "master/registry.hpp"
+
+#include "state/protobuf.hpp"
+
+using mesos::internal::state::protobuf::State;
+using mesos::internal::state::protobuf::Variable;
+
+using process::dispatch;
+using process::Future;
+using process::Process;
+using process::Promise;
+using process::spawn;
+using process::terminate;
+using process::wait; // Necessary on some OS's to disambiguate.
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+class RegistrarProcess : public Process<RegistrarProcess>
+{
+public:
+  RegistrarProcess(State* _state)
+    : ProcessBase("registrar"),
+      state(_state)
+  {
+    slaves.variable = None();
+    slaves.updating = false;
+  }
+
+  virtual ~RegistrarProcess() {}
+
+  // Registrar implementation.
+  Future<bool> admit(const SlaveID& id, const SlaveInfo& info);
+  Future<bool> readmit(const SlaveInfo& info);
+  Future<bool> remove(const SlaveInfo& info);
+
+private:
+  template <typename T>
+  struct Mutation : process::Promise<bool>
+  {
+    virtual Try<T> apply(T t) = 0;
+  };
+
+  // Mutation that appends a slave entry to the registry unless an
+  // entry with the same SlaveID is already present.
+  struct Admit : Mutation<registry::Slaves>
+  {
+    Admit(const SlaveID& _id, const SlaveInfo& _info)
+      : id(_id), info(_info) {}
+
+    virtual Try<registry::Slaves> apply(registry::Slaves slaves)
+    {
+      // Check and see if this slave already exists.
+      foreach (const registry::Slave& slave, slaves.slaves()) {
+        if (slave.info().id() == id) {
+          set(false); // Complete the promise now: nothing was admitted.
+          return slaves; // No mutation.
+        }
+      }
+
+      // Okay, add the slave!
+      registry::Slave* slave = slaves.add_slaves();
+      slave->mutable_info()->CopyFrom(info);
+      // Stamp the stored entry with the 'id' passed to the constructor.
+      slave->mutable_info()->mutable_id()->MergeFrom(id);
+      return slaves;
+    }
+
+    const SlaveID id;
+    const SlaveInfo info;
+  };
+
+  // NOTE: even though readmission does not mutate the state we model
+  // it as a mutation so that it is performed in sequence with other
+  // mutations.
+  struct Readmit : Mutation<registry::Slaves>
+  {
+    Readmit(const SlaveInfo& _info) : info(_info) { CHECK(info.has_id()); }
+
+    // Completes the promise with whether a slave carrying info.id()
+    // is present; 'slaves' is always handed back unmodified.
+    virtual Try<registry::Slaves> apply(registry::Slaves slaves)
+    {
+      int matches = 0;
+      for (int i = 0; i < slaves.slaves().size(); i++) {
+        if (slaves.slaves(i).info().id() == info.id()) {
+          matches++;
+          set(true);
+        }
+      }
+
+      if (matches == 0) {
+        set(false);
+      }
+
+      return slaves;
+    }
+
+    const SlaveInfo info;
+  };
+
+  // Mutation that deletes the slave whose SlaveID equals info.id().
+  // When no such slave is present the promise is completed with false
+  // and 'slaves' is returned untouched.
+  struct Remove : Mutation<registry::Slaves>
+  {
+    Remove(const SlaveInfo& _info) : info(_info) { CHECK(info.has_id()); }
+
+    virtual Try<registry::Slaves> apply(registry::Slaves slaves)
+    {
+      bool removed = false;
+      for (int i = 0; i < slaves.slaves().size(); i++) {
+        const registry::Slave& slave = slaves.slaves(i);
+        if (slave.info().id() == info.id()) {
+          // Bubble the match to the end by swapping *adjacent*
+          // elements, then drop the last element. Swapping against
+          // the fixed index 'i' instead would merely rotate the tail
+          // and leave a different slave in the last position whenever
+          // more than one element follows the match.
+          for (int j = i + 1; j < slaves.slaves().size(); j++) {
+            slaves.mutable_slaves()->SwapElements(j - 1, j);
+          }
+          slaves.mutable_slaves()->RemoveLast();
+          removed = true;
+          break;
+        }
+      }
+      if (!removed) {
+        set(false);
+      }
+      return slaves; // May or may not have been mutated.
+    }
+
+    const SlaveInfo info;
+  };
+
+  struct {
+    Option<Variable<registry::Slaves> > variable;
+    std::deque<Mutation<registry::Slaves>*> mutations;
+    bool updating; // Used to signify fetching (recovering) or storing.
+  } slaves;
+
+  // Continuations.
+  Future<bool> _admit(const SlaveID& id, const SlaveInfo& info);
+  Future<bool> _readmit(const SlaveInfo& info);
+  Future<bool> _remove(const SlaveInfo& info);
+
+  // Helper for recovering state (performing fetch).
+  Future<Nothing> recover();
+  void _recover(const Future<Variable<registry::Slaves> >& recovery);
+
+  // Helper for updating state (performing store).
+  void update();
+  Future<bool> _update(const Option<Variable<registry::Slaves> >& variable);
+  void __update();
+
+  State* state;
+
+  // Used to compose our operations with recovery.
+  Promise<Nothing> recovered;
+};
+
+
+// Starts fetching the 'slaves' variable (unless it has already been
+// recovered or a fetch/store is in flight) and returns a future that
+// becomes ready once recovery has completed.
+Future<Nothing> RegistrarProcess::recover()
+{
+  LOG(INFO) << "Recovering registrar";
+
+  // "Recover" the 'slaves' variable by fetching it from the state.
+  if (slaves.variable.isNone() && !slaves.updating) {
+    // Mark the fetch as in flight (cleared again in _recover()) so
+    // that operations arriving before it completes do not each issue
+    // their own fetch; a late duplicate could otherwise clobber
+    // 'slaves.variable' and 'slaves.updating' during a store.
+    slaves.updating = true;
+
+    state->fetch<registry::Slaves>("slaves")
+      .onAny(defer(self(), &Self::_recover, lambda::_1));
+
+    // TODO(benh): Don't wait forever to recover?
+  }
+
+  // TODO(benh): Recover other variables too.
+
+  return recovered.future();
+}
+
+
+// Continuation of the fetch started by recover(). On success the
+// fetched variable is saved and 'recovered' is completed; on failure
+// or discard the fetch is retried.
+void RegistrarProcess::_recover(
+    const Future<Variable<registry::Slaves> >& recovery)
+{
+  slaves.updating = false;
+
+  CHECK(!recovery.isPending());
+
+  if (recovery.isFailed() || recovery.isDiscarded()) {
+    // The conditional has to be parenthesized: '<<' binds tighter
+    // than '?:', so without the parentheses the boolean would be
+    // streamed and the actual reason dropped.
+    LOG(WARNING) << "Failed to recover registrar: "
+                 << (recovery.isFailed()
+                     ? recovery.failure() : "future discarded");
+    recover(); // Retry! TODO(benh): Don't retry forever?
+  } else {
+    LOG(INFO) << "Successfully recovered registrar";
+
+    // Save the slaves variable.
+    slaves.variable = recovery.get();
+
+    // Signal the recovery is complete.
+    recovered.set(Nothing());
+  }
+}
+
+
+// Entry point for admitting a slave: waits for recovery and then
+// continues in _admit() on this process.
+Future<bool> RegistrarProcess::admit(
+    const SlaveID& id,
+    const SlaveInfo& info)
+{
+  return recover()
+    .then(defer(self(), &Self::_admit, id, info));
+}
+
+
+// Queues an Admit mutation and, when no update is currently in
+// flight, starts one. Returns the future of the queued mutation.
+Future<bool> RegistrarProcess::_admit(
+    const SlaveID& id,
+    const SlaveInfo& info)
+{
+  CHECK_SOME(slaves.variable);
+
+  Mutation<registry::Slaves>* admission = new Admit(id, info);
+  slaves.mutations.push_back(admission);
+
+  // Take the future before update() runs, since update() deletes the
+  // mutations it processes.
+  Future<bool> result = admission->future();
+
+  if (slaves.updating) {
+    return result;
+  }
+
+  update();
+  return result;
+}
+
+
+// Entry point for readmitting a slave: waits for recovery and then
+// continues in _readmit() on this process.
+Future<bool> RegistrarProcess::readmit(const SlaveInfo& info)
+{
+  return recover()
+    .then(defer(self(), &Self::_readmit, info));
+}
+
+
+// Queues a Readmit mutation and, when no update is currently in
+// flight, starts one. Fails immediately if 'info' carries no SlaveID.
+Future<bool> RegistrarProcess::_readmit(
+    const SlaveInfo& info)
+{
+  CHECK_SOME(slaves.variable);
+
+  if (!info.has_id()) {
+    return Future<bool>::failed("Expecting SlaveInfo to have a SlaveID");
+  }
+
+  Mutation<registry::Slaves>* readmission = new Readmit(info);
+  slaves.mutations.push_back(readmission);
+
+  // Take the future before update() runs, since update() deletes the
+  // mutations it processes.
+  Future<bool> result = readmission->future();
+
+  if (slaves.updating) {
+    return result;
+  }
+
+  update();
+  return result;
+}
+
+
+// Entry point for removing a slave: waits for recovery and then
+// continues in _remove() on this process.
+Future<bool> RegistrarProcess::remove(const SlaveInfo& info)
+{
+  return recover()
+    .then(defer(self(), &Self::_remove, info));
+}
+
+
+// Queues a Remove mutation and, when no update is currently in
+// flight, starts one. Fails immediately if 'info' carries no SlaveID.
+Future<bool> RegistrarProcess::_remove(
+    const SlaveInfo& info)
+{
+  CHECK_SOME(slaves.variable);
+
+  if (!info.has_id()) {
+    return Future<bool>::failed("Expecting SlaveInfo to have a SlaveID");
+  }
+
+  Mutation<registry::Slaves>* removal = new Remove(info);
+  slaves.mutations.push_back(removal);
+
+  // Take the future before update() runs, since update() deletes the
+  // mutations it processes.
+  Future<bool> result = removal->future();
+
+  if (slaves.updating) {
+    return result;
+  }
+
+  update();
+  return result;
+}
+
+
+// Applies every queued mutation, in order, to a copy of the current
+// 'slaves' variable and issues a single store for the result. Does
+// nothing when no mutations are queued.
+void RegistrarProcess::update()
+{
+  if (slaves.mutations.empty()) {
+    return;
+  }
+
+  CHECK(!slaves.updating);
+
+  slaves.updating = true;
+
+  LOG(INFO) << "Attempting to update 'slaves'";
+
+  CHECK_SOME(slaves.variable);
+
+  Variable<registry::Slaves> variable = slaves.variable.get();
+
+  foreach (Mutation<registry::Slaves>* mutation, slaves.mutations) {
+    // Each mutation sees the result of the ones queued before it.
+    Try<registry::Slaves> mutated = mutation->apply(variable.get());
+    if (mutated.isError()) {
+      mutation->fail("Failed to mutate 'slaves': " + mutated.error());
+      continue;
+    }
+
+    Try<Variable<registry::Slaves> > next = variable.mutate(mutated.get());
+    if (next.isError()) {
+      mutation->fail("Failed to mutate 'slaves': " + next.error());
+      continue;
+    }
+
+    variable = next.get();
+  }
+
+  // Perform the store! Save the future so we can associate it with
+  // the mutations that are part of this update.
+  Future<bool> future =
+    state->store(variable).then(defer(self(), &Self::_update, lambda::_1));
+
+  // TODO(benh): Add a timeout so we don't wait forever.
+
+  // Toggle 'updating' if the store fails or is discarded.
+  future
+    .onDiscarded(defer(self(), &Self::__update))
+    .onFailed(defer(self(), &Self::__update));
+
+  // Now associate the store with all the mutations, draining the
+  // queue as we go.
+  while (!slaves.mutations.empty()) {
+    Mutation<registry::Slaves>* mutation = slaves.mutations.front();
+    slaves.mutations.pop_front();
+    mutation->associate(future); // No-op if already failed above.
+    delete mutation;
+  }
+}
+
+
+// Continuation of the store issued by update(). A None 'variable' is
+// reported as a version mismatch and fails the returned future;
+// otherwise the stored variable becomes the current one and any
+// mutations queued in the meantime are processed.
+Future<bool> RegistrarProcess::_update(
+    const Option<Variable<registry::Slaves> >& variable)
+{
+  slaves.updating = false;
+
+  if (variable.isNone()) {
+    LOG(WARNING) << "Failed to update 'slaves': version mismatch";
+    return Future<bool>::failed("Failed to update 'slaves': version mismatch");
+  }
+
+  LOG(INFO) << "Successfully updated 'slaves'";
+
+  slaves.variable = variable.get();
+
+  // Mutations may have been queued while the store was in flight.
+  if (!slaves.mutations.empty()) {
+    update();
+  }
+
+  return true;
+}
+
+
+// Registered by update() for the failed/discarded cases of the update
+// future: clears 'updating' so that a later operation can start a new
+// update. Does not itself process mutations that are still queued.
+void RegistrarProcess::__update()
+{
+  LOG(WARNING) << "Failed to update 'slaves'";
+  slaves.updating = false;
+}
+
+
+// Creates and spawns the process that backs this registrar, handing
+// it the 'state' pointer.
+Registrar::Registrar(State* state)
+{
+  process = new RegistrarProcess(state);
+  spawn(process);
+}
+
+
+// Stops the backing process and releases it.
+Registrar::~Registrar()
+{
+  terminate(process);
+  wait(process);
+
+  // The process was allocated with 'new' in the constructor; free it
+  // now that it has been terminated and waited on.
+  delete process;
+}
+
+
+// Forwards to RegistrarProcess::admit via dispatch.
+Future<bool> Registrar::admit(
+    const SlaveID& id,
+    const SlaveInfo& info)
+{
+  return dispatch(process, &RegistrarProcess::admit, id, info);
+}
+
+
+// Forwards to RegistrarProcess::readmit via dispatch.
+Future<bool> Registrar::readmit(const SlaveInfo& info)
+{
+  return dispatch(process, &RegistrarProcess::readmit, info);
+}
+
+
+// Forwards to RegistrarProcess::remove via dispatch.
+Future<bool> Registrar::remove(const SlaveInfo& info)
+{
+  return dispatch(process, &RegistrarProcess::remove, info);
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/master/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/master/registrar.hpp b/src/master/registrar.hpp
new file mode 100644
index 0000000..5742c36
--- /dev/null
+++ b/src/master/registrar.hpp
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MASTER_REGISTRAR_HPP__
+#define __MASTER_REGISTRAR_HPP__
+
+#include <mesos/mesos.hpp>
+
+#include <process/future.hpp>
+
+#include "state/protobuf.hpp"
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Forward declaration.
+class RegistrarProcess;
+
+// Front end for the set of registered slaves kept in a
+// state::protobuf::State. All operations are asynchronous and are
+// forwarded to a RegistrarProcess.
+class Registrar
+{
+public:
+  // Takes a pointer to the state used for persistence.
+  Registrar(state::protobuf::State* state);
+  ~Registrar();
+
+  // Adds a slave under 'id'. Yields false if a slave with that id is
+  // already registered.
+  process::Future<bool> admit(const SlaveID& id, const SlaveInfo& info);
+
+  // Yields whether the slave identified by info.id() is registered.
+  // The future fails if 'info' has no id.
+  process::Future<bool> readmit(const SlaveInfo& info);
+
+  // Removes the slave identified by info.id(). Yields false if no
+  // such slave is registered. The future fails if 'info' has no id.
+  process::Future<bool> remove(const SlaveInfo& info);
+
+private:
+  RegistrarProcess* process;
+};
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __MASTER_REGISTRAR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
index 877bfa1..bd85099 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -27,6 +27,6 @@ message Slave {
 }
 
 
-message Registry {
+message Slaves {
   repeated Slave slaves = 1;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
new file mode 100644
index 0000000..51975f5
--- /dev/null
+++ b/src/tests/registrar_tests.cpp
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <string>
+
+#include <process/gmock.hpp>
+#include <process/gtest.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+
+#include "master/registrar.hpp"
+
+#include "state/leveldb.hpp"
+#include "state/protobuf.hpp"
+#include "state/storage.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+
+using namespace process;
+
+using std::map;
+using std::string;
+
+using testing::_;
+using testing::Eq;
+
+namespace mesos {
+namespace internal {
+namespace master {
+
+// Fixture that gives every test a fresh LevelDB-backed State stored
+// under "<cwd>/.state".
+class RegistrarTest : public ::testing::Test
+{
+public:
+  RegistrarTest()
+    : storage(NULL),
+      state(NULL),
+      path(os::getcwd() + "/.state") {}
+
+protected:
+  virtual void SetUp()
+  {
+    // Remove anything at 'path' before creating the storage.
+    os::rmdir(path);
+    storage = new state::LevelDBStorage(path);
+    state = new state::protobuf::State(storage);
+  }
+
+  virtual void TearDown()
+  {
+    // 'state' was built on top of 'storage', so delete it first.
+    delete state;
+    delete storage;
+    os::rmdir(path);
+  }
+
+  state::Storage* storage;
+  state::protobuf::State* state;
+
+private:
+  const std::string path;
+};
+
+
+// Admitting a new slave yields true; admitting the same id a second
+// time yields false.
+TEST_F(RegistrarTest, admit)
+{
+  Registrar registrar(state);
+
+  SlaveID id1;
+  id1.set_value("1");
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+  info1.mutable_id()->CopyFrom(id1);
+
+  AWAIT_EQ(true, registrar.admit(id1, info1));
+  AWAIT_EQ(false, registrar.admit(id1, info1));
+}
+
+
+// Readmission yields true for an admitted slave and false for a slave
+// that was never admitted.
+TEST_F(RegistrarTest, readmit)
+{
+  Registrar registrar(state);
+
+  SlaveID id1;
+  id1.set_value("1");
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+  info1.mutable_id()->CopyFrom(id1);
+
+  SlaveID id2;
+  id2.set_value("2");
+
+  SlaveInfo info2;
+  info2.set_hostname("localhost");
+  info2.mutable_id()->CopyFrom(id2);
+
+  AWAIT_EQ(true, registrar.admit(id1, info1));
+
+  AWAIT_EQ(true, registrar.readmit(info1));
+
+  // Slave 2 was never admitted.
+  AWAIT_EQ(false, registrar.readmit(info2));
+}
+
+
+// Removal yields true for an admitted slave and false once it is
+// gone; a removed slave can be admitted again.
+TEST_F(RegistrarTest, remove)
+{
+  Registrar registrar(state);
+
+  SlaveID id1;
+  id1.set_value("1");
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+  info1.mutable_id()->CopyFrom(id1);
+
+  SlaveID id2;
+  id2.set_value("2");
+
+  SlaveInfo info2;
+  info2.set_hostname("localhost");
+  info2.mutable_id()->CopyFrom(id2);
+
+  AWAIT_EQ(true, registrar.admit(id1, info1));
+
+  AWAIT_EQ(true, registrar.admit(id2, info2));
+
+  AWAIT_EQ(true, registrar.remove(info1));
+
+  // Slave 1 is already gone.
+  AWAIT_EQ(false, registrar.remove(info1));
+
+  // A removed slave may be admitted again.
+  AWAIT_EQ(true, registrar.admit(id1, info1));
+
+  AWAIT_EQ(true, registrar.remove(info2));
+}
+
+} // namespace master {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/7edb0004/src/tests/state_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/state_tests.cpp b/src/tests/state_tests.cpp
index efc7315..03c5388 100644
--- a/src/tests/state_tests.cpp
+++ b/src/tests/state_tests.cpp
@@ -51,7 +51,7 @@ using namespace mesos::internal;
 
 using namespace process;
 
-using mesos::internal::registry::Registry;
+using mesos::internal::registry::Slaves;
 using mesos::internal::registry::Slave;
 
 using state::LevelDBStorage;
@@ -66,50 +66,50 @@ using state::protobuf::Variable;
 
 void FetchAndStoreAndFetch(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable = future1.get();
+  Variable<Slaves> variable = future1.get();
 
-  Registry registry1 = variable.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave = registry1.add_slaves();
+  Slave* slave = slaves1.add_slaves();
   slave->mutable_info()->set_hostname("localhost");
 
-  variable = variable.mutate(registry1);
+  variable = variable.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
-  future1 = state->fetch<Registry>("registry");
+  future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
   variable = future1.get();
 
-  Registry registry2 = variable.get();
-  ASSERT_TRUE(registry2.slaves().size() == 1);
-  EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+  Slaves slaves2 = variable.get();
+  ASSERT_TRUE(slaves2.slaves().size() == 1);
+  EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname());
 }
 
 
 void FetchAndStoreAndStoreAndFetch(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable = future1.get();
+  Variable<Slaves> variable = future1.get();
 
-  Registry registry1 = variable.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave = registry1.add_slaves();
+  Slave* slave = slaves1.add_slaves();
   slave->mutable_info()->set_hostname("localhost");
 
-  variable = variable.mutate(registry1);
+  variable = variable.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
@@ -119,75 +119,75 @@ void FetchAndStoreAndStoreAndFetch(State* state)
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
-  future1 = state->fetch<Registry>("registry");
+  future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
   variable = future1.get();
 
-  Registry registry2 = variable.get();
-  ASSERT_TRUE(registry2.slaves().size() == 1);
-  EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+  Slaves slaves2 = variable.get();
+  ASSERT_TRUE(slaves2.slaves().size() == 1);
+  EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname());
 }
 
 
 void FetchAndStoreAndStoreFailAndFetch(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable1 = future1.get();
+  Variable<Slaves> variable1 = future1.get();
 
-  Registry registry1 = variable1.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable1.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave1 = registry1.add_slaves();
+  Slave* slave1 = slaves1.add_slaves();
   slave1->mutable_info()->set_hostname("localhost1");
 
-  Variable<Registry> variable2 = variable1.mutate(registry1);
+  Variable<Slaves> variable2 = variable1.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable2);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable2);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
-  Registry registry2 = variable1.get();
-  EXPECT_TRUE(registry2.slaves().size() == 0);
+  Slaves slaves2 = variable1.get();
+  EXPECT_TRUE(slaves2.slaves().size() == 0);
 
-  Slave* slave2 = registry2.add_slaves();
+  Slave* slave2 = slaves2.add_slaves();
   slave2->mutable_info()->set_hostname("localhost2");
 
-  variable2 = variable1.mutate(registry2);
+  variable2 = variable1.mutate(slaves2);
 
   future2 = state->store(variable2);
   AWAIT_READY(future2);
   EXPECT_TRUE(future2.get().isNone());
 
-  future1 = state->fetch<Registry>("registry");
+  future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
   variable1 = future1.get();
 
-  registry1 = variable1.get();
-  ASSERT_TRUE(registry1.slaves().size() == 1);
-  EXPECT_EQ("localhost1", registry1.slaves(0).info().hostname());
+  slaves1 = variable1.get();
+  ASSERT_TRUE(slaves1.slaves().size() == 1);
+  EXPECT_EQ("localhost1", slaves1.slaves(0).info().hostname());
 }
 
 
 void FetchAndStoreAndExpungeAndFetch(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable = future1.get();
+  Variable<Slaves> variable = future1.get();
 
-  Registry registry1 = variable.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave = registry1.add_slaves();
+  Slave* slave = slaves1.add_slaves();
   slave->mutable_info()->set_hostname("localhost");
 
-  variable = variable.mutate(registry1);
+  variable = variable.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
@@ -197,32 +197,32 @@ void FetchAndStoreAndExpungeAndFetch(State* state)
   AWAIT_READY(future3);
   ASSERT_TRUE(future3.get());
 
-  future1 = state->fetch<Registry>("registry");
+  future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
   variable = future1.get();
 
-  Registry registry2 = variable.get();
-  ASSERT_EQ(0, registry2.slaves().size());
+  Slaves slaves2 = variable.get();
+  ASSERT_EQ(0, slaves2.slaves().size());
 }
 
 
 void FetchAndStoreAndExpungeAndExpunge(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable = future1.get();
+  Variable<Slaves> variable = future1.get();
 
-  Registry registry1 = variable.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave = registry1.add_slaves();
+  Slave* slave = slaves1.add_slaves();
   slave->mutable_info()->set_hostname("localhost");
 
-  variable = variable.mutate(registry1);
+  variable = variable.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
@@ -240,20 +240,20 @@ void FetchAndStoreAndExpungeAndExpunge(State* state)
 
 void FetchAndStoreAndExpungeAndStoreAndFetch(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable = future1.get();
+  Variable<Slaves> variable = future1.get();
 
-  Registry registry1 = variable.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave = registry1.add_slaves();
+  Slave* slave = slaves1.add_slaves();
   slave->mutable_info()->set_hostname("localhost");
 
-  variable = variable.mutate(registry1);
+  variable = variable.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
@@ -267,40 +267,40 @@ void FetchAndStoreAndExpungeAndStoreAndFetch(State* state)
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
-  future1 = state->fetch<Registry>("registry");
+  future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
   variable = future1.get();
 
-  Registry registry2 = variable.get();
-  ASSERT_TRUE(registry2.slaves().size() == 1);
-  EXPECT_EQ("localhost", registry2.slaves(0).info().hostname());
+  Slaves slaves2 = variable.get();
+  ASSERT_TRUE(slaves2.slaves().size() == 1);
+  EXPECT_EQ("localhost", slaves2.slaves(0).info().hostname());
 }
 
 
 void Names(State* state)
 {
-  Future<Variable<Registry> > future1 = state->fetch<Registry>("registry");
+  Future<Variable<Slaves> > future1 = state->fetch<Slaves>("slaves");
   AWAIT_READY(future1);
 
-  Variable<Registry> variable = future1.get();
+  Variable<Slaves> variable = future1.get();
 
-  Registry registry1 = variable.get();
-  EXPECT_TRUE(registry1.slaves().size() == 0);
+  Slaves slaves1 = variable.get();
+  EXPECT_TRUE(slaves1.slaves().size() == 0);
 
-  Slave* slave = registry1.add_slaves();
+  Slave* slave = slaves1.add_slaves();
   slave->mutable_info()->set_hostname("localhost");
 
-  variable = variable.mutate(registry1);
+  variable = variable.mutate(slaves1);
 
-  Future<Option<Variable<Registry> > > future2 = state->store(variable);
+  Future<Option<Variable<Slaves> > > future2 = state->store(variable);
   AWAIT_READY(future2);
   ASSERT_SOME(future2.get());
 
   Future<std::vector<std::string> > names = state->names();
   AWAIT_READY(names);
   ASSERT_TRUE(names.get().size() == 1);
-  EXPECT_EQ("registry", names.get()[0]);
+  EXPECT_EQ("slaves", names.get()[0]);
 }
 
 

Reply via email to