This is an automated email from the ASF dual-hosted git repository. chhsiao pushed a commit to branch 1.5.x in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 0d05ffc174f90aa8573869ab36bd338224121b42 Author: Chun-Hung Hsiao <chhs...@apache.org> AuthorDate: Wed Mar 28 22:47:52 2018 -0700 Added `--fetcher_stall_timeout` to abort stalled artifact fetching. This flag specifies how long `mesos-fetcher` waits before aborting a download whose speed stays below 1 byte/sec. This prevents containers from getting stuck at FETCHING. Review: https://reviews.apache.org/r/65856/ --- docs/configuration/agent.md | 12 +++++++++++ include/mesos/fetcher/fetcher.proto | 3 +++ src/launcher/fetcher.cpp | 40 +++++++++++++++++++++++++------------ src/slave/constants.hpp | 3 +++ src/slave/containerizer/fetcher.cpp | 3 +++ src/slave/flags.cpp | 9 +++++++++ src/slave/flags.hpp | 1 + 7 files changed, 58 insertions(+), 13 deletions(-) diff --git a/docs/configuration/agent.md b/docs/configuration/agent.md index 3cccf89..2a72a64 100644 --- a/docs/configuration/agent.md +++ b/docs/configuration/agent.md @@ -804,6 +804,18 @@ Size of the fetcher cache in Bytes. (default: 2GB) </tr> <tr> <td> + --fetcher_stall_timeout=VALUE + </td> + <td> +Amount of time for the fetcher to wait before considering a download +being too slow and abort it when the download stalls (i.e., the speed +keeps below one byte per second). +<b>NOTE</b>: This feature only applies when downloading data from the net and +does not apply to HDFS. (default: 1mins) + </td> +</tr> +<tr> + <td> --frameworks_home=VALUE </td> <td> diff --git a/include/mesos/fetcher/fetcher.proto b/include/mesos/fetcher/fetcher.proto index 6a5d807..d668106 100644 --- a/include/mesos/fetcher/fetcher.proto +++ b/include/mesos/fetcher/fetcher.proto @@ -64,4 +64,7 @@ message FetcherInfo { repeated Item items = 3; optional string user = 4; optional string frameworks_home = 5; + + // Only applies when fetching artifacts from the net. 
+ optional DurationInfo stall_timeout = 6; } diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp index e2372a1..06fa52c 100644 --- a/src/launcher/fetcher.cpp +++ b/src/launcher/fetcher.cpp @@ -165,7 +165,8 @@ static Try<string> downloadWithHadoopClient( static Try<string> downloadWithNet( const string& sourceUri, - const string& destinationPath) + const string& destinationPath, + const Option<Duration>& stallTimeout) { // The net::download function only supports these protocols. CHECK(strings::startsWith(sourceUri, "http://") || @@ -176,7 +177,7 @@ static Try<string> downloadWithNet( LOG(INFO) << "Downloading resource from '" << sourceUri << "' to '" << destinationPath << "'"; - Try<int> code = net::download(sourceUri, destinationPath); + Try<int> code = net::download(sourceUri, destinationPath, stallTimeout); if (code.isError()) { return Error("Error downloading resource: " + code.error()); } else { @@ -217,7 +218,8 @@ static Try<string> copyFile( static Try<string> download( const string& _sourceUri, const string& destinationPath, - const Option<string>& frameworksHome) + const Option<string>& frameworksHome, + const Option<Duration>& stallTimeout) { // Trim leading whitespace for 'sourceUri'. const string sourceUri = strings::trim(_sourceUri, strings::PREFIX); @@ -243,7 +245,7 @@ static Try<string> download( // 2. Try to fetch URI using os::net / libcurl implementation. // We consider http, https, ftp, ftps compatible with libcurl. if (Fetcher::isNetUri(sourceUri)) { - return downloadWithNet(sourceUri, destinationPath); + return downloadWithNet(sourceUri, destinationPath, stallTimeout); } // 3. Try to fetch the URI using hadoop client. 
@@ -286,7 +288,8 @@ static Try<string> chmodExecutable(const string& filePath) static Try<string> fetchBypassingCache( const CommandInfo::URI& uri, const string& sandboxDirectory, - const Option<string>& frameworksHome) + const Option<string>& frameworksHome, + const Option<Duration>& stallTimeout) { LOG(INFO) << "Fetching directly into the sandbox directory"; @@ -315,7 +318,8 @@ static Try<string> fetchBypassingCache( string path = path::join(sandboxDirectory, outputFile.get()); - Try<string> downloaded = download(uri.value(), path, frameworksHome); + Try<string> downloaded = + download(uri.value(), path, frameworksHome, stallTimeout); if (downloaded.isError()) { return Error(downloaded.error()); } @@ -404,7 +408,8 @@ static Try<string> fetchThroughCache( const FetcherInfo::Item& item, const Option<string>& cacheDirectory, const string& sandboxDirectory, - const Option<string>& frameworksHome) + const Option<string>& frameworksHome, + const Option<Duration>& stallTimeout) { if (cacheDirectory.isNone() || cacheDirectory.get().empty()) { return Error("Cache directory not specified"); @@ -428,7 +433,8 @@ static Try<string> fetchThroughCache( Try<string> downloaded = download( item.uri().value(), path::join(cacheDirectory.get(), item.cache_filename()), - frameworksHome); + frameworksHome, + stallTimeout); if (downloaded.isError()) { return Error(downloaded.error()); @@ -445,7 +451,8 @@ static Try<string> fetch( const FetcherInfo::Item& item, const Option<string>& cacheDirectory, const string& sandboxDirectory, - const Option<string>& frameworksHome) + const Option<string>& frameworksHome, + const Option<Duration>& stallTimeout) { LOG(INFO) << "Fetching URI '" << item.uri().value() << "'"; @@ -453,14 +460,16 @@ static Try<string> fetch( return fetchBypassingCache( item.uri(), sandboxDirectory, - frameworksHome); + frameworksHome, + stallTimeout); } return fetchThroughCache( item, cacheDirectory, sandboxDirectory, - frameworksHome); + frameworksHome, + stallTimeout); } 
@@ -592,10 +601,15 @@ int main(int argc, char* argv[]) Option<string>::some(fetcherInfo.get().frameworks_home()) : Option<string>::none(); + const Option<Duration> stallTimeout = + fetcherInfo.get().has_stall_timeout() + ? Nanoseconds(fetcherInfo.get().stall_timeout().nanoseconds()) + : Option<Duration>::none(); + // Fetch each URI to a local file and chmod if necessary. foreach (const FetcherInfo::Item& item, fetcherInfo.get().items()) { - Try<string> fetched = - fetch(item, cacheDirectory, sandboxDirectory, frameworksHome); + Try<string> fetched = fetch( + item, cacheDirectory, sandboxDirectory, frameworksHome, stallTimeout); if (fetched.isError()) { EXIT(EXIT_FAILURE) << "Failed to fetch '" << item.uri().value() << "': " + fetched.error(); diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp index e2ebde5..068d744 100644 --- a/src/slave/constants.hpp +++ b/src/slave/constants.hpp @@ -175,6 +175,9 @@ constexpr char EXECUTOR_HTTP_AUTHENTICATION_REALM[] = "mesos-agent-executor"; // Default maximum storage space to be used by the fetcher cache. constexpr Bytes DEFAULT_FETCHER_CACHE_SIZE = Gigabytes(2); +// Default timeout for the fetcher to wait when a net download stalls. +constexpr Duration DEFAULT_FETCHER_STALL_TIMEOUT = Minutes(1); + // If no pings received within this timeout, then the slave will // trigger a re-detection of the master to cause a re-registration. 
Duration DEFAULT_MASTER_PING_TIMEOUT(); diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp index 8b26e88..e08336d 100644 --- a/src/slave/containerizer/fetcher.cpp +++ b/src/slave/containerizer/fetcher.cpp @@ -561,6 +561,9 @@ Future<Nothing> FetcherProcess::__fetch( info.set_frameworks_home(flags.frameworks_home); } + info.mutable_stall_timeout() + ->set_nanoseconds(flags.fetcher_stall_timeout.ns()); + return run(containerId, sandboxDirectory, user, info) .repair(defer(self(), [=](const Future<Nothing>& future) { ++metrics.task_fetches_failed; diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp index 7129cea..ec8ff29 100644 --- a/src/slave/flags.cpp +++ b/src/slave/flags.cpp @@ -248,6 +248,15 @@ mesos::internal::slave::Flags::Flags() " each other when occupying a shared space (i.e. disk contention).", path::join(os::temp(), "mesos", "fetch")); + add(&Flags::fetcher_stall_timeout, + "fetcher_stall_timeout", + "Amount of time for the fetcher to wait before considering a download\n" + "being too slow and abort it when the download stalls (i.e., the speed\n" + "keeps below one byte per second).\n" + "NOTE: This feature only applies when downloading data from the net and\n" + "does not apply to HDFS.", + DEFAULT_FETCHER_STALL_TIMEOUT); + add(&Flags::work_dir, "work_dir", "Path of the agent work directory. This is where executor sandboxes\n" diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp index 5e62e54..91f7297 100644 --- a/src/slave/flags.hpp +++ b/src/slave/flags.hpp @@ -67,6 +67,7 @@ public: Option<std::string> attributes; Bytes fetcher_cache_size; std::string fetcher_cache_dir; + Duration fetcher_stall_timeout; std::string work_dir; std::string runtime_dir; std::string launcher_dir;