Refactored Docker containerizer launch path per interface changes. This continues the containerizer interface change that replaced most of the "Info" parameters (`TaskInfo`, `ExecutorInfo`, `CommandInfo`, `ContainerInfo`) with a single `ContainerConfig` and combined the nested and non-nested container launch paths.
Notably, this commit also changes the fields stored in the Docker containerizer to match the new interface. The somewhat ambiguously named `directory` field has been renamed to `containerWorkDir`. And the copy of `Slave::flags` in each container has been removed because it is not used. Review: https://reviews.apache.org/r/58907 Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/66070ebd Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/66070ebd Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/66070ebd Branch: refs/heads/master Commit: 66070ebd35a26273f419deb3ae50df0ae5b7a48a Parents: 55c86cb Author: Joseph Wu <josep...@apache.org> Authored: Mon May 1 12:19:02 2017 -0700 Committer: Joseph Wu <josep...@apache.org> Committed: Thu May 25 18:37:07 2017 -0700 ---------------------------------------------------------------------- src/slave/containerizer/docker.cpp | 216 ++++++++++++++------------------ src/slave/containerizer/docker.hpp | 132 +++++++++---------- 2 files changed, 151 insertions(+), 197 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/66070ebd/src/slave/containerizer/docker.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.cpp b/src/slave/containerizer/docker.cpp index c90750d..9f84109 100644 --- a/src/slave/containerizer/docker.cpp +++ b/src/slave/containerizer/docker.cpp @@ -72,8 +72,9 @@ using std::set; using std::string; using std::vector; -using mesos::slave::ContainerLogger; +using mesos::slave::ContainerConfig; using mesos::slave::ContainerIO; +using mesos::slave::ContainerLogger; using mesos::slave::ContainerTermination; using mesos::internal::slave::state::SlaveState; @@ -271,19 +272,24 @@ DockerContainerizer::~DockerContainerizer() Try<DockerContainerizerProcess::Container*> 
DockerContainerizerProcess::Container::create( const ContainerID& id, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const string& directory, - const Option<string>& user, - const SlaveID& slaveId, + const ContainerConfig& containerConfig, const map<string, string>& environment, - bool checkpoint, + const Option<string>& pidCheckpointPath, const Flags& flags) { + // We need to extract a SlaveID based on the sandbox directory, + // for the purpose of working around a limitation of the Docker CLI. + // If the sandbox directory contains a colon, the sandbox directory + // cannot be mounted directly into the container directory. Instead, + // we symlink the sandbox directory and mount the symlink. + // See MESOS-1833 for more details. + Try<paths::ExecutorRunPath> runPath = + paths::parseExecutorRunPath(flags.work_dir, containerConfig.directory()); + CHECK_SOME(runPath) << "Unable to determine SlaveID from sandbox directory"; string dockerSymlinkPath = path::join( - paths::getSlavePath(flags.work_dir, slaveId), + paths::getSlavePath(flags.work_dir, runPath->slaveId), DOCKER_SYMLINK_DIRECTORY); Try<Nothing> mkdir = os::mkdir(dockerSymlinkPath); @@ -293,17 +299,17 @@ DockerContainerizerProcess::Container::create( } bool symlinked = false; - string containerWorkdir = directory; - // We need to symlink the sandbox directory if the directory - // path has a colon, as Docker CLI uses the colon as a separator. 
- if (strings::contains(directory, ":")) { + string containerWorkdir = containerConfig.directory(); + if (strings::contains(containerConfig.directory(), ":")) { containerWorkdir = path::join(dockerSymlinkPath, id.value()); - Try<Nothing> symlink = ::fs::symlink(directory, containerWorkdir); + Try<Nothing> symlink = + ::fs::symlink(containerConfig.directory(), containerWorkdir); if (symlink.isError()) { - return Error("Failed to symlink directory '" + directory + - "' to '" + containerWorkdir + "': " + symlink.error()); + return Error( + "Failed to symlink directory '" + containerConfig.directory() + + "' to '" + containerWorkdir + "': " + symlink.error()); } symlinked = true; @@ -312,7 +318,7 @@ DockerContainerizerProcess::Container::create( Option<ContainerInfo> containerInfo = None(); Option<CommandInfo> commandInfo = None(); bool launchesExecutorContainer = false; - if (taskInfo.isSome() && flags.docker_mesos_image.isSome()) { + if (containerConfig.has_task_info() && flags.docker_mesos_image.isSome()) { // Override the container and command to launch an executor // in a docker container. 
ContainerInfo newContainerInfo; @@ -381,8 +387,9 @@ DockerContainerizerProcess::Container::create( } } - if (taskInfo->has_command()) { - newCommandInfo.mutable_uris()->CopyFrom(taskInfo->command().uris()); + if (containerConfig.task_info().has_command()) { + newCommandInfo.mutable_uris() + ->CopyFrom(containerConfig.task_info().command().uris()); } containerInfo = newContainerInfo; @@ -392,24 +399,19 @@ DockerContainerizerProcess::Container::create( return new Container( id, - taskInfo, - executorInfo, - containerWorkdir, - user, - slaveId, - checkpoint, + containerConfig, + environment, + pidCheckpointPath, symlinked, - flags, + containerWorkdir, commandInfo, containerInfo, - environment, launchesExecutorContainer); } Future<Nothing> DockerContainerizerProcess::fetch( - const ContainerID& containerId, - const SlaveID& slaveId) + const ContainerID& containerId) { CHECK(containers_.contains(containerId)); Container* container = containers_.at(containerId); @@ -417,10 +419,8 @@ Future<Nothing> DockerContainerizerProcess::fetch( return fetcher->fetch( containerId, container->command, - container->directory, - None(), - slaveId, - flags); + container->containerWorkDir, + None()); } @@ -437,7 +437,7 @@ Future<Nothing> DockerContainerizerProcess::pull( string image = container->image(); Future<Docker::Image> future = docker->pull( - container->directory, + container->containerWorkDir, image, container->forcePullImage()); @@ -605,7 +605,7 @@ Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes( Container* container = containers_.at(containerId); container->state = Container::MOUNTING; - if (container->task.isNone() && + if (!container->containerConfig.has_task_info() && !container->resources.persistentVolumes().empty()) { LOG(ERROR) << "Persistent volumes found with container '" << containerId << "' but are not supported with custom executors"; @@ -614,7 +614,7 @@ Future<Nothing> DockerContainerizerProcess::mountPersistentVolumes( Try<Nothing> updateVolumes 
= updatePersistentVolumes( containerId, - container->directory, + container->containerWorkDir, Resources(), container->resources); @@ -773,18 +773,12 @@ Try<Nothing> DockerContainerizerProcess::checkpoint( container->executorPid = pid; - if (container->checkpoint) { - const string& path = - slave::paths::getForkedPidPath( - slave::paths::getMetaRootDir(flags.work_dir), - container->slaveId, - container->executor.framework_id(), - container->executor.executor_id(), - containerId); - - LOG(INFO) << "Checkpointing pid " << pid << " to '" << path << "'"; + if (container->pidCheckpointPath.isSome()) { + LOG(INFO) << "Checkpointing pid " << pid + << " to '" << container->pidCheckpointPath.get() << "'"; - return slave::state::checkpoint(path, stringify(pid)); + return slave::state::checkpoint( + container->pidCheckpointPath.get(), stringify(pid)); } return Nothing(); @@ -803,25 +797,17 @@ Future<Nothing> DockerContainerizer::recover( Future<bool> DockerContainerizer::launch( const ContainerID& containerId, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const string& directory, - const Option<string>& user, - const SlaveID& slaveId, + const ContainerConfig& containerConfig, const map<string, string>& environment, - bool checkpoint) + const Option<string>& pidCheckpointPath) { return dispatch( process.get(), &DockerContainerizerProcess::launch, containerId, - taskInfo, - executorInfo, - directory, - user, - slaveId, + containerConfig, environment, - checkpoint); + pidCheckpointPath); } @@ -1003,7 +989,6 @@ Future<Nothing> DockerContainerizerProcess::_recover( // Create and store a container. 
Container* container = new Container(containerId); containers_[containerId] = container; - container->slaveId = state->id; container->state = Container::RUNNING; container->launchesExecutorContainer = executorContainers.contains(containerId); @@ -1040,7 +1025,7 @@ Future<Nothing> DockerContainerizerProcess::_recover( executor.id, containerId); - container->directory = sandboxDirectory; + container->containerWorkDir = sandboxDirectory; } } } @@ -1106,47 +1091,33 @@ Future<Nothing> DockerContainerizerProcess::__recover( Future<bool> DockerContainerizerProcess::launch( const ContainerID& containerId, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const string& directory, - const Option<string>& user, - const SlaveID& slaveId, + const ContainerConfig& containerConfig, const map<string, string>& environment, - bool checkpoint) + const Option<string>& pidCheckpointPath) { - CHECK(!containerId.has_parent()); + if (containerId.has_parent()) { + return Failure("Nested containers are not supported"); + } if (containers_.contains(containerId)) { return Failure("Container already started"); } - Option<ContainerInfo> containerInfo; - - if (taskInfo.isSome() && taskInfo.get().has_container()) { - containerInfo = taskInfo.get().container(); - } else if (executorInfo.has_container()) { - containerInfo = executorInfo.container(); - } - - if (containerInfo.isNone()) { + if (!containerConfig.has_container_info()) { LOG(INFO) << "No container info found, skipping launch"; return false; } - if (containerInfo.get().type() != ContainerInfo::DOCKER) { + if (containerConfig.container_info().type() != ContainerInfo::DOCKER) { LOG(INFO) << "Skipping non-docker container"; return false; } Try<Container*> container = Container::create( containerId, - taskInfo, - executorInfo, - directory, - user, - slaveId, + containerConfig, environment, - checkpoint, + pidCheckpointPath, flags); if (container.isError()) { @@ -1155,28 +1126,27 @@ Future<bool> 
DockerContainerizerProcess::launch( containers_[containerId] = container.get(); - if (taskInfo.isSome()) { - LOG(INFO) << "Starting container '" << containerId - << "' for task '" << taskInfo.get().task_id() - << "' (and executor '" << executorInfo.executor_id() - << "') of framework " << executorInfo.framework_id(); - } else { - LOG(INFO) << "Starting container '" << containerId - << "' for executor '" << executorInfo.executor_id() - << "' and framework " << executorInfo.framework_id(); - } + LOG(INFO) + << "Starting container '" << containerId + << (containerConfig.has_task_info() + ? "' for task '" + stringify(containerConfig.task_info().task_id()) + : "") + << "' (and executor '" << containerConfig.executor_info().executor_id() + << "') of framework " << containerConfig.executor_info().framework_id(); Future<Nothing> f = Nothing(); if (HookManager::hooksAvailable()) { f = HookManager::slavePreLaunchDockerTaskExecutorDecorator( - taskInfo, - executorInfo, + containerConfig.has_task_info() + ? 
containerConfig.task_info() + : Option<TaskInfo>::none(), + containerConfig.executor_info(), container.get()->containerName, - container.get()->directory, + container.get()->containerWorkDir, flags.sandbox_directory, container.get()->environment) - .then(defer(self(), [this, taskInfo, containerId]( + .then(defer(self(), [this, containerId, containerConfig]( const DockerTaskExecutorPrepareInfo& decoratorInfo) -> Future<Nothing> { if (!containers_.contains(containerId)) { @@ -1209,7 +1179,7 @@ Future<bool> DockerContainerizerProcess::launch( taskEnvironment[variable.name()] = variable.value(); } - if (taskInfo.isSome()) { + if (containerConfig.has_task_info()) { container->taskEnvironment = taskEnvironment; // For dockerized command executors, the flags have already @@ -1242,19 +1212,13 @@ Future<bool> DockerContainerizerProcess::launch( self(), &Self::_launch, containerId, - taskInfo, - executorInfo, - directory, - slaveId)); + containerConfig)); } Future<bool> DockerContainerizerProcess::_launch( const ContainerID& containerId, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const string& directory, - const SlaveID& slaveId) + const ContainerConfig& containerConfig) { if (!containers_.contains(containerId)) { return Failure("Container is already destroyed"); @@ -1262,20 +1226,21 @@ Future<bool> DockerContainerizerProcess::_launch( Container* container = containers_.at(containerId); - if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) { + if (containerConfig.has_task_info() && flags.docker_mesos_image.isNone()) { // Launching task by forking a subprocess to run docker executor. // TODO(steveniemitz): We should call 'update' to set CPU/CFS/mem // quotas after 'launchExecutorProcess'. However, there is a race // where 'update' can be called before mesos-docker-executor // creates the Docker container for the task. See more details in // the comments of r33174. 
- return container->launch = fetch(containerId, slaveId) + return container->launch = fetch(containerId) .then(defer(self(), [=]() { return pull(containerId); })) .then(defer(self(), [=]() { if (HookManager::hooksAvailable()) { - HookManager::slavePostFetchHook(containerId, directory); + HookManager::slavePostFetchHook( + containerId, containerConfig.directory()); } return mountPersistentVolumes(containerId); @@ -1301,13 +1266,14 @@ Future<bool> DockerContainerizerProcess::_launch( // We need to do so for launching a task because as the slave is // running in a container (via docker_mesos_image flag) we want the // executor to keep running when the slave container dies. - return container->launch = fetch(containerId, slaveId) + return container->launch = fetch(containerId) .then(defer(self(), [=]() { return pull(containerId); })) .then(defer(self(), [=]() { if (HookManager::hooksAvailable()) { - HookManager::slavePostFetchHook(containerId, directory); + HookManager::slavePostFetchHook( + containerId, containerConfig.directory()); } return mountPersistentVolumes(containerId); @@ -1321,7 +1287,8 @@ Future<bool> DockerContainerizerProcess::_launch( // is >= 1.7 this can be changed to pass --cpu-period and // --cpu-quota to the 'docker run' call in // launchExecutorContainer. - return update(containerId, executorInfo.resources(), true) + return update( + containerId, containerConfig.executor_info().resources(), true) .then([=]() { return Future<Docker::Container>(dockerContainer); }); @@ -1347,9 +1314,11 @@ Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer( container->state = Container::RUNNING; return logger->prepare( - container->executor, - container->directory, - container->user) + container->containerConfig.executor_info(), + container->containerWorkDir, + container->containerConfig.has_user() + ? 
container->containerConfig.user() + : Option<string>::none()) .then(defer( self(), [=](const ContainerIO& containerIO) @@ -1358,7 +1327,7 @@ Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer( container->container, container->command, containerName, - container->directory, + container->containerWorkDir, flags.sandbox_directory, container->resources, #ifdef __linux__ @@ -1436,7 +1405,8 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess( // Include any environment variables from ExecutorInfo. foreach (const Environment::Variable& variable, - container->executor.command().environment().variables()) { + container->containerConfig.executor_info() + .command().environment().variables()) { environment[variable.name()] = variable.value(); } @@ -1486,9 +1456,11 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess( return allocateGpus .then(defer(self(), [=]() { return logger->prepare( - container->executor, - container->directory, - container->user); + container->containerConfig.executor_info(), + container->containerWorkDir, + container->containerConfig.has_user() + ? 
container->containerConfig.user() + : Option<string>::none()); })) .then(defer( self(), @@ -1529,7 +1501,7 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess( ::mesos::internal::docker::Flags launchFlags = dockerFlags( flags, container->containerName, - container->directory, + container->containerWorkDir, container->taskEnvironment); VLOG(1) << "Launching 'mesos-docker-executor' with flags '" @@ -1549,7 +1521,7 @@ Future<pid_t> DockerContainerizerProcess::launchExecutorProcess( None(), parentHooks, {Subprocess::ChildHook::SETSID(), - Subprocess::ChildHook::CHDIR(container->directory)}); + Subprocess::ChildHook::CHDIR(container->containerWorkDir)}); if (s.isError()) { return Failure("Failed to fork executor: " + s.error()); http://git-wip-us.apache.org/repos/asf/mesos/blob/66070ebd/src/slave/containerizer/docker.hpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/docker.hpp b/src/slave/containerizer/docker.hpp index 2ed8e1c..b602a56 100644 --- a/src/slave/containerizer/docker.hpp +++ b/src/slave/containerizer/docker.hpp @@ -88,13 +88,9 @@ public: virtual process::Future<bool> launch( const ContainerID& containerId, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<std::string>& user, - const SlaveID& slaveId, + const mesos::slave::ContainerConfig& containerConfig, const std::map<std::string, std::string>& environment, - bool checkpoint); + const Option<std::string>& pidCheckpointPath); virtual process::Future<Nothing> update( const ContainerID& containerId, @@ -140,13 +136,9 @@ public: virtual process::Future<bool> launch( const ContainerID& containerId, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<std::string>& user, - const SlaveID& slaveId, + const mesos::slave::ContainerConfig& containerConfig, const std::map<std::string, std::string>& 
environment, - bool checkpoint); + const Option<std::string>& pidCheckpointPath); // force = true causes the containerizer to update the resources // for the container, even if they match what it has cached. @@ -168,9 +160,7 @@ public: const ContainerID& containerId, bool killed = true); // process is either killed or reaped. - virtual process::Future<Nothing> fetch( - const ContainerID& containerId, - const SlaveID& slaveId); + virtual process::Future<Nothing> fetch(const ContainerID& containerId); virtual process::Future<Nothing> pull(const ContainerID& containerId); @@ -188,10 +178,7 @@ private: process::Future<bool> _launch( const ContainerID& containerId, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const SlaveID& slaveId); + const mesos::slave::ContainerConfig& containerConfig); process::Future<Nothing> _recover( const Option<state::SlaveState>& state, @@ -307,13 +294,9 @@ private: { static Try<Container*> create( const ContainerID& id, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<std::string>& user, - const SlaveID& slaveId, + const mesos::slave::ContainerConfig& containerConfig, const std::map<std::string, std::string>& environment, - bool checkpoint, + const Option<std::string>& pidCheckpointPath, const Flags& flags); static std::string name(const ContainerID& id) @@ -324,30 +307,23 @@ private: Container(const ContainerID& id) : state(FETCHING), id(id) {} - Container(const ContainerID& id, - const Option<TaskInfo>& taskInfo, - const ExecutorInfo& executorInfo, - const std::string& directory, - const Option<std::string>& user, - const SlaveID& slaveId, - bool checkpoint, - bool symlinked, - const Flags& flags, - const Option<CommandInfo>& _command, - const Option<ContainerInfo>& _container, - const std::map<std::string, std::string>& _environment, - bool launchesExecutorContainer) + Container( + const ContainerID& _id, + 
const mesos::slave::ContainerConfig& _containerConfig, + const std::map<std::string, std::string>& _environment, + const Option<std::string>& _pidCheckpointPath, + bool symlinked, + const std::string& containerWorkDir, + const Option<CommandInfo>& _command, + const Option<ContainerInfo>& _container, + bool launchesExecutorContainer) : state(FETCHING), - id(id), - task(taskInfo), - executor(executorInfo), + id(_id), + containerConfig(_containerConfig), + pidCheckpointPath(_pidCheckpointPath), environment(_environment), - directory(directory), - user(user), - slaveId(slaveId), - checkpoint(checkpoint), symlinked(symlinked), - flags(flags), + containerWorkDir(containerWorkDir), containerName(name(id)), launchesExecutorContainer(launchesExecutorContainer) { @@ -361,26 +337,24 @@ private: // perfect check because an executor might always have a subset // of it's resources that match a task, nevertheless, it's // better than nothing). - resources = executor.resources(); + resources = containerConfig.resources(); - if (task.isSome()) { - CHECK(resources.contains(task.get().resources())); + if (containerConfig.has_task_info()) { + CHECK(resources.contains(containerConfig.task_info().resources())); } if (_command.isSome()) { command = _command.get(); - } else if (task.isSome()) { - command = task.get().command(); } else { - command = executor.command(); + command = containerConfig.command_info(); } if (_container.isSome()) { container = _container.get(); - } else if (task.isSome()) { - container = task.get().container(); } else { - container = executor.container(); + // NOTE: The existence of this field is checked in + // DockerContainerizerProcess::launch. + container = containerConfig.container_info(); } } @@ -389,7 +363,7 @@ private: if (symlinked) { // The sandbox directory is a symlink, remove it at container // destroy. 
- os::rm(directory); + os::rm(containerWorkDir); } } @@ -404,20 +378,22 @@ private: std::string image() const { - if (task.isSome()) { - return task.get().container().docker().image(); + if (containerConfig.has_task_info()) { + return containerConfig.task_info().container().docker().image(); } - return executor.container().docker().image(); + return containerConfig.executor_info().container().docker().image(); } bool forcePullImage() const { - if (task.isSome()) { - return task.get().container().docker().force_pull_image(); + if (containerConfig.has_task_info()) { + return containerConfig.task_info() + .container().docker().force_pull_image(); } - return executor.container().docker().force_pull_image(); + return containerConfig.executor_info() + .container().docker().force_pull_image(); } // The DockerContainerizer needs to be able to properly clean up @@ -452,27 +428,33 @@ private: DESTROYING = 5 } state; + // Copies of the parameters sent to `Container::create`. const ContainerID id; - const Option<TaskInfo> task; - const ExecutorInfo executor; + const mesos::slave::ContainerConfig containerConfig; + const Option<std::string> pidCheckpointPath; + + // A copy of the parameter sent to `Container::create`. + // NOTE: This may be modified further by hooks. + std::map<std::string, std::string> environment; + + // The sandbox directory for the container. This holds the + // symlinked path if symlinked boolean is true. + // TODO(josephw): The symlink path does not persist across failovers, + // so we will not delete the symlink if the agent restarts. This results + // in gradually leaking hanging symlinks. + bool symlinked; + std::string containerWorkDir; + + // Copies of the fields in `containerConfig`, except when the + // container is a command task and the agent is launched with + // the --docker_mesos_image flag. 
ContainerInfo container; CommandInfo command; - std::map<std::string, std::string> environment; // Environment variables that the command executor should pass // onto a docker-ized task. This is set by a hook. Option<std::map<std::string, std::string>> taskEnvironment; - // The sandbox directory for the container. This holds the - // symlinked path if symlinked boolean is true. - std::string directory; - - const Option<std::string> user; - SlaveID slaveId; - bool checkpoint; - bool symlinked; - const Flags flags; - // The string used to refer to this container via the Docker CLI. // This name is either computed by concatenating the DOCKER_NAME_PREFIX // and the ContainerID; or during recovery, by taking the recovered