This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch 1.5.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 97f73a9e844f9f37d54a97c8993dbf05cffc9592
Author: Chun-Hung Hsiao <chhs...@apache.org>
AuthorDate: Wed Mar 28 22:47:58 2018 -0700

    Enabled `--fetcher_stall_timeout` in curl-based URI fetcher plugins.
    
    This patch passes the `--fetcher_stall_timeout` agent flag into
    `DockerFetcherPlugin` (through setting flag `docker_stall_timeout` in
    the Docker store) and `CurlFetcherPlugin` (through setting flag
    `curl_stall_timeout` in the Appc store).
    
    Review: https://reviews.apache.org/r/65876/
---
 .../containerizer/mesos/provisioner/appc/store.cpp |  5 +-
 .../mesos/provisioner/docker/store.cpp             |  1 +
 src/uri/fetchers/curl.cpp                          | 23 +++++++-
 src/uri/fetchers/curl.hpp                          | 12 +++-
 src/uri/fetchers/docker.cpp                        | 64 ++++++++++++++++------
 src/uri/fetchers/docker.hpp                        |  1 +
 6 files changed, 85 insertions(+), 21 deletions(-)

diff --git a/src/slave/containerizer/mesos/provisioner/appc/store.cpp b/src/slave/containerizer/mesos/provisioner/appc/store.cpp
index c1f9661..f30c166 100644
--- a/src/slave/containerizer/mesos/provisioner/appc/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/appc/store.cpp
@@ -131,7 +131,10 @@ Try<Owned<slave::Store>> Store::create(
   // TODO(jojy): Uri fetcher has 'shared' semantics for the
   // provisioner. It's a shared pointer which needs to be injected
   // from top level into the store (instead of being created here).
-  Try<Owned<uri::Fetcher>> uriFetcher = uri::fetcher::create();
+  uri::fetcher::Flags _flags;
+  _flags.curl_stall_timeout = flags.fetcher_stall_timeout;
+
+  Try<Owned<uri::Fetcher>> uriFetcher = uri::fetcher::create(_flags);
   if (uriFetcher.isError()) {
     return Error("Failed to create uri fetcher: " + uriFetcher.error());
   }
diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
index d64e6eb..d277cc6 100644
--- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp
+++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp
@@ -141,6 +141,7 @@ Try<Owned<slave::Store>> Store::create(
   // TODO(dpravat): Remove after resolving MESOS-5473.
 #ifndef __WINDOWS__
   _flags.docker_config = flags.docker_config;
+  _flags.docker_stall_timeout = flags.fetcher_stall_timeout;
 #endif
 
   Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(_flags);
diff --git a/src/uri/fetchers/curl.cpp b/src/uri/fetchers/curl.cpp
index f34daf2..2f67a86 100644
--- a/src/uri/fetchers/curl.cpp
+++ b/src/uri/fetchers/curl.cpp
@@ -54,6 +54,16 @@ using process::Subprocess;
 namespace mesos {
 namespace uri {
 
+CurlFetcherPlugin::Flags::Flags()
+{
+  add(&Flags::curl_stall_timeout,
+      "curl_stall_timeout",
+      "Amount of time for the fetcher to wait before considering a download\n"
+      "being too slow and abort it when the download stalls (i.e., the speed\n"
+      "keeps below one byte per second).\n");
+}
+
+
 const char CurlFetcherPlugin::NAME[] = "curl";
 
 
@@ -61,7 +71,7 @@ Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create(const Flags& flags)
 {
   // TODO(jieyu): Make sure curl is available.
 
-  return Owned<Fetcher::Plugin>(new CurlFetcherPlugin());
+  return Owned<Fetcher::Plugin>(new CurlFetcherPlugin(flags));
 }
 
 
@@ -98,7 +108,7 @@ Future<Nothing> CurlFetcherPlugin::fetch(
   // TODO(jieyu): Allow user to specify the name of the output file.
   const string output = path::join(directory, Path(uri.path()).basename());
 
-  const vector<string> argv = {
+  vector<string> argv = {
     "curl",
     "-s",                 // Don't show progress meter or error messages.
     "-S",                 // Makes curl show an error message if it fails.
@@ -108,6 +118,15 @@ Future<Nothing> CurlFetcherPlugin::fetch(
     strings::trim(stringify(uri))
   };
 
+  // Add a timeout for curl to abort when the download speed keeps low
+  // (1 byte per second by default) for the specified duration. See:
+  // https://curl.haxx.se/docs/manpage.html#-y
+  if (flags.curl_stall_timeout.isSome()) {
+    argv.push_back("-y");
+    argv.push_back(
+        std::to_string(static_cast<long>(flags.curl_stall_timeout->secs())));
+  }
+
   Try<Subprocess> s = subprocess(
       "curl",
       argv,
diff --git a/src/uri/fetchers/curl.hpp b/src/uri/fetchers/curl.hpp
index 909c2eb..e07c1e2 100644
--- a/src/uri/fetchers/curl.hpp
+++ b/src/uri/fetchers/curl.hpp
@@ -30,7 +30,13 @@ namespace uri {
 class CurlFetcherPlugin : public Fetcher::Plugin
 {
 public:
-  class Flags : public virtual flags::FlagsBase {};
+  class Flags : public virtual flags::FlagsBase
+  {
+  public:
+    Flags();
+
+    Option<Duration> curl_stall_timeout;
+  };
 
   static const char NAME[];
 
@@ -48,7 +54,9 @@ public:
       const Option<std::string>& data = None()) const;
 
 private:
-  CurlFetcherPlugin() {}
+  explicit CurlFetcherPlugin(const Flags& _flags) : flags(_flags) {}
+
+  const Flags flags;
 };
 
 } // namespace uri {
diff --git a/src/uri/fetchers/docker.cpp b/src/uri/fetchers/docker.cpp
index daf7452..aa28161 100644
--- a/src/uri/fetchers/docker.cpp
+++ b/src/uri/fetchers/docker.cpp
@@ -93,7 +93,8 @@ static set<string> schemes()
 // streaming).
 static Future<http::Response> curl(
     const string& uri,
-    const http::Headers& headers = http::Headers())
+    const http::Headers& headers,
+    const Option<Duration>& stallTimeout)
 {
   vector<string> argv = {
     "curl",
@@ -110,6 +111,14 @@ static Future<http::Response> curl(
     argv.push_back(key + ": " + value);
   }
 
+  // Add a timeout for curl to abort when the download speed keeps low
+  // (1 byte per second by default) for the specified duration. See:
+  // https://curl.haxx.se/docs/manpage.html#-y
+  if (stallTimeout.isSome()) {
+    argv.push_back("-y");
+    argv.push_back(std::to_string(static_cast<long>(stallTimeout->secs())));
+  }
+
   argv.push_back(strings::trim(uri));
 
   // TODO(jieyu): Kill the process if discard is called.
@@ -199,9 +208,10 @@ static Future<http::Response> curl(
 
 static Future<http::Response> curl(
     const URI& uri,
-    const http::Headers& headers = http::Headers())
+    const http::Headers& headers,
+    const Option<Duration>& stallTimeout)
 {
-  return curl(stringify(uri), headers);
+  return curl(stringify(uri), headers, stallTimeout);
 }
 
 
@@ -209,7 +219,8 @@ static Future<http::Response> curl(
 static Future<int> download(
     const string& uri,
     const string& blobPath,
-    const http::Headers& headers = http::Headers())
+    const http::Headers& headers,
+    const Option<Duration>& stallTimeout)
 {
   vector<string> argv = {
     "curl",
@@ -225,6 +236,13 @@ static Future<int> download(
     argv.push_back(key + ": " + value);
   }
 
+  // Add a timeout for curl to abort when the download speed keeps below
+  // 1 byte per second. See: https://curl.haxx.se/docs/manpage.html#-y
+  if (stallTimeout.isSome()) {
+    argv.push_back("-y");
+    argv.push_back(std::to_string(static_cast<long>(stallTimeout->secs())));
+  }
+
   argv.push_back(uri);
 
   // TODO(jieyu): Kill the process if discard is called.
@@ -294,7 +312,7 @@ static Future<int> download(
       if (tokens.size() == 2) {
         // Headers are not attached because the request is already
         // authenticated.
-        return download(tokens[1], blobPath);
+        return download(tokens[1], blobPath, http::Headers(), stallTimeout);
       }
 
       return code.get();
@@ -305,10 +323,12 @@ static Future<int> download(
 static Future<int> download(
     const URI& uri,
     const string& directory,
-    const http::Headers& headers = http::Headers())
+    const http::Headers& headers,
+    const Option<Duration>& stallTimeout)
 {
   const string blobPath = path::join(directory, Path(uri.path()).basename());
-  return download(strings::trim(stringify(uri)), blobPath, headers);
+  return download(
+      strings::trim(stringify(uri)), blobPath, headers, stallTimeout);
 }
 
 
@@ -375,9 +395,11 @@ class DockerFetcherPluginProcess : public Process<DockerFetcherPluginProcess>
 {
 public:
   DockerFetcherPluginProcess(
-      const hashmap<string, spec::Config::Auth>& _auths)
+      const hashmap<string, spec::Config::Auth>& _auths,
+      const Option<Duration>& _stallTimeout)
     : ProcessBase(process::ID::generate("docker-fetcher-plugin")),
-      auths(_auths) {}
+      auths(_auths),
+      stallTimeout(_stallTimeout) {}
 
   Future<Nothing> fetch(
       const URI& uri,
@@ -426,6 +448,9 @@ private:
   // keyed by registry URL.
   // For example, "https://index.docker.io/v1/" -> spec::Config::Auth
   hashmap<string, spec::Config::Auth> auths;
+
+  // Timeout for curl to wait when a net download stalls.
+  const Option<Duration> stallTimeout;
 };
 
 
@@ -434,6 +459,12 @@ DockerFetcherPlugin::Flags::Flags()
   add(&Flags::docker_config,
       "docker_config",
       "The default docker config file.");
+
+  add(&Flags::docker_stall_timeout,
+      "docker_stall_timeout",
+      "Amount of time for the fetcher to wait before considering a download\n"
+      "being too slow and abort it when the download stalls (i.e., the speed\n"
+      "keeps below one byte per second).");
 }
 
 
@@ -457,7 +488,8 @@ Try<Owned<Fetcher::Plugin>> DockerFetcherPlugin::create(const Flags& flags)
   }
 
   Owned<DockerFetcherPluginProcess> process(new DockerFetcherPluginProcess(
-      hashmap<string, spec::Config::Auth>(auths)));
+      hashmap<string, spec::Config::Auth>(auths),
+      flags.docker_stall_timeout));
 
   return Owned<Fetcher::Plugin>(new DockerFetcherPlugin(process));
 }
@@ -569,7 +601,7 @@ Future<Nothing> DockerFetcherPluginProcess::fetch(
     {"Accept", "application/vnd.docker.distribution.manifest.v1+json"}
   };
 
-  return curl(manifestUri, manifestHeaders + basicAuthHeaders)
+  return curl(manifestUri, manifestHeaders + basicAuthHeaders, stallTimeout)
     .then(defer(self(),
                 &Self::_fetch,
                 uri,
@@ -594,7 +626,7 @@ Future<Nothing> DockerFetcherPluginProcess::_fetch(
     return getAuthHeader(manifestUri, basicAuthHeaders, response)
       .then(defer(self(), [=](
           const http::Headers& authHeaders) -> Future<Nothing> {
-        return curl(manifestUri, manifestHeaders + authHeaders)
+        return curl(manifestUri, manifestHeaders + authHeaders, stallTimeout)
           .then(defer(self(),
                       &Self::__fetch,
                       uri,
@@ -702,7 +734,7 @@ Future<Nothing> DockerFetcherPluginProcess::fetchBlob(
 {
   URI blobUri = getBlobUri(uri);
 
-  return download(blobUri, directory, authHeaders)
+  return download(blobUri, directory, authHeaders, stallTimeout)
     .then(defer(self(), [=](int code) -> Future<Nothing> {
       if (code == http::Status::UNAUTHORIZED) {
         // If we get a '401 Unauthorized', we assume that 'authHeaders'
@@ -727,7 +759,7 @@ Future<Nothing> DockerFetcherPluginProcess::_fetchBlob(
   // HTTP headers from 'download'. Currently, 'download' only returns
   // the HTTP response code because we don't support parsing HTTP
   // headers alone. Revisit this once that's supported.
-  return curl(blobUri, basicAuthHeaders)
+  return curl(blobUri, basicAuthHeaders, stallTimeout)
     .then(defer(self(), [=](const http::Response& response) -> Future<Nothing> {
       // We expect a '401 Unauthorized' response here since the
       // 'download' with the same URI returns a '401 Unauthorized'.
@@ -740,7 +772,7 @@ Future<Nothing> DockerFetcherPluginProcess::_fetchBlob(
       return getAuthHeader(blobUri, basicAuthHeaders, response)
         .then(defer(self(), [=](
             const http::Headers& authHeaders) -> Future<Nothing> {
-          return download(blobUri, directory, authHeaders)
+          return download(blobUri, directory, authHeaders, stallTimeout)
             .then(defer(self(),
                         &Self::__fetchBlob,
                         lambda::_1));
@@ -812,7 +844,7 @@ Future<http::Headers> DockerFetcherPluginProcess::getAuthHeader(
       "service=" + authParam.at("service") + "&" +
       "scope=" + authParam.at("scope");
 
-    return curl(authServerUri, basicAuthHeaders)
+    return curl(authServerUri, basicAuthHeaders, stallTimeout)
       .then([authServerUri](
           const http::Response& response) -> Future<http::Headers> {
         if (response.code != http::Status::OK) {
diff --git a/src/uri/fetchers/docker.hpp b/src/uri/fetchers/docker.hpp
index 6a89c0a..cdbab9a 100644
--- a/src/uri/fetchers/docker.hpp
+++ b/src/uri/fetchers/docker.hpp
@@ -40,6 +40,7 @@ public:
     Flags();
 
     Option<JSON::Object> docker_config;
+    Option<Duration> docker_stall_timeout;
   };
 
   static const char NAME[];

Reply via email to