This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 8649cb0950dd63bc850c77f7d3dabbd443ce9c58 Author: Chun-Hung Hsiao <chhs...@mesosphere.io> AuthorDate: Fri Mar 1 12:09:06 2019 -0800 Do not fail a task if it doesn't use resources from a failed provider. `Slave::publishResources` will no longer ask all resource providers to publish all allocated resources. Instead, it only asks the resource providers of the task's resources to publish resources, so a failed resource provider would only fail tasks that want to use its resources. Review: https://reviews.apache.org/r/70081 --- src/resource_provider/manager.cpp | 4 +- src/slave/slave.cpp | 102 ++++++++++++++++++++++---------------- src/slave/slave.hpp | 8 +-- 3 files changed, 63 insertions(+), 51 deletions(-) diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp index 3783947..fdac03a 100644 --- a/src/resource_provider/manager.cpp +++ b/src/resource_provider/manager.cpp @@ -894,9 +894,7 @@ void ResourceProviderManagerProcess::updatePublishResourcesStatus( // TODO(jieyu): Consider to include an error message in // 'UpdatePublishResourcesStatus' and surface that to the caller. resourceProvider->publishes.at(uuid)->fail( - "Failed to publish resources for resource provider " + - stringify(resourceProvider->info.id()) + ": Received " + - stringify(update.status()) + " status"); + "Received " + stringify(update.status()) + " status"); } resourceProvider->publishes.erase(uuid); diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp index bfd2bf4..10af517 100644 --- a/src/slave/slave.cpp +++ b/src/slave/slave.cpp @@ -3064,7 +3064,7 @@ void Slave::__run( LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup) << " for executor " << *executor; - publishResources() + publishResources(executor->containerId, executor->allocatedResources()) .then(defer(self(), [=] { return containerizer->update( executor->containerId, @@ -3526,12 +3526,7 @@ void Slave::launchExecutor( << "' of framework " << framework->id(); // Launch the container. 
- // NOTE: Since we modify the ExecutorInfo to include the task's - // resources when launching the executor, these resources need to be - // published before the containerizer prepares them. This should be - // revisited after MESOS-600. - publishResources( - taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none()) + publishResources(executor->containerId, resources) .then(defer(self(), [=] { return containerizer->launch( executor->containerId, @@ -4806,7 +4801,7 @@ void Slave::subscribe( } } - publishResources() + publishResources(executor->containerId, executor->allocatedResources()) .then(defer(self(), [=] { return containerizer->update( executor->containerId, @@ -4963,7 +4958,7 @@ void Slave::registerExecutor( } } - publishResources() + publishResources(executor->containerId, executor->allocatedResources()) .then(defer(self(), [=] { return containerizer->update( executor->containerId, @@ -8191,48 +8186,71 @@ void Slave::apply(Operation* operation) Future<Nothing> Slave::publishResources( - const Option<Resources>& additionalResources) + const ContainerID& containerId, const Resources& resources) { - // If the resource provider manager has not been created yet no resource - // providers have been added and we do not need to publish anything. - if (resourceProviderManager == nullptr) { - // We check whether the passed additional resources are compatible - // with the expectation that no resource provider resources are in - // use, yet. This is not an exhaustive consistency check. 
- if (additionalResources.isSome()) { - foreach (const Resource& resource, additionalResources.get()) { - CHECK(!resource.has_provider_id()) - << "Cannot publish resource provider resources " - << additionalResources.get() - << " until resource providers have subscribed"; - } + hashset<ResourceProviderID> resourceProviderIds; + foreach (const Resource& resource, resources) { + if (resource.has_provider_id()) { + resourceProviderIds.insert(resource.provider_id()); + } - } - - return Nothing(); } - Resources resources; + vector<Future<Nothing>> futures; + foreach (const ResourceProviderID& resourceProviderId, resourceProviderIds) { + auto hasResourceProviderId = [&](const Resource& resource) { + return resource.has_provider_id() && + resource.provider_id() == resourceProviderId; + }; - // NOTE: For resource providers that serve quantity-based resources - // without any identifiers (such as memory), it is very hard to keep - // track of published resources. So instead of implementing diff-based - // resource publishing, we implement an "ensure-all" semantics, and - // always calculate the total resources that need to remain published. - foreachvalue (const Framework* framework, frameworks) { - // NOTE: We do not call `framework->allocatedResource()` here - // because we do not want to publish resources for pending tasks that - // have not been authorized yet. - foreachvalue (const Executor* executor, framework->executors) { - resources += executor->allocatedResources(); + // NOTE: For resource providers that serve quantity-based resources without + // identifiers (such as cpus and mem), we cannot achieve idempotency with + // diff-based resource publishing, so we have to implement the "ensure-all" + // semantics, and always calculate the total resources to publish. 
+ Option<Resources> containerResources; + Resources complementaryResources; + foreachvalue (const Framework* framework, frameworks) { + foreachvalue (const Executor* executor, framework->executors) { + if (executor->containerId == containerId) { + containerResources = resources.filter(hasResourceProviderId); + } else { + complementaryResources += + executor->allocatedResources().filter(hasResourceProviderId); + } + } + } + + if (containerResources.isNone()) { + // NOTE: This actually should not happen, as the callers have already + // ensured the existence of the executor before calling this function + // synchronously. However we still treat this as a nonfatal error since + // this might change in the future. + LOG(WARNING) << "Ignoring publishing resources for container " + << containerId << ": Executor cannot be found"; + + return Nothing(); } - } - if (additionalResources.isSome()) { - resources += additionalResources.get(); + // Since we already have resources from any resource provider in the + // resource pool, the resource provider manager must have been created. + futures.push_back( + CHECK_NOTNULL(resourceProviderManager.get()) + ->publishResources(containerResources.get() + complementaryResources) + .repair([=](const Future<Nothing>& future) -> Future<Nothing> { + // TODO(chhsiao): Consider surfacing the set of published resources + // and only fail if `published - complementaryResources` does not + // contain `containerResources`. + return Failure( + "Failed to publish resources '" + + stringify(containerResources.get()) + "' for container " + + stringify(containerId) + ": " + future.failure()); + })); } - return CHECK_NOTNULL(resourceProviderManager.get()) - ->publishResources(resources); + // NOTE: Resource cleanups (e.g., unpublishing) are not performed at task + // completion, but rather done __lazily__ when necessary. This is not just an + // optimization but required because resource allocations are tied to task + // lifecycles. 
As a result, no cleanup is needed here if any future fails. + return collect(futures).then([] { return Nothing(); }); } diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp index 28d6590..2d5019d 100644 --- a/src/slave/slave.hpp +++ b/src/slave/slave.hpp @@ -668,13 +668,9 @@ private: void apply(Operation* operation); - // Publish all resources that are needed to run the current set of - // tasks and executors on the agent. - // NOTE: The `additionalResources` parameter is for publishing - // additional task resources when launching executors. Consider - // removing this parameter once we revisited MESOS-600. + // Prepare all resources to be consumed by the specified container. process::Future<Nothing> publishResources( - const Option<Resources>& additionalResources = None()); + const ContainerID& containerId, const Resources& resources); // PullGauge methods. double _frameworks_active()