Added the initial implementation for applying offer operations. The resource provider manager provides an `applyOfferOperation` method for offer operations affecting resource providers. The resources on which the operation should be applied contain a resource provider ID. This ID will be extracted and an event will be sent to the respective resource provider.
(This is based on https://reviews.apache.org/r/61810) Review: https://reviews.apache.org/r/63480 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97062ac8 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97062ac8 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97062ac8 Branch: refs/heads/master Commit: 97062ac861b2642d6a882226b767f3ccd1a3c1db Parents: 9cdac39 Author: Jan Schlicht <j...@mesosphere.io> Authored: Mon Oct 30 13:57:09 2017 +0100 Committer: Jie Yu <yujie....@gmail.com> Committed: Mon Nov 6 14:37:26 2017 -0800 ---------------------------------------------------------------------- src/messages/messages.proto | 4 +- src/resource_provider/manager.cpp | 128 +++++++++++++++++++++++++++------ src/resource_provider/manager.hpp | 4 ++ src/slave/slave.cpp | 34 ++++++++- 4 files changed, 148 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/messages/messages.proto ---------------------------------------------------------------------- diff --git a/src/messages/messages.proto b/src/messages/messages.proto index 2fbca22..1610c2b 100644 --- a/src/messages/messages.proto +++ b/src/messages/messages.proto @@ -705,7 +705,9 @@ message OfferOperationStatusUpdate { /** - * This message is sent from the master to the agent to apply an offer + * This message is sent from the master to the resource provider + * manager (either on the agent for local resource providers, or on + * the master for external resource providers) to apply an offer * operation. * * See resource_provider::Event::OPERATION. 
http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.cpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 11f8901..a878507 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -37,6 +37,7 @@ #include "common/http.hpp" #include "common/recordio.hpp" +#include "common/resources_utils.hpp" #include "internal/devolve.hpp" #include "internal/evolve.hpp" @@ -52,7 +53,6 @@ using mesos::resource_provider::Event; using process::Failure; using process::Future; -using process::Owned; using process::Process; using process::ProcessBase; using process::Queue; @@ -140,9 +140,9 @@ public: const http::Request& request, const Option<Principal>& principal); - Queue<ResourceProviderMessage> messages; + void applyOfferOperation(const ApplyOfferOperationMessage& message); - hashmap<ResourceProviderID, ResourceProvider> resourceProviders; + Queue<ResourceProviderMessage> messages; private: void subscribe( @@ -158,6 +158,11 @@ private: const Call::UpdateState& update); ResourceProviderID newResourceProviderId(); + + struct ResourceProviders + { + hashmap<ResourceProviderID, ResourceProvider> subscribed; + } resourceProviders; }; @@ -254,11 +259,12 @@ Future<http::Response> ResourceProviderManagerProcess::api( return ok; } - if (!resourceProviders.contains(call.resource_provider_id())) { - return BadRequest("Resource provider cannot be found"); + if (!resourceProviders.subscribed.contains(call.resource_provider_id())) { + return BadRequest("Resource provider is not subscribed"); } - auto resourceProvider = resourceProviders.at(call.resource_provider_id()); + ResourceProvider& resourceProvider = + resourceProviders.subscribed.at(call.resource_provider_id()); // This isn't a `SUBSCRIBE` call, so the request should include a stream ID. 
if (!request.headers.contains("Mesos-Stream-Id")) { @@ -302,6 +308,69 @@ Future<http::Response> ResourceProviderManagerProcess::api( } +void ResourceProviderManagerProcess::applyOfferOperation( + const ApplyOfferOperationMessage& message) +{ + const Offer::Operation& operation = message.operation_info(); + const FrameworkID& frameworkId = message.framework_id(); + + Try<UUID> uuid = UUID::fromBytes(message.operation_uuid()); + if (uuid.isError()) { + LOG(ERROR) << "Failed to parse offer operation UUID for operation " + << "'" << operation.id() << "' from framework " + << frameworkId << ": " << uuid.error(); + return; + } + + Result<ResourceProviderID> resourceProviderId = + getResourceProviderId(operation); + + if (!resourceProviderId.isSome()) { + LOG(ERROR) << "Failed to get the resource provider ID of operation " + << "'" << operation.id() << "' (uuid: " << uuid->toString() + << ") from framework " << frameworkId << ": " + << (resourceProviderId.isError() ? resourceProviderId.error() + : "Not found"); + return; + } + + if (!resourceProviders.subscribed.contains(resourceProviderId.get())) { + LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: " + << uuid.get() << ") from framework " << frameworkId + << " because resource provider " << resourceProviderId.get() + << " is not subscribed"; + return; + } + + ResourceProvider& resourceProvider = + resourceProviders.subscribed.at(resourceProviderId.get()); + + CHECK(message.resource_version_uuid().has_resource_provider_id()); + + CHECK_EQ(message.resource_version_uuid().resource_provider_id(), + resourceProviderId.get()) + << "Resource provider ID " + << message.resource_version_uuid().resource_provider_id() + << " in resource version UUID does not match that in the operation " + << resourceProviderId.get(); + + Event event; + event.set_type(Event::OPERATION); + event.mutable_operation()->mutable_framework_id()->CopyFrom(frameworkId); + event.mutable_operation()->mutable_info()->CopyFrom(operation); + 
event.mutable_operation()->set_operation_uuid(message.operation_uuid()); + event.mutable_operation()->set_resource_version_uuid( + message.resource_version_uuid().uuid()); + + if (!resourceProvider.http.send(event)) { + LOG(WARNING) << "Failed to send operation '" << operation.id() << "' " + << "(uuid: " << uuid.get() << ") from framework " + << frameworkId << " to resource provider " + << resourceProviderId.get() << ": connection closed"; + } +} + + void ResourceProviderManagerProcess::subscribe( const HttpConnection& http, const Call::Subscribe& subscribe) @@ -309,27 +378,36 @@ void ResourceProviderManagerProcess::subscribe( ResourceProviderInfo resourceProviderInfo = subscribe.resource_provider_info(); - // TODO(chhsiao): Reject the subscription if it contains an unknown ID - // or there is already a subscribed instance with the same ID, and add - // tests for re-subscriptions. + LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo; + if (!resourceProviderInfo.has_id()) { + // The resource provider is subscribing for the first time. resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId()); - } - ResourceProvider resourceProvider(resourceProviderInfo, http); + ResourceProvider resourceProvider(resourceProviderInfo, http); - Event event; - event.set_type(Event::SUBSCRIBED); - event.mutable_subscribed()->mutable_provider_id()->CopyFrom( - resourceProvider.info.id()); + Event event; + event.set_type(Event::SUBSCRIBED); + event.mutable_subscribed()->mutable_provider_id()->CopyFrom( + resourceProvider.info.id()); - if (!resourceProvider.http.send(event)) { - LOG(WARNING) << "Unable to send event to resource provider " - << stringify(resourceProvider.info.id()) - << ": connection closed"; + if (!resourceProvider.http.send(event)) { + LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider " + << resourceProvider.info.id() << ": connection closed"; + } + + // TODO(jieyu): Start heartbeat for the resource provider. 
+ + resourceProviders.subscribed.put( + resourceProviderInfo.id(), + resourceProvider); + + return; } - resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider)); + // TODO(chhsiao): Reject the subscription if it contains an unknown + // ID or there is already a subscribed instance with the same ID, + // and add tests for re-subscriptions. } @@ -402,6 +480,16 @@ Future<http::Response> ResourceProviderManager::api( } +void ResourceProviderManager::applyOfferOperation( + const ApplyOfferOperationMessage& message) const +{ + return dispatch( + process.get(), + &ResourceProviderManagerProcess::applyOfferOperation, + message); +} + + Queue<ResourceProviderMessage> ResourceProviderManager::messages() const { return process->messages; http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.hpp ---------------------------------------------------------------------- diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp index 3b70e75..e7a9a6c 100644 --- a/src/resource_provider/manager.hpp +++ b/src/resource_provider/manager.hpp @@ -23,6 +23,8 @@ #include <process/owned.hpp> #include <process/queue.hpp> +#include "messages/messages.hpp" + #include "resource_provider/message.hpp" namespace mesos { @@ -49,6 +51,8 @@ public: const process::http::Request& request, const Option<process::http::authentication::Principal>& principal) const; + void applyOfferOperation(const ApplyOfferOperationMessage& message) const; + // Returns a stream of messages from the resource provider manager. 
process::Queue<ResourceProviderMessage> messages() const; http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/slave/slave.cpp ---------------------------------------------------------------------- diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index 6cbe209..c108239 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3626,7 +3626,39 @@ Try<Nothing> Slave::syncCheckpointedResources( void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message) { - // TODO(nfnt): Provide implementation here. + Try<UUID> uuid = UUID::fromBytes(message.operation_uuid()); + if (uuid.isError()) { + LOG(ERROR) << "Failed to parse offer operation UUID for operation " + << "'" << message.operation_info().id() << "' " + << "from framework " << message.framework_id() + << ": " << uuid.error(); + return; + } + + Result<ResourceProviderID> resourceProviderId = + getResourceProviderId(message.operation_info()); + + if (resourceProviderId.isError()) { + LOG(ERROR) << "Failed to get the resource provider ID of operation " + << "'" << message.operation_info().id() << "' " + << "(uuid: " << uuid->toString() << ") from framework " + << message.framework_id() << ": " << resourceProviderId.error(); + return; + } + + if (resourceProviderId.isSome()) { + resourceProviderManager.applyOfferOperation(message); + return; + } + + // TODO(jieyu): Handle operations for agent default resources. To + // support rollback, the agent need to checkpoint the total + // resources using the old format (i.e., using `resources.info`). + // It's OK that the offer operations are not checkpointed atomically + // with the total resources for agent default resources. This is + // because the master does not rely on operation feedback to update + // the allocation for old operations, and agent default resources + // only support old operations. }