Repository: mesos Updated Branches: refs/heads/master 1c6d9e5e6 -> d4a903a4a
Delayed construction of the agent's resource provider manager. By delaying the construction of the agent's resource provider manager, we prepare for a follow-up patch that makes the resource provider manager depend on the agent's ID. Depending on whether the agent was able to recover an agent ID from its log or still needs to obtain one by first registering with the master, the resource provider manager can only be constructed at one of two different points during the agent's initialization. To share the common code, we introduce a helper function that performs the necessary steps. Review: https://reviews.apache.org/r/66308/ Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6850353e Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6850353e Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6850353e Branch: refs/heads/master Commit: 6850353e4146ae82de2a2b98d3ce77c5328f3b26 Parents: 1c6d9e5 Author: Benjamin Bannier <benjamin.bann...@mesosphere.io> Authored: Tue May 1 13:08:48 2018 -0700 Committer: Chun-Hung Hsiao <chhs...@mesosphere.io> Committed: Tue May 1 13:08:48 2018 -0700 ---------------------------------------------------------------------- src/slave/slave.cpp | 94 +++++++++++++++++----- src/slave/slave.hpp | 6 +- src/tests/resource_provider_manager_tests.cpp | 9 ++- 3 files changed, 87 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index d0ff5f8..d313777 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -30,6 +30,8 @@ #include <utility> #include <vector> +#include <glog/logging.h> + #include <mesos/type_utils.hpp> #include <mesos/authentication/secret_generator.hpp> @@ -772,15 +774,20 @@ void Slave::initialize() 
logRequest(request); return http.executor(request, principal); }); + route( + "/api/v1/resource_provider", + READWRITE_HTTP_AUTHENTICATION_REALM, + Http::RESOURCE_PROVIDER_HELP(), + [this](const http::Request& request, const Option<Principal>& principal) + -> Future<http::Response> { + logRequest(request); + + if (resourceProviderManager.get() == nullptr) { + return http::ServiceUnavailable(); + } - route("/api/v1/resource_provider", - READWRITE_HTTP_AUTHENTICATION_REALM, - Http::RESOURCE_PROVIDER_HELP(), - [this](const http::Request& request, - const Option<Principal>& principal) { - logRequest(request); - return resourceProviderManager.api(request, principal); - }); + return resourceProviderManager->api(request, principal); + }); // TODO(ijimenez): Remove this endpoint at the end of the // deprecation cycle on 0.26. @@ -1502,6 +1509,8 @@ void Slave::registered( CHECK_SOME(state::checkpoint(path, info)); + initializeResourceProviderManager(flags, info.id()); + // We start the local resource providers daemon once the agent is // running, so the resource providers can use the agent API. 
localResourceProviderDaemon->start(info.id()); @@ -4344,7 +4353,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message) } if (resourceProviderId.isSome()) { - resourceProviderManager.applyOperation(message); + CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message); return; } @@ -4417,7 +4426,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message) } if (containsResourceProviderOperations) { - resourceProviderManager.reconcileOperations(message); + CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message); } } @@ -4549,7 +4558,19 @@ void Slave::operationStatusAcknowledgement( { Operation* operation = getOperation(acknowledgement.operation_uuid()); if (operation != nullptr) { - resourceProviderManager.acknowledgeOperationStatus(acknowledgement); + // If the operation was on resource provider resources forward the + // acknowledgement to the resource provider manager as well. + Result<ResourceProviderID> resourceProviderId = + getResourceProviderId(operation->info()); + + CHECK(!resourceProviderId.isError()) + << "Could not determine resource provider of operation " << operation + << ": " << resourceProviderId.error(); + + if (resourceProviderId.isSome()) { + CHECK_NOTNULL(resourceProviderManager.get()) + ->acknowledgeOperationStatus(acknowledgement); + } CHECK(operation->statuses_size() > 0); if (protobuf::isTerminalState( @@ -7319,10 +7340,8 @@ void Slave::__recover(const Future<Nothing>& future) detection = detector->detect() .onAny(defer(self(), &Slave::detected, lambda::_1)); - if (capabilities.resourceProvider) { - // Start listening for messages from the resource provider manager. - resourceProviderManager.messages().get().onAny( - defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); + if (info.has_id()) { + initializeResourceProviderManager(flags, info.id()); } // Forward oversubscribed resources. 
@@ -7600,7 +7619,7 @@ void Slave::handleResourceProviderMessage( << (message.isFailed() ? message.failure() : "future discarded"); // Wait for the next message. - resourceProviderManager.messages().get() + CHECK_NOTNULL(resourceProviderManager.get())->messages().get() .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); return; @@ -7859,7 +7878,7 @@ void Slave::handleResourceProviderMessage( } // Wait for the next message. - resourceProviderManager.messages().get() + CHECK_NOTNULL(resourceProviderManager.get())->messages().get() .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); } @@ -8114,6 +8133,24 @@ void Slave::apply(Operation* operation) Future<Nothing> Slave::publishResources( const Option<Resources>& additionalResources) { + // If the resource provider manager has not been created yet no resource + // providers have been added and we do not need to publish anything. + if (resourceProviderManager == nullptr) { + // We check whether the passed additional resources are compatible + // with the expectation that no resource provider resources are in + // use, yet. This is not an exhaustive consistency check. 
+ if (additionalResources.isSome()) { + foreach (const Resource& resource, additionalResources.get()) { + CHECK(!resource.has_provider_id()) + << "Cannot publish resource provider resources " + << additionalResources.get() + << " until resource providers have subscribed"; + } + } + + return Nothing(); + } + Resources resources; // NOTE: For resources providers that serve quantity-based resources @@ -8134,7 +8171,8 @@ Future<Nothing> Slave::publishResources( resources += additionalResources.get(); } - return resourceProviderManager.publishResources(resources); + return CHECK_NOTNULL(resourceProviderManager.get()) + ->publishResources(resources); } @@ -8754,6 +8792,26 @@ double Slave::_resources_revocable_percent(const string& name) } +void Slave::initializeResourceProviderManager( + const Flags& flags, + const SlaveID& slaveId) +{ + // To simplify reasoning about lifetimes we do not allow + // reinitialization of the resource provider manager. + if (resourceProviderManager.get() != nullptr) { + return; + } + + resourceProviderManager.reset(new ResourceProviderManager()); + + if (capabilities.resourceProvider) { + // Start listening for messages from the resource provider manager. 
+ resourceProviderManager->messages().get().onAny( + defer(self(), &Self::handleResourceProviderMessage, lambda::_1)); + } +} + + Framework::Framework( Slave* _slave, const Flags& slaveFlags, http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/slave/slave.hpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index fb911ef..4a3d014 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -711,6 +711,10 @@ private: const SlaveInfo& previous, const SlaveInfo& current) const; + void initializeResourceProviderManager( + const Flags& flags, + const SlaveID& slaveId); + protobuf::master::Capabilities requiredMasterCapabilities; const Flags flags; @@ -812,7 +816,7 @@ private: // (allocated and oversubscribable) resources. Option<Resources> oversubscribedResources; - ResourceProviderManager resourceProviderManager; + process::Owned<ResourceProviderManager> resourceProviderManager; process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon; // Local resource providers known by the agent. http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/tests/resource_provider_manager_tests.cpp ---------------------------------------------------------------------- diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp index c52541b..0de4e79 100644 --- a/src/tests/resource_provider_manager_tests.cpp +++ b/src/tests/resource_provider_manager_tests.cpp @@ -755,14 +755,17 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint) Try<Owned<cluster::Master>> master = StartMaster(); ASSERT_SOME(master); - Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover); - Owned<MasterDetector> detector = master.get()->createDetector(); + // For the agent's resource provider manager to start, + // the agent needs to have been assigned an agent ID. 
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage = + FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _); + Try<Owned<cluster::Slave>> agent = StartSlave(detector.get()); ASSERT_SOME(agent); - AWAIT_READY(__recover); + AWAIT_READY(slaveRegisteredMessage); // Wait for recovery to be complete. Clock::pause();