This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.5.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 97f73a9e844f9f37d54a97c8993dbf05cffc9592 Author: Chun-Hung Hsiao <chhs...@apache.org> AuthorDate: Wed Mar 28 22:47:58 2018 -0700 Enabled `--fetch_stall_timeout` in curl-based URI fetcher plugins. This patch passes the `--fetch_stall_timeout` agent flag into `DockerFetcherPlugin` (through setting flag `docker_stall_timeout` in the Docker store) and `CurlFetcherPlugin` (through setting flag `curl_stall_timeout` in the Appc store). Review: https://reviews.apache.org/r/65876/ --- .../containerizer/mesos/provisioner/appc/store.cpp | 5 +- .../mesos/provisioner/docker/store.cpp | 1 + src/uri/fetchers/curl.cpp | 23 +++++++- src/uri/fetchers/curl.hpp | 12 +++- src/uri/fetchers/docker.cpp | 64 ++++++++++++++++------ src/uri/fetchers/docker.hpp | 1 + 6 files changed, 85 insertions(+), 21 deletions(-) diff --git a/src/slave/containerizer/mesos/provisioner/appc/store.cpp b/src/slave/containerizer/mesos/provisioner/appc/store.cpp index c1f9661..f30c166 100644 --- a/src/slave/containerizer/mesos/provisioner/appc/store.cpp +++ b/src/slave/containerizer/mesos/provisioner/appc/store.cpp @@ -131,7 +131,10 @@ Try<Owned<slave::Store>> Store::create( // TODO(jojy): Uri fetcher has 'shared' semantics for the // provisioner. It's a shared pointer which needs to be injected // from top level into the store (instead of being created here). - Try<Owned<uri::Fetcher>> uriFetcher = uri::fetcher::create(); + uri::fetcher::Flags _flags; + _flags.curl_stall_timeout = flags.fetcher_stall_timeout; + + Try<Owned<uri::Fetcher>> uriFetcher = uri::fetcher::create(_flags); if (uriFetcher.isError()) { return Error("Failed to create uri fetcher: " + uriFetcher.error()); } diff --git a/src/slave/containerizer/mesos/provisioner/docker/store.cpp b/src/slave/containerizer/mesos/provisioner/docker/store.cpp index d64e6eb..d277cc6 100644 --- a/src/slave/containerizer/mesos/provisioner/docker/store.cpp +++ b/src/slave/containerizer/mesos/provisioner/docker/store.cpp @@ -141,6 +141,7 @@ Try<Owned<slave::Store>> Store::create( // TODO(dpravat): Remove after resolving MESOS-5473. #ifndef __WINDOWS__ _flags.docker_config = flags.docker_config; + _flags.docker_stall_timeout = flags.fetcher_stall_timeout; #endif Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(_flags); diff --git a/src/uri/fetchers/curl.cpp b/src/uri/fetchers/curl.cpp index f34daf2..2f67a86 100644 --- a/src/uri/fetchers/curl.cpp +++ b/src/uri/fetchers/curl.cpp @@ -54,6 +54,16 @@ using process::Subprocess; namespace mesos { namespace uri { +CurlFetcherPlugin::Flags::Flags() +{ + add(&Flags::curl_stall_timeout, + "curl_stall_timeout", + "Amount of time for the fetcher to wait before considering a download\n" + "being too slow and abort it when the download stalls (i.e., the speed\n" + "keeps below one byte per second).\n"); +} + + const char CurlFetcherPlugin::NAME[] = "curl"; @@ -61,7 +71,7 @@ Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create(const Flags& flags) { // TODO(jieyu): Make sure curl is available. - return Owned<Fetcher::Plugin>(new CurlFetcherPlugin()); + return Owned<Fetcher::Plugin>(new CurlFetcherPlugin(flags)); } @@ -98,7 +108,7 @@ Future<Nothing> CurlFetcherPlugin::fetch( // TODO(jieyu): Allow user to specify the name of the output file. const string output = path::join(directory, Path(uri.path()).basename()); - const vector<string> argv = { + vector<string> argv = { "curl", "-s", // Don't show progress meter or error messages. "-S", // Makes curl show an error message if it fails. @@ -108,6 +118,15 @@ Future<Nothing> CurlFetcherPlugin::fetch( strings::trim(stringify(uri)) }; + // Add a timeout for curl to abort when the download speed keeps low + // (1 byte per second by default) for the specified duration. See: + // https://curl.haxx.se/docs/manpage.html#-y + if (flags.curl_stall_timeout.isSome()) { + argv.push_back("-y"); + argv.push_back( + std::to_string(static_cast<long>(flags.curl_stall_timeout->secs()))); + } + Try<Subprocess> s = subprocess( "curl", argv, diff --git a/src/uri/fetchers/curl.hpp b/src/uri/fetchers/curl.hpp index 909c2eb..e07c1e2 100644 --- a/src/uri/fetchers/curl.hpp +++ b/src/uri/fetchers/curl.hpp @@ -30,7 +30,13 @@ namespace uri { class CurlFetcherPlugin : public Fetcher::Plugin { public: - class Flags : public virtual flags::FlagsBase {}; + class Flags : public virtual flags::FlagsBase + { + public: + Flags(); + + Option<Duration> curl_stall_timeout; + }; static const char NAME[]; @@ -48,7 +54,9 @@ public: const Option<std::string>& data = None()) const; private: - CurlFetcherPlugin() {} + explicit CurlFetcherPlugin(const Flags& _flags) : flags(_flags) {} + + const Flags flags; }; } // namespace uri { diff --git a/src/uri/fetchers/docker.cpp b/src/uri/fetchers/docker.cpp index daf7452..aa28161 100644 --- a/src/uri/fetchers/docker.cpp +++ b/src/uri/fetchers/docker.cpp @@ -93,7 +93,8 @@ static set<string> schemes() // streaming). static Future<http::Response> curl( const string& uri, - const http::Headers& headers = http::Headers()) + const http::Headers& headers, + const Option<Duration>& stallTimeout) { vector<string> argv = { "curl", @@ -110,6 +111,14 @@ static Future<http::Response> curl( argv.push_back(key + ": " + value); } + // Add a timeout for curl to abort when the download speed keeps low + // (1 byte per second by default) for the specified duration. See: + // https://curl.haxx.se/docs/manpage.html#-y + if (stallTimeout.isSome()) { + argv.push_back("-y"); + argv.push_back(std::to_string(static_cast<long>(stallTimeout->secs()))); + } + argv.push_back(strings::trim(uri)); // TODO(jieyu): Kill the process if discard is called. @@ -199,9 +208,10 @@ static Future<http::Response> curl( static Future<http::Response> curl( const URI& uri, - const http::Headers& headers = http::Headers()) + const http::Headers& headers, + const Option<Duration>& stallTimeout) { - return curl(stringify(uri), headers); + return curl(stringify(uri), headers, stallTimeout); } @@ -209,7 +219,8 @@ static Future<http::Response> curl( static Future<int> download( const string& uri, const string& blobPath, - const http::Headers& headers = http::Headers()) + const http::Headers& headers, + const Option<Duration>& stallTimeout) { vector<string> argv = { "curl", @@ -225,6 +236,13 @@ static Future<int> download( argv.push_back(key + ": " + value); } + // Add a timeout for curl to abort when the download speed keeps below + // 1 byte per second. See: https://curl.haxx.se/docs/manpage.html#-y + if (stallTimeout.isSome()) { + argv.push_back("-y"); + argv.push_back(std::to_string(static_cast<long>(stallTimeout->secs()))); + } + argv.push_back(uri); // TODO(jieyu): Kill the process if discard is called. @@ -294,7 +312,7 @@ static Future<int> download( if (tokens.size() == 2) { // Headers are not attached because the request is already // authenticated. - return download(tokens[1], blobPath); + return download(tokens[1], blobPath, http::Headers(), stallTimeout); } return code.get(); @@ -305,10 +323,12 @@ static Future<int> download( static Future<int> download( const URI& uri, const string& directory, - const http::Headers& headers = http::Headers()) + const http::Headers& headers, + const Option<Duration>& stallTimeout) { const string blobPath = path::join(directory, Path(uri.path()).basename()); - return download(strings::trim(stringify(uri)), blobPath, headers); + return download( + strings::trim(stringify(uri)), blobPath, headers, stallTimeout); } @@ -375,9 +395,11 @@ class DockerFetcherPluginProcess : public Process<DockerFetcherPluginProcess> { public: DockerFetcherPluginProcess( - const hashmap<string, spec::Config::Auth>& _auths) + const hashmap<string, spec::Config::Auth>& _auths, + const Option<Duration>& _stallTimeout) : ProcessBase(process::ID::generate("docker-fetcher-plugin")), - auths(_auths) {} + auths(_auths), + stallTimeout(_stallTimeout) {} Future<Nothing> fetch( const URI& uri, @@ -426,6 +448,9 @@ private: // keyed by registry URL. // For example, "https://index.docker.io/v1/" -> spec::Config::Auth hashmap<string, spec::Config::Auth> auths; + + // Timeout for curl to wait when a net download stalls. + const Option<Duration> stallTimeout; }; @@ -434,6 +459,12 @@ DockerFetcherPlugin::Flags::Flags() add(&Flags::docker_config, "docker_config", "The default docker config file."); + + add(&Flags::docker_stall_timeout, + "docker_stall_timeout", + "Amount of time for the fetcher to wait before considering a download\n" + "being too slow and abort it when the download stalls (i.e., the speed\n" + "keeps below one byte per second)."); } @@ -457,7 +488,8 @@ Try<Owned<Fetcher::Plugin>> DockerFetcherPlugin::create(const Flags& flags) } Owned<DockerFetcherPluginProcess> process(new DockerFetcherPluginProcess( - hashmap<string, spec::Config::Auth>(auths))); + hashmap<string, spec::Config::Auth>(auths), + flags.docker_stall_timeout)); return Owned<Fetcher::Plugin>(new DockerFetcherPlugin(process)); } @@ -569,7 +601,7 @@ Future<Nothing> DockerFetcherPluginProcess::fetch( {"Accept", "application/vnd.docker.distribution.manifest.v1+json"} }; - return curl(manifestUri, manifestHeaders + basicAuthHeaders) + return curl(manifestUri, manifestHeaders + basicAuthHeaders, stallTimeout) .then(defer(self(), &Self::_fetch, uri, @@ -594,7 +626,7 @@ Future<Nothing> DockerFetcherPluginProcess::_fetch( return getAuthHeader(manifestUri, basicAuthHeaders, response) .then(defer(self(), [=]( const http::Headers& authHeaders) -> Future<Nothing> { - return curl(manifestUri, manifestHeaders + authHeaders) + return curl(manifestUri, manifestHeaders + authHeaders, stallTimeout) .then(defer(self(), &Self::__fetch, uri, @@ -702,7 +734,7 @@ Future<Nothing> DockerFetcherPluginProcess::fetchBlob( { URI blobUri = getBlobUri(uri); - return download(blobUri, directory, authHeaders) + return download(blobUri, directory, authHeaders, stallTimeout) .then(defer(self(), [=](int code) -> Future<Nothing> { if (code == http::Status::UNAUTHORIZED) { // If we get a '401 Unauthorized', we assume that 'authHeaders' @@ -727,7 +759,7 @@ Future<Nothing> DockerFetcherPluginProcess::_fetchBlob( // HTTP headers from 'download'. Currently, 'download' only returns // the HTTP response code because we don't support parsing HTTP // headers alone. Revisit this once that's supported. - return curl(blobUri, basicAuthHeaders) + return curl(blobUri, basicAuthHeaders, stallTimeout) .then(defer(self(), [=](const http::Response& response) -> Future<Nothing> { // We expect a '401 Unauthorized' response here since the // 'download' with the same URI returns a '401 Unauthorized'. @@ -740,7 +772,7 @@ Future<Nothing> DockerFetcherPluginProcess::_fetchBlob( return getAuthHeader(blobUri, basicAuthHeaders, response) .then(defer(self(), [=]( const http::Headers& authHeaders) -> Future<Nothing> { - return download(blobUri, directory, authHeaders) + return download(blobUri, directory, authHeaders, stallTimeout) .then(defer(self(), &Self::__fetchBlob, lambda::_1)); @@ -812,7 +844,7 @@ Future<http::Headers> DockerFetcherPluginProcess::getAuthHeader( "service=" + authParam.at("service") + "&" + "scope=" + authParam.at("scope"); - return curl(authServerUri, basicAuthHeaders) + return curl(authServerUri, basicAuthHeaders, stallTimeout) .then([authServerUri]( const http::Response& response) -> Future<http::Headers> { if (response.code != http::Status::OK) { diff --git a/src/uri/fetchers/docker.hpp b/src/uri/fetchers/docker.hpp index 6a89c0a..cdbab9a 100644 --- a/src/uri/fetchers/docker.hpp +++ b/src/uri/fetchers/docker.hpp @@ -40,6 +40,7 @@ public: Flags(); Option<JSON::Object> docker_config; + Option<Duration> docker_stall_timeout; }; static const char NAME[];