Repository: mesos
Updated Branches:
  refs/heads/master 1c6d9e5e6 -> d4a903a4a


Delayed construction of the agent's resource provider manager.

By delaying the construction of the agent's resource provider manager,
we prepare for a subsequent patch that makes the resource provider
manager depend on the agent's ID.

The agent either recovers its agent ID from its log or still needs to
obtain one in a first registration with the master. Consequently, the
resource provider manager can only be constructed at one of two
different points in the initialization of the agent: after recovery
or after registration. To capture the common code we introduce a
helper function, `initializeResourceProviderManager`, that performs
the necessary steps.

Review: https://reviews.apache.org/r/66308/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6850353e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6850353e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6850353e

Branch: refs/heads/master
Commit: 6850353e4146ae82de2a2b98d3ce77c5328f3b26
Parents: 1c6d9e5
Author: Benjamin Bannier <benjamin.bann...@mesosphere.io>
Authored: Tue May 1 13:08:48 2018 -0700
Committer: Chun-Hung Hsiao <chhs...@mesosphere.io>
Committed: Tue May 1 13:08:48 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                           | 94 +++++++++++++++++-----
 src/slave/slave.hpp                           |  6 +-
 src/tests/resource_provider_manager_tests.cpp |  9 ++-
 3 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d0ff5f8..d313777 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,6 +30,8 @@
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -772,15 +774,20 @@ void Slave::initialize()
           logRequest(request);
           return http.executor(request, principal);
         });
+  route(
+      "/api/v1/resource_provider",
+      READWRITE_HTTP_AUTHENTICATION_REALM,
+      Http::RESOURCE_PROVIDER_HELP(),
+      [this](const http::Request& request, const Option<Principal>& principal)
+        -> Future<http::Response> {
+        logRequest(request);
+
+        if (resourceProviderManager.get() == nullptr) {
+          return http::ServiceUnavailable();
+        }
 
-  route("/api/v1/resource_provider",
-        READWRITE_HTTP_AUTHENTICATION_REALM,
-        Http::RESOURCE_PROVIDER_HELP(),
-        [this](const http::Request& request,
-               const Option<Principal>& principal) {
-          logRequest(request);
-          return resourceProviderManager.api(request, principal);
-        });
+        return resourceProviderManager->api(request, principal);
+      });
 
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
@@ -1502,6 +1509,8 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
+      initializeResourceProviderManager(flags, info.id());
+
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
@@ -4344,7 +4353,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   }
 
   if (resourceProviderId.isSome()) {
-    resourceProviderManager.applyOperation(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
     return;
   }
 
@@ -4417,7 +4426,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
   }
 
   if (containsResourceProviderOperations) {
-    resourceProviderManager.reconcileOperations(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
   }
 }
 
@@ -4549,7 +4558,19 @@ void Slave::operationStatusAcknowledgement(
 {
   Operation* operation = getOperation(acknowledgement.operation_uuid());
   if (operation != nullptr) {
-    resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
+    // If the operation was on resource provider resources forward the
+    // acknowledgement to the resource provider manager as well.
+    Result<ResourceProviderID> resourceProviderId =
+      getResourceProviderId(operation->info());
+
+    CHECK(!resourceProviderId.isError())
+      << "Could not determine resource provider of operation " << operation
+      << ": " << resourceProviderId.error();
+
+    if (resourceProviderId.isSome()) {
+      CHECK_NOTNULL(resourceProviderManager.get())
+        ->acknowledgeOperationStatus(acknowledgement);
+    }
 
     CHECK(operation->statuses_size() > 0);
     if (protobuf::isTerminalState(
@@ -7319,10 +7340,8 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
-    if (capabilities.resourceProvider) {
-      // Start listening for messages from the resource provider manager.
-      resourceProviderManager.messages().get().onAny(
-          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+    if (info.has_id()) {
+      initializeResourceProviderManager(flags, info.id());
     }
 
     // Forward oversubscribed resources.
@@ -7600,7 +7619,7 @@ void Slave::handleResourceProviderMessage(
                << (message.isFailed() ? message.failure() : "future discarded");
 
     // Wait for the next message.
-    resourceProviderManager.messages().get()
+    CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
       .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 
     return;
@@ -7859,7 +7878,7 @@ void Slave::handleResourceProviderMessage(
   }
 
   // Wait for the next message.
-  resourceProviderManager.messages().get()
+  CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
     .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 }
 
@@ -8114,6 +8133,24 @@ void Slave::apply(Operation* operation)
 Future<Nothing> Slave::publishResources(
     const Option<Resources>& additionalResources)
 {
+  // If the resource provider manager has not been created yet no resource
+  // providers have been added and we do not need to publish anything.
+  if (resourceProviderManager == nullptr) {
+    // We check whether the passed additional resources are compatible
+    // with the expectation that no resource provider resources are in
+    // use, yet. This is not an exhaustive consistency check.
+    if (additionalResources.isSome()) {
+      foreach (const Resource& resource, additionalResources.get()) {
+        CHECK(!resource.has_provider_id())
+          << "Cannot publish resource provider resources "
+          << additionalResources.get()
+          << " until resource providers have subscribed";
+      }
+    }
+
+    return Nothing();
+  }
+
   Resources resources;
 
   // NOTE: For resources providers that serve quantity-based resources
@@ -8134,7 +8171,8 @@ Future<Nothing> Slave::publishResources(
     resources += additionalResources.get();
   }
 
-  return resourceProviderManager.publishResources(resources);
+  return CHECK_NOTNULL(resourceProviderManager.get())
+    ->publishResources(resources);
 }
 
 
@@ -8754,6 +8792,26 @@ double Slave::_resources_revocable_percent(const string& name)
 }
 
 
+void Slave::initializeResourceProviderManager(
+    const Flags& flags,
+    const SlaveID& slaveId)
+{
+  // To simplify reasoning about lifetimes we do not allow
+  // reinitialization of the resource provider manager.
+  if (resourceProviderManager.get() != nullptr) {
+    return;
+  }
+
+  resourceProviderManager.reset(new ResourceProviderManager());
+
+  if (capabilities.resourceProvider) {
+    // Start listening for messages from the resource provider manager.
+    resourceProviderManager->messages().get().onAny(
+        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+  }
+}
+
+
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index fb911ef..4a3d014 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,6 +711,10 @@ private:
       const SlaveInfo& previous,
       const SlaveInfo& current) const;
 
+  void initializeResourceProviderManager(
+      const Flags& flags,
+      const SlaveID& slaveId);
+
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;
@@ -812,7 +816,7 @@ private:
   // (allocated and oversubscribable) resources.
   Option<Resources> oversubscribedResources;
 
-  ResourceProviderManager resourceProviderManager;
+  process::Owned<ResourceProviderManager> resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
 
   // Local resource providers known by the agent.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index c52541b..0de4e79 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,14 +755,17 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
-
   Owned<MasterDetector> detector = master.get()->createDetector();
 
+  // For the agent's resource provider manager to start,
+  // the agent needs to have been assigned an agent ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);
 
-  AWAIT_READY(__recover);
+  AWAIT_READY(slaveRegisteredMessage);
 
   // Wait for recovery to be complete.
   Clock::pause();

Reply via email to