This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.7.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 8649cb0950dd63bc850c77f7d3dabbd443ce9c58
Author: Chun-Hung Hsiao <chhs...@mesosphere.io>
AuthorDate: Fri Mar 1 12:09:06 2019 -0800

    Do not fail a task if it doesn't use resources from a failed provider.
    
    `Slave::publishResources` will no longer ask all resource providers to
    publish all allocated resources. Instead, it only asks those of the
    task's resources to publish resources, so a failed resource provider
    would only fail tasks that want to use its resources.
    
    Review: https://reviews.apache.org/r/70081
---
 src/resource_provider/manager.cpp |   4 +-
 src/slave/slave.cpp               | 102 ++++++++++++++++++++++----------------
 src/slave/slave.hpp               |   8 +--
 3 files changed, 63 insertions(+), 51 deletions(-)

diff --git a/src/resource_provider/manager.cpp 
b/src/resource_provider/manager.cpp
index 3783947..fdac03a 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -894,9 +894,7 @@ void 
ResourceProviderManagerProcess::updatePublishResourcesStatus(
     // TODO(jieyu): Consider to include an error message in
     // 'UpdatePublishResourcesStatus' and surface that to the caller.
     resourceProvider->publishes.at(uuid)->fail(
-        "Failed to publish resources for resource provider " +
-        stringify(resourceProvider->info.id()) + ": Received " +
-        stringify(update.status()) + " status");
+        "Received " + stringify(update.status()) + " status");
   }
 
   resourceProvider->publishes.erase(uuid);
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index bfd2bf4..10af517 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3064,7 +3064,7 @@ void Slave::__run(
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
                 << " for executor " << *executor;
 
-      publishResources()
+      publishResources(executor->containerId, executor->allocatedResources())
         .then(defer(self(), [=] {
           return containerizer->update(
               executor->containerId,
@@ -3526,12 +3526,7 @@ void Slave::launchExecutor(
             << "' of framework " << framework->id();
 
   // Launch the container.
-  // NOTE: Since we modify the ExecutorInfo to include the task's
-  // resources when launching the executor, these resources need to be
-  // published before the containerizer preparing them. This should be
-  // revisited after MESOS-600.
-  publishResources(
-      taskInfo.isSome() ? taskInfo->resources() : Option<Resources>::none())
+  publishResources(executor->containerId, resources)
     .then(defer(self(), [=] {
       return containerizer->launch(
           executor->containerId,
@@ -4806,7 +4801,7 @@ void Slave::subscribe(
         }
       }
 
-      publishResources()
+      publishResources(executor->containerId, executor->allocatedResources())
         .then(defer(self(), [=] {
           return containerizer->update(
               executor->containerId,
@@ -4963,7 +4958,7 @@ void Slave::registerExecutor(
         }
       }
 
-      publishResources()
+      publishResources(executor->containerId, executor->allocatedResources())
         .then(defer(self(), [=] {
           return containerizer->update(
               executor->containerId,
@@ -8191,48 +8186,71 @@ void Slave::apply(Operation* operation)
 
 
 Future<Nothing> Slave::publishResources(
-    const Option<Resources>& additionalResources)
+    const ContainerID& containerId, const Resources& resources)
 {
-  // If the resource provider manager has not been created yet no resource
-  // providers have been added and we do not need to publish anything.
-  if (resourceProviderManager == nullptr) {
-    // We check whether the passed additional resources are compatible
-    // with the expectation that no resource provider resources are in
-    // use, yet. This is not an exhaustive consistency check.
-    if (additionalResources.isSome()) {
-      foreach (const Resource& resource, additionalResources.get()) {
-        CHECK(!resource.has_provider_id())
-          << "Cannot publish resource provider resources "
-          << additionalResources.get()
-          << " until resource providers have subscribed";
-      }
+  hashset<ResourceProviderID> resourceProviderIds;
+  foreach (const Resource& resource, resources) {
+    if (resource.has_provider_id()) {
+      resourceProviderIds.insert(resource.provider_id());
     }
-
-    return Nothing();
   }
 
-  Resources resources;
+  vector<Future<Nothing>> futures;
+  foreach (const ResourceProviderID& resourceProviderId, resourceProviderIds) {
+    auto hasResourceProviderId = [&](const Resource& resource) {
+      return resource.has_provider_id() &&
+             resource.provider_id() == resourceProviderId;
+    };
 
-  // NOTE: For resources providers that serve quantity-based resources
-  // without any identifiers (such as memory), it is very hard to keep
-  // track of published resources. So instead of implementing diff-based
-  // resource publishing, we implement an "ensure-all" semantics, and
-  // always calculate the total resources that need to remain published.
-  foreachvalue (const Framework* framework, frameworks) {
-    // NOTE: We do not call `framework->allocatedResource()` here
-    // because we do not want to publsh resources for pending tasks that
-    // have not been authorized yet.
-    foreachvalue (const Executor* executor, framework->executors) {
-      resources += executor->allocatedResources();
+    // NOTE: For resources providers that serve quantity-based resources 
without
+    // identifier (such as cpus and mem), we cannot achieve idempotency with
+    // diff-based resource publishing, so we have to implement the "ensure-all"
+    // semantics, and always calculate the total resources to publish.
+    Option<Resources> containerResources;
+    Resources complementaryResources;
+    foreachvalue (const Framework* framework, frameworks) {
+      foreachvalue (const Executor* executor, framework->executors) {
+        if (executor->containerId == containerId) {
+          containerResources = resources.filter(hasResourceProviderId);
+        } else {
+          complementaryResources +=
+            executor->allocatedResources().filter(hasResourceProviderId);
+        }
+      }
+    }
+
+    if (containerResources.isNone()) {
+      // NOTE: This actually should not happen, as the callers have already
+      // ensured the existence of the executor before calling this function
+      // synchronously. However we still treat this as a nonfatal error since
+      // this might change in the future.
+      LOG(WARNING) << "Ignoring publishing resources for container "
+                   << containerId << ": Executor cannot be found";
+
+      return Nothing();
     }
-  }
 
-  if (additionalResources.isSome()) {
-    resources += additionalResources.get();
+    // Since we already have resources from any resource provider in the
+    // resource pool, the resource provider manager must have been created.
+    futures.push_back(
+        CHECK_NOTNULL(resourceProviderManager.get())
+          ->publishResources(containerResources.get() + complementaryResources)
+          .repair([=](const Future<Nothing>& future) -> Future<Nothing> {
+            // TODO(chhsiao): Consider surfacing the set of published resources
+            // and only fail if `published - complementaryResources` does not
+            // contain `containerResources`.
+            return Failure(
+                "Failed to publish resources '" +
+                stringify(containerResources.get()) + "' for container " +
+                stringify(containerId) + ": " + future.failure());
+          }));
   }
 
-  return CHECK_NOTNULL(resourceProviderManager.get())
-    ->publishResources(resources);
+  // NOTE: Resource cleanups (e.g., unpublishing) are not performed at task
+  // completion, but rather done __lazily__ when necessary. This is not just an
+  // optimization but required because resource allocations are tied to task
+  // lifecycles. As a result, no cleanup is needed here if any future fails.
+  return collect(futures).then([] { return Nothing(); });
 }
 
 
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 28d6590..2d5019d 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -668,13 +668,9 @@ private:
 
   void apply(Operation* operation);
 
-  // Publish all resources that are needed to run the current set of
-  // tasks and executors on the agent.
-  // NOTE: The `additionalResources` parameter is for publishing
-  // additional task resources when launching executors. Consider
-  // removing this parameter once we revisited MESOS-600.
+  // Prepare all resources to be consumed by the specified container.
   process::Future<Nothing> publishResources(
-      const Option<Resources>& additionalResources = None());
+      const ContainerID& containerId, const Resources& resources);
 
   // PullGauge methods.
   double _frameworks_active()

Reply via email to