This is an automated email from the ASF dual-hosted git repository.

asekretenko pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit cd49d821de12a3d2a237ae57844878b649b50295
Author: Andrei Sekretenko <asekrete...@apache.org>
AuthorDate: Thu Aug 20 17:02:51 2020 +0200

    Deduplicated concurrent image pulls by docker store.
    
    This patch makes the docker store reuse a pending pull if asked for
    an image that is already being pulled.
    
    The pull caching follows the same approach as the earlier attempt in
    https://reviews.apache.org/r/39331 . However, handing out futures to
    the store callers and handling their discards are performed differently,
    using a form of reference counting, so that the pull itself is discarded
    only if all the futures returned by `Store::get()` have been discarded.
    
    Review: https://reviews.apache.org/r/72790
---
 .../mesos/provisioner/docker/store.cpp             | 110 ++++++++++++++++-----
 1 file changed, 85 insertions(+), 25 deletions(-)

diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index bf2be90..bbba8cf 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -143,6 +143,26 @@ private:
   Owned<MetadataManager> metadataManager;
   Owned<Puller> puller;
 
+  // The get() method deduplicates image pulls by caching a single Future
+  // per each initiated image pull, handing out `undiscardable(pull)` to the
+  // caller (to prevent discards by the callers from propagating and discarding
+  // the pull) and tracking the number of the handed out `undiscardable`s for
+  // each pending pull that themselves have not been discarded yet (`useCount`).
+  //
+  // The pull future itself is discarded as soon as its `useCount` drops to
+  // zero to notify the puller that the pull has been cancelled.
+
+  // A pull future and its use count.
+  struct Pull
+  {
+    Future<Image> future;  // The single cached future for this image pull.
+    size_t useCount;  // Handed-out `undiscardable`s not yet discarded.
+  };
+
+  // The pending pull futures are keyed by the stringified `ImageReference`
+  // for the image being pulled.
+  hashmap<string, Pull> pulling;
+
   // For executing path removals in a separated actor.
   process::Executor executor;
 
@@ -338,34 +358,69 @@ Future<Image> StoreProcess::_get(
     }
   }
 
-  Try<string> staging =
-    os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
+  const string pullKey = stringify(reference);
 
-  if (staging.isError()) {
-    return Failure(
-        "Failed to create a staging directory: " + staging.error());
-  }
+  Future<Image> pull = [&]() -> Future<Image> {
+    if (pulling.contains(pullKey)) {
+      pulling.at(pullKey).useCount += 1;
 
-  LOG(INFO) << "Pulling image '" << reference << "'";
-
-  return metrics.image_pull.time(puller->pull(
-      reference,
-      staging.get(),
-      backend,
-      config)
-    .then(defer(self(), &Self::moveLayers, staging.get(), lambda::_1, backend))
-    .then(defer(self(), [=](const Image& image) {
-      LOG(INFO) << "Caching image '" << reference << "'";
-      return metadataManager->put(image);
-    }))
-    .onAny(defer(self(), [=](const Future<Image>& image) {
-      LOG(INFO) << "Removing staging directory '" << staging.get() << "'";
-      Try<Nothing> rmdir = os::rmdir(staging.get());
-      if (rmdir.isError()) {
-        LOG(WARNING) << "Failed to remove staging directory '" << staging.get()
-                     << "': " << rmdir.error();
+      // NOTE: In the rare case when the future has already failed but the
+      // onAny() callback has not been executed yet, we will hand out an already
+      // FAILED future. This is basically equivalent to handing out a PENDING
+      // future that will fail immediately, and should not be an issue.
+      CHECK(!pulling.at(pullKey).future.hasDiscard());
+      return pulling.at(pullKey).future;
+    }
+
+    Try<string> staging =
+      os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir));
+
+    if (staging.isError()) {
+      return Failure(
+          "Failed to create a staging directory: " + staging.error());
+    }
+
+    LOG(INFO) << "Pulling image '" << reference << "'";
+
+    Future<Image> pull  = metrics.image_pull.time(
+        puller->pull(reference, staging.get(), backend, config)
+          .then(defer(
+              self(), &Self::moveLayers, staging.get(), lambda::_1, backend))
+          .then(defer(
+              self(),
+              [=](const Image& image) {
+                LOG(INFO) << "Caching image '" << reference << "'";
+                return metadataManager->put(image);
+              }))
+          .onAny(defer(self(), [=](const Future<Image>& image) {
+            pulling.erase(pullKey);
+
+            LOG(INFO) << "Removing staging directory '" << staging.get() << 
"'";
+            Try<Nothing> rmdir = os::rmdir(staging.get());
+            if (rmdir.isError()) {
+              LOG(WARNING) << "Failed to remove staging directory '"
+                           << staging.get() << "': " << rmdir.error();
+            }
+          })));
+
+    pulling[pullKey] = {pull, 1};
+    return pull;
+  }();
+
+  // Each call of this method returns an `undiscardable` future referencing the
+  // `pull` future. The `pull` future itself will be discarded as soon as
+  // all the `undiscardable`s have been discarded.
+  return undiscardable(pull)
+    .onDiscard(defer(self(), [=]() {
+      if (pulling.contains(pullKey)) {
+        pulling.at(pullKey).useCount -= 1;
+
+        if (pulling.at(pullKey).useCount == 0) {
+          pulling.at(pullKey).future.discard();
+          pulling.erase(pullKey);
+        }
       }
-    })));
+    }));
 }
 
 
@@ -537,6 +592,11 @@ Future<Nothing> StoreProcess::prune(
     const vector<mesos::Image>& excludedImages,
     const hashset<string>& activeLayerPaths)
 {
+  // All existing pulling should have finished.
+  if (!pulling.empty()) {
+    return Failure("Cannot prune and pull at the same time");
+  }
+
   vector<spec::ImageReference> imageReferences;
   imageReferences.reserve(excludedImages.size());
 

Reply via email to