This is an automated email from the ASF dual-hosted git repository. asekretenko pushed a commit to branch 1.7.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit b4a49fd915cacd111cedc3c583d69bc2b9d6468e Author: Andrei Sekretenko <asekrete...@apache.org> AuthorDate: Thu Aug 20 17:02:51 2020 +0200 Deduplicated concurrent image pulls by docker store. This patch makes the docker store reuse a pending pull if asked for an image that is already being pulled. The pull caching follows the same approach as the earlier attempt in https://reviews.apache.org/r/39331 . However, handing out futures to the store callers and handling their discards are performed differently, using a form of reference counting, so that the pull itself is discarded only if all the futures returned by `Store::get()` have been discarded. Review: https://reviews.apache.org/r/72790 --- .../mesos/provisioner/docker/store.cpp | 110 ++++++++++++++++----- 1 file changed, 85 insertions(+), 25 deletions(-) diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp index bf2be90..bbba8cf 100644 --- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp +++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp @@ -143,6 +143,26 @@ private: Owned<MetadataManager> metadataManager; Owned<Puller> puller; + // The get() method deduplicates image pulls by caching a single Future + // for each initiated image pull, handing out `undiscardable(pull)` to the + // caller (to prevent discards by the callers from propagating and discarding + // the pull) and tracking the number of handed-out `undiscardable`s for + // each pending pull that have not themselves been discarded yet (`useCount`). + // + // The pull future itself is discarded as soon as its `useCount` drops to + // zero to notify the puller that the pull has been cancelled. + + // A pull future and its use count. + struct Pull + { + Future<Image> future; + size_t useCount; + }; + + // The pending pull futures are keyed by the stringified `ImageReference` + // for the image being pulled.
+ hashmap<string, Pull> pulling; + // For executing path removals in a separated actor. process::Executor executor; @@ -338,34 +358,69 @@ Future<Image> StoreProcess::_get( } } - Try<string> staging = - os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir)); + const string pullKey = stringify(reference); - if (staging.isError()) { - return Failure( - "Failed to create a staging directory: " + staging.error()); - } + Future<Image> pull = [&]() -> Future<Image> { + if (pulling.contains(pullKey)) { + pulling.at(pullKey).useCount += 1; - LOG(INFO) << "Pulling image '" << reference << "'"; - - return metrics.image_pull.time(puller->pull( - reference, - staging.get(), - backend, - config) - .then(defer(self(), &Self::moveLayers, staging.get(), lambda::_1, backend)) - .then(defer(self(), [=](const Image& image) { - LOG(INFO) << "Caching image '" << reference << "'"; - return metadataManager->put(image); - })) - .onAny(defer(self(), [=](const Future<Image>& image) { - LOG(INFO) << "Removing staging directory '" << staging.get() << "'"; - Try<Nothing> rmdir = os::rmdir(staging.get()); - if (rmdir.isError()) { - LOG(WARNING) << "Failed to remove staging directory '" << staging.get() - << "': " << rmdir.error(); + // NOTE: In the rare case when the future has already failed but the + // onAny() callback has not been executed yet, we will hand out an already + // FAILED future. This is basically equivalent to handing out a PENDING + // future that will fail immediately, and should not be an issue. 
+ CHECK(!pulling.at(pullKey).future.hasDiscard()); + return pulling.at(pullKey).future; + } + + Try<string> staging = + os::mkdtemp(paths::getStagingTempDir(flags.docker_store_dir)); + + if (staging.isError()) { + return Failure( + "Failed to create a staging directory: " + staging.error()); + } + + LOG(INFO) << "Pulling image '" << reference << "'"; + + Future<Image> pull = metrics.image_pull.time( + puller->pull(reference, staging.get(), backend, config) + .then(defer( + self(), &Self::moveLayers, staging.get(), lambda::_1, backend)) + .then(defer( + self(), + [=](const Image& image) { + LOG(INFO) << "Caching image '" << reference << "'"; + return metadataManager->put(image); + })) + .onAny(defer(self(), [=](const Future<Image>& image) { + pulling.erase(pullKey); + + LOG(INFO) << "Removing staging directory '" << staging.get() << "'"; + Try<Nothing> rmdir = os::rmdir(staging.get()); + if (rmdir.isError()) { + LOG(WARNING) << "Failed to remove staging directory '" + << staging.get() << "': " << rmdir.error(); + } + }))); + + pulling[pullKey] = {pull, 1}; + return pull; + }(); + + // Each call of this method returns an `undiscardable` future referencing the + // `pull` future. The `pull` future itself will be discarded as soon as + // all the `undiscardable`s have been discarded. + return undiscardable(pull) + .onDiscard(defer(self(), [=]() { + if (pulling.contains(pullKey)) { + pulling.at(pullKey).useCount -= 1; + + if (pulling.at(pullKey).useCount == 0) { + pulling.at(pullKey).future.discard(); + pulling.erase(pullKey); + } } - }))); + })); } @@ -537,6 +592,11 @@ Future<Nothing> StoreProcess::prune( const vector<mesos::Image>& excludedImages, const hashset<string>& activeLayerPaths) { + // All existing pulls should have finished. + if (!pulling.empty()) { + return Failure("Cannot prune and pull at the same time"); + } + vector<spec::ImageReference> imageReferences; imageReferences.reserve(excludedImages.size());