YARN-4599. Set OOM control for memory cgroups. (Miklos Szegedi via Haibo Chen)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9964799 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9964799 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9964799 Branch: refs/heads/trunk Commit: d9964799544eefcf424fcc178d987525f5356cdf Parents: f09dc73 Author: Haibo Chen <haiboc...@apache.org> Authored: Wed May 23 11:29:55 2018 -0700 Committer: Haibo Chen <haiboc...@apache.org> Committed: Wed May 23 16:35:37 2018 -0700 ---------------------------------------------------------------------- .gitignore | 1 + .../hadoop/yarn/conf/YarnConfiguration.java | 26 +- .../src/main/resources/yarn-default.xml | 67 ++- .../src/CMakeLists.txt | 19 + .../CGroupElasticMemoryController.java | 476 +++++++++++++++++++ .../linux/resources/CGroupsHandler.java | 6 + .../linux/resources/CGroupsHandlerImpl.java | 6 +- .../CGroupsMemoryResourceHandlerImpl.java | 15 - .../linux/resources/DefaultOOMHandler.java | 254 ++++++++++ .../monitor/ContainersMonitorImpl.java | 50 ++ .../executor/ContainerSignalContext.java | 41 ++ .../native/oom-listener/impl/oom_listener.c | 171 +++++++ .../native/oom-listener/impl/oom_listener.h | 102 ++++ .../oom-listener/impl/oom_listener_main.c | 104 ++++ .../oom-listener/test/oom_listener_test_main.cc | 292 ++++++++++++ .../resources/DummyRunnableWithContext.java | 31 ++ .../TestCGroupElasticMemoryController.java | 319 +++++++++++++ .../TestCGroupsMemoryResourceHandlerImpl.java | 6 +- .../linux/resources/TestDefaultOOMHandler.java | 307 ++++++++++++ .../monitor/TestContainersMonitor.java | 1 + .../TestContainersMonitorResourceChange.java | 3 +- .../site/markdown/NodeManagerCGroupsMemory.md | 133 ++++++ 22 files changed, 2391 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/.gitignore ---------------------------------------------------------------------- 
diff --git a/.gitignore b/.gitignore index 934c009..428950b 100644 --- a/.gitignore +++ b/.gitignore @@ -17,6 +17,7 @@ target build dependency-reduced-pom.xml +make-build-debug # Filesystem contract test options and credentials auth-keys.xml http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 8e56cb8..6d08831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1440,6 +1440,25 @@ public class YarnConfiguration extends Configuration { NM_PREFIX + "vmem-pmem-ratio"; public static final float DEFAULT_NM_VMEM_PMEM_RATIO = 2.1f; + /** Specifies whether to do memory check on overall usage. */ + public static final String NM_ELASTIC_MEMORY_CONTROL_ENABLED = NM_PREFIX + + "elastic-memory-control.enabled"; + public static final boolean DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED = false; + + /** Specifies the OOM handler code. */ + public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER = NM_PREFIX + + "elastic-memory-control.oom-handler"; + + /** The path to the OOM listener.*/ + public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH = + NM_PREFIX + "elastic-memory-control.oom-listener.path"; + + /** Maximum time in seconds to resolve an OOM situation. 
*/ + public static final String NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = + NM_PREFIX + "elastic-memory-control.timeout-sec"; + public static final Integer + DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC = 5; + /** Number of Virtual CPU Cores which can be allocated for containers.*/ public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores"; public static final int DEFAULT_NM_VCORES = 8; @@ -2006,13 +2025,6 @@ public class YarnConfiguration extends Configuration { /** The path to the Linux container executor.*/ public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH = NM_PREFIX + "linux-container-executor.path"; - - /** - * The UNIX group that the linux-container-executor should run as. - * This is intended to be set as part of container-executor.cfg. - */ - public static final String NM_LINUX_CONTAINER_GROUP = - NM_PREFIX + "linux-container-executor.group"; /** * True if linux-container-executor should limit itself to one user http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 156ca24..da44ccb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -772,7 +772,7 @@ <property> <description>Maximum size in bytes for configurations that can be provided by application to RM for delegation token renewal. - By experiment, it's roughly 128 bytes per key-value pair. + By experiment, it's roughly 128 bytes per key-value pair. The default value 12800 allows roughly 100 configs, may be less. 
</description> <name>yarn.resourcemanager.delegation-token.max-conf-size-bytes</name> @@ -1860,14 +1860,6 @@ </property> <property> - <description> - The UNIX group that the linux-container-executor should run as. - </description> - <name>yarn.nodemanager.linux-container-executor.group</name> - <value></value> - </property> - - <property> <description>T-file compression types used to compress aggregated logs.</description> <name>yarn.nodemanager.log-aggregation.compression-type</name> <value>none</value> @@ -2158,7 +2150,7 @@ <description> In the server side it indicates whether timeline service is enabled or not. And in the client side, users can enable it to indicate whether client wants - to use timeline service. If it's enabled in the client side along with + to use timeline service. If it's enabled in the client side along with security, then yarn client tries to fetch the delegation tokens for the timeline server. </description> @@ -3404,7 +3396,7 @@ <description> Defines the limit of the diagnostics message of an application attempt, in kilo characters (character count * 1024). - When using ZooKeeper to store application state behavior, it's + When using ZooKeeper to store application state behavior, it's important to limit the size of the diagnostic messages to prevent YARN from overwhelming ZooKeeper. In cases where yarn.resourcemanager.state-store.max-completed-applications is set to @@ -3819,4 +3811,57 @@ <value>/usr/bin/numactl</value> </property> + + <property> + <description> + Enable elastic memory control. This is a Linux only feature. + When enabled, the node manager adds a listener to receive an + event, if all the containers exceeded a limit. + The limit is specified by yarn.nodemanager.resource.memory-mb. + If this is not set, the limit is set based on the capabilities. + See yarn.nodemanager.resource.detect-hardware-capabilities + for details. 
+ The limit applies to the physical or virtual (rss+swap) memory + depending on whether yarn.nodemanager.pmem-check-enabled or + yarn.nodemanager.vmem-check-enabled is set. + </description> + <name>yarn.nodemanager.elastic-memory-control.enabled</name> + <value>false</value> + </property> + + <property> + <description> + The name of a JVM class. The class must implement the Runnable + interface. It is called, + if yarn.nodemanager.elastic-memory-control.enabled + is set and the system reaches its memory limit. + When called the handler must preempt a container, + since all containers are frozen by cgroups. + Once preempted some memory is released, so that the + kernel can resume all containers. Because of this the + handler has to act quickly. + </description> + <name>yarn.nodemanager.elastic-memory-control.oom-handler</name> + <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.DefaultOOMHandler</value> + </property> + + <property> + <description> + The path to the oom-listener tool. Elastic memory control is only + supported on Linux. It relies on kernel events. The tool forwards + these kernel events to the standard input, so that the node manager + can preempt containers, in and out-of-memory scenario. + You rarely need to update this setting. + </description> + <name>yarn.nodemanager.elastic-memory-control.oom-listener.path</name> + <value></value> + </property> + + <property> + <description> + Maximum time to wait for an OOM situation to get resolved before + bringing down the node. 
+ </description> + <name>yarn.nodemanager.elastic-memory-control.timeout-sec</name> + <value>5</value> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt index 79faeec..a614f80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt @@ -30,6 +30,7 @@ string(REGEX MATCH . HCD_ONE "${HADOOP_CONF_DIR}") string(COMPARE EQUAL ${HCD_ONE} / HADOOP_CONF_DIR_IS_ABS) set (CMAKE_C_STANDARD 99) +set (CMAKE_CXX_STANDARD 11) include(CheckIncludeFiles) check_include_files("sys/types.h;sys/sysctl.h" HAVE_SYS_SYSCTL_H) @@ -113,6 +114,7 @@ include_directories( ${GTEST_SRC_DIR}/include main/native/container-executor main/native/container-executor/impl + main/native/oom-listener/impl ) # add gtest as system library to suppress gcc warnings include_directories(SYSTEM ${GTEST_SRC_DIR}/include) @@ -171,3 +173,20 @@ add_executable(cetest main/native/container-executor/test/utils/test_docker_util.cc) target_link_libraries(cetest gtest container) output_directory(cetest test) + +# CGroup OOM listener +add_executable(oom-listener + main/native/oom-listener/impl/oom_listener.c + main/native/oom-listener/impl/oom_listener.h + main/native/oom-listener/impl/oom_listener_main.c +) +output_directory(oom-listener target/usr/local/bin) + +# CGroup OOM listener test with GTest +add_executable(test-oom-listener + main/native/oom-listener/impl/oom_listener.c + main/native/oom-listener/impl/oom_listener.h + 
main/native/oom-listener/test/oom_listener_test_main.cc +) +target_link_libraries(test-oom-listener gtest) +output_directory(test-oom-listener test) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java new file mode 100644 index 0000000..752c3a6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupElasticMemoryController.java @@ -0,0 +1,476 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; + +import java.io.File; +import java.io.InputStream; +import java.lang.reflect.Constructor; +import java.nio.charset.Charset; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_PMEM_CHECK_ENABLED; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.NM_VMEM_CHECK_ENABLED; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL; +import static 
org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES; +import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_NO_LIMIT; + +/** + * This thread controls memory usage using cgroups. It listens to out of memory + * events of all the containers together, and if we go over the limit picks + * a container to kill. The algorithm that picks the container is a plugin. + */ +public class CGroupElasticMemoryController extends Thread { + protected static final Log LOG = LogFactory + .getLog(CGroupElasticMemoryController.class); + private final Clock clock = new MonotonicClock(); + private String yarnCGroupPath; + private String oomListenerPath; + private Runnable oomHandler; + private CGroupsHandler cgroups; + private boolean controlPhysicalMemory; + private boolean controlVirtualMemory; + private long limit; + private Process process = null; + private boolean stopped = false; + private int timeoutMS; + + /** + * Default constructor. 
+ * @param conf Yarn configuration to use + * @param context Node manager context to out of memory handler + * @param cgroups Cgroups handler configured + * @param controlPhysicalMemory Whether to listen to physical memory OOM + * @param controlVirtualMemory Whether to listen to virtual memory OOM + * @param limit memory limit in bytes + * @param oomHandlerOverride optional OOM handler + * @exception YarnException Could not instantiate class + */ + @VisibleForTesting + CGroupElasticMemoryController(Configuration conf, + Context context, + CGroupsHandler cgroups, + boolean controlPhysicalMemory, + boolean controlVirtualMemory, + long limit, + Runnable oomHandlerOverride) + throws YarnException { + super("CGroupElasticMemoryController"); + boolean controlVirtual = controlVirtualMemory && !controlPhysicalMemory; + Runnable oomHandlerTemp = + getDefaultOOMHandler(conf, context, oomHandlerOverride, controlVirtual); + if (controlPhysicalMemory && controlVirtualMemory) { + LOG.warn( + NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " + + "We cannot control both virtual and physical " + + "memory at the same time. Enforcing virtual memory. " + + "If swapping is enabled set " + + "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " + + "only " + NM_VMEM_CHECK_ENABLED + " to true."); + } + if (!controlPhysicalMemory && !controlVirtualMemory) { + throw new YarnException( + NM_ELASTIC_MEMORY_CONTROL_ENABLED + " is on. " + + "We need either virtual or physical memory check requested. 
" + + "If swapping is enabled set " + + "only " + NM_PMEM_CHECK_ENABLED + " to true otherwise set " + + "only " + NM_VMEM_CHECK_ENABLED + " to true."); + } + // We are safe at this point that no more exceptions can be thrown + this.timeoutMS = + 1000 * conf.getInt(NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC, + DEFAULT_NM_ELASTIC_MEMORY_CONTROL_OOM_TIMEOUT_SEC); + this.oomListenerPath = getOOMListenerExecutablePath(conf); + this.oomHandler = oomHandlerTemp; + this.cgroups = cgroups; + this.controlPhysicalMemory = !controlVirtual; + this.controlVirtualMemory = controlVirtual; + this.yarnCGroupPath = this.cgroups + .getPathForCGroup(CGroupsHandler.CGroupController.MEMORY, ""); + this.limit = limit; + } + + /** + * Get the configured OOM handler. + * @param conf configuration + * @param context context to pass to constructor + * @param oomHandlerLocal Default override + * @param controlVirtual Control physical or virtual memory + * @return The configured or overridden OOM handler. + * @throws YarnException in case the constructor failed + */ + private Runnable getDefaultOOMHandler( + Configuration conf, Context context, Runnable oomHandlerLocal, + boolean controlVirtual) + throws YarnException { + Class oomHandlerClass = + conf.getClass( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_HANDLER, + DefaultOOMHandler.class); + if (oomHandlerLocal == null) { + try { + Constructor constr = oomHandlerClass.getConstructor( + Context.class, boolean.class); + oomHandlerLocal = (Runnable)constr.newInstance( + context, controlVirtual); + } catch (Exception ex) { + throw new YarnException(ex); + } + } + return oomHandlerLocal; + } + + /** + * Default constructor. 
+ * @param conf Yarn configuration to use + * @param context Node manager context to out of memory handler + * @param cgroups Cgroups handler configured + * @param controlPhysicalMemory Whether to listen to physical memory OOM + * @param controlVirtualMemory Whether to listen to virtual memory OOM + * @param limit memory limit in bytes + * @exception YarnException Could not instantiate class + */ + public CGroupElasticMemoryController(Configuration conf, + Context context, + CGroupsHandler cgroups, + boolean controlPhysicalMemory, + boolean controlVirtualMemory, + long limit) + throws YarnException { + this(conf, + context, + cgroups, + controlPhysicalMemory, + controlVirtualMemory, + limit, + null); + } + + /** + * Exception thrown if the OOM situation is not resolved. + */ + static private class OOMNotResolvedException extends YarnRuntimeException { + OOMNotResolvedException(String message, Exception parent) { + super(message, parent); + } + } + + /** + * Stop listening to the cgroup. + */ + public synchronized void stopListening() { + stopped = true; + if (process != null) { + process.destroyForcibly(); + } else { + LOG.warn("Trying to stop listening, when listening is not running"); + } + } + + /** + * Checks if the CGroupElasticMemoryController is available on this system. + * This assumes that Linux container executor is already initialized. + * We need to have CGroups enabled. + * + * @return True if CGroupElasticMemoryController is available. + * False otherwise. 
+ */ + public static boolean isAvailable() { + try { + if (!Shell.LINUX) { + LOG.info("CGroupElasticMemoryController currently is supported only " + + "on Linux."); + return false; + } + if (ResourceHandlerModule.getCGroupsHandler() == null || + ResourceHandlerModule.getMemoryResourceHandler() == null) { + LOG.info("CGroupElasticMemoryController requires enabling " + + "memory CGroups with" + + YarnConfiguration.NM_MEMORY_RESOURCE_ENABLED); + return false; + } + } catch (SecurityException se) { + LOG.info("Failed to get Operating System name. " + se); + return false; + } + return true; + } + + /** + * Main OOM listening thread. It uses an external process to listen to + * Linux events. The external process does not need to run as root, so + * it is not related to container-executor. We do not use JNI for security + * reasons. + */ + @Override + public void run() { + ExecutorService executor = null; + try { + // Disable OOM killer and set a limit. + // This has to be set first, so that we get notified about valid events. + // We will be notified about events even, if they happened before + // oom-listener started + setCGroupParameters(); + + // Start a listener process + ProcessBuilder oomListener = new ProcessBuilder(); + oomListener.command(oomListenerPath, yarnCGroupPath); + synchronized (this) { + if (!stopped) { + process = oomListener.start(); + } else { + resetCGroupParameters(); + LOG.info("Listener stopped before starting"); + return; + } + } + LOG.info(String.format("Listening on %s with %s", + yarnCGroupPath, + oomListenerPath)); + + // We need 1 thread for the error stream and a few others + // as a watchdog for the OOM killer + executor = Executors.newFixedThreadPool(2); + + // Listen to any errors in the background. We do not expect this to + // be large in size, so it will fit into a string. 
+ Future<String> errorListener = executor.submit( + () -> IOUtils.toString(process.getErrorStream(), + Charset.defaultCharset())); + + // We get Linux event increments (8 bytes) forwarded from the event stream + // The events cannot be split, so it is safe to read them as a whole + // There is no race condition with the cgroup + // running out of memory. If oom is 1 at startup + // oom_listener will send an initial notification + InputStream events = process.getInputStream(); + byte[] event = new byte[8]; + int read; + // This loop can be exited by terminating the process + // with stopListening() + while ((read = events.read(event)) == event.length) { + // An OOM event has occurred + resolveOOM(executor); + } + + if (read != -1) { + LOG.warn(String.format("Characters returned from event handler: %d", + read)); + } + + // If the input stream is closed, we wait for exit or process terminated. + int exitCode = process.waitFor(); + String error = errorListener.get(); + process = null; + LOG.info(String.format("OOM listener exited %d %s", exitCode, error)); + } catch (OOMNotResolvedException ex) { + // We could mark the node unhealthy but it shuts down the node anyways. + // Let's just bring down the node manager since all containers are frozen. + throw new YarnRuntimeException("Could not resolve OOM", ex); + } catch (Exception ex) { + synchronized (this) { + if (!stopped) { + LOG.warn("OOM Listener exiting.", ex); + } + } + } finally { + // Make sure we do not leak the child process, + // especially if process.waitFor() did not finish. + if (process != null && process.isAlive()) { + process.destroyForcibly(); + } + if (executor != null) { + try { + executor.awaitTermination(6, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Exiting without processing all OOM events."); + } + executor.shutdown(); + } + resetCGroupParameters(); + } + } + + /** + * Resolve an OOM event. + * Listen to the handler timeouts. + * @param executor Executor to create watchdog with. 
+ * @throws InterruptedException interrupted + * @throws java.util.concurrent.ExecutionException cannot launch watchdog + */ + private void resolveOOM(ExecutorService executor) + throws InterruptedException, java.util.concurrent.ExecutionException { + // Just log, when we are still in OOM after a couple of seconds + final long start = clock.getTime(); + Future<Boolean> watchdog = + executor.submit(() -> watchAndLogOOMState(start)); + // Kill something to resolve the issue + try { + oomHandler.run(); + } catch (RuntimeException ex) { + watchdog.cancel(true); + throw new OOMNotResolvedException("OOM handler failed", ex); + } + if (!watchdog.get()) { + // If we are still in OOM, + // the watchdog will trigger stop + // listening to exit this loop + throw new OOMNotResolvedException("OOM handler timed out", null); + } + } + + /** + * Just watch until we are in OOM and log. Send an update log every second. + * @return if the OOM was resolved successfully + */ + private boolean watchAndLogOOMState(long start) { + long lastLog = start; + try { + long end = start; + // Throw an error, if we are still in OOM after 5 seconds + while(end - start < timeoutMS) { + end = clock.getTime(); + String underOOM = cgroups.getCGroupParam( + CGroupsHandler.CGroupController.MEMORY, + "", + CGROUP_PARAM_MEMORY_OOM_CONTROL); + if (underOOM.contains(CGroupsHandler.UNDER_OOM)) { + if (end - lastLog > 1000) { + LOG.warn(String.format( + "OOM not resolved in %d ms", end - start)); + lastLog = end; + } + } else { + LOG.info(String.format( + "Resolved OOM in %d ms", end - start)); + return true; + } + // We do not want to saturate the CPU + // leaving the resources to the actual OOM killer + // but we want to be fast, too. 
+ Thread.sleep(10); + } + } catch (InterruptedException ex) { + LOG.debug("Watchdog interrupted"); + } catch (Exception e) { + LOG.warn("Exception running logging thread", e); + } + LOG.warn(String.format("OOM was not resolved in %d ms", + clock.getTime() - start)); + stopListening(); + return false; + } + + /** + * Update root memory cgroup. This contains all containers. + * The physical limit has to be set first then the virtual limit. + */ + private void setCGroupParameters() throws ResourceHandlerException { + // Disable the OOM killer + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_OOM_CONTROL, "1"); + if (controlPhysicalMemory && !controlVirtualMemory) { + try { + // Ignore virtual memory limits, since we do not know what it is set to + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + } catch (ResourceHandlerException ex) { + LOG.debug("Swap monitoring is turned off in the kernel"); + } + // Set physical memory limits + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit)); + } else if (controlVirtualMemory && !controlPhysicalMemory) { + // Ignore virtual memory limits, since we do not know what it is set to + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + // Set physical limits to no more than virtual limits + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, Long.toString(limit)); + // Set virtual memory limits + // Important: it has to be set after physical limit is set + cgroups.updateCGroupParam(CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, Long.toString(limit)); + } else { + throw new ResourceHandlerException( + String.format("Unsupported scenario physical:%b 
virtual:%b", + controlPhysicalMemory, controlVirtualMemory)); + } + } + + /** + * Reset root memory cgroup to OS defaults. This controls all containers. + */ + private void resetCGroupParameters() { + try { + try { + // Disable memory limits + cgroups.updateCGroupParam( + CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + } catch (ResourceHandlerException ex) { + LOG.debug("Swap monitoring is turned off in the kernel"); + } + cgroups.updateCGroupParam( + CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES, CGROUP_NO_LIMIT); + // Enable the OOM killer + cgroups.updateCGroupParam( + CGroupsHandler.CGroupController.MEMORY, "", + CGROUP_PARAM_MEMORY_OOM_CONTROL, "0"); + } catch (ResourceHandlerException ex) { + LOG.warn("Error in cleanup", ex); + } + } + + private static String getOOMListenerExecutablePath(Configuration conf) { + String yarnHomeEnvVar = + System.getenv(ApplicationConstants.Environment.HADOOP_YARN_HOME.key()); + if (yarnHomeEnvVar == null) { + yarnHomeEnvVar = "."; + } + File hadoopBin = new File(yarnHomeEnvVar, "bin"); + String defaultPath = + new File(hadoopBin, "oom-listener").getAbsolutePath(); + final String path = conf.get( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_OOM_LISTENER_PATH, + defaultPath); + LOG.debug(String.format("oom-listener path: %s %s", path, defaultPath)); + return path; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java index e279504..9dc16c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandler.java @@ -76,8 +76,14 @@ public interface CGroupsHandler { String CGROUP_PARAM_BLKIO_WEIGHT = "weight"; String CGROUP_PARAM_MEMORY_HARD_LIMIT_BYTES = "limit_in_bytes"; + String CGROUP_PARAM_MEMORY_SWAP_HARD_LIMIT_BYTES = "memsw.limit_in_bytes"; String CGROUP_PARAM_MEMORY_SOFT_LIMIT_BYTES = "soft_limit_in_bytes"; + String CGROUP_PARAM_MEMORY_OOM_CONTROL = "oom_control"; String CGROUP_PARAM_MEMORY_SWAPPINESS = "swappiness"; + String CGROUP_PARAM_MEMORY_USAGE_BYTES = "usage_in_bytes"; + String CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES = "memsw.usage_in_bytes"; + String CGROUP_NO_LIMIT = "-1"; + String UNDER_OOM = "under_oom 1"; String CGROUP_CPU_PERIOD_US = "cfs_period_us"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java index 008f3d7..6ed94e1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsHandlerImpl.java @@ -594,7 +594,11 @@ class CGroupsHandlerImpl implements CGroupsHandler { @Override public String getCGroupParam(CGroupController controller, String cGroupId, String param) throws ResourceHandlerException { - String cGroupParamPath = getPathForCGroupParam(controller, cGroupId, param); + String cGroupParamPath = + param.equals(CGROUP_FILE_TASKS) ? + getPathForCGroup(controller, cGroupId) + + Path.SEPARATOR + param : + getPathForCGroupParam(controller, cGroupId, param); try { byte[] contents = Files.readAllBytes(Paths.get(cGroupParamPath)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java index 2d1585e..a57adb1 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/CGroupsMemoryResourceHandlerImpl.java @@ -65,21 +65,6 @@ public class CGroupsMemoryResourceHandlerImpl implements MemoryResourceHandler { @Override public List<PrivilegedOperation> bootstrap(Configuration conf) throws ResourceHandlerException { - boolean pmemEnabled = - conf.getBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, - YarnConfiguration.DEFAULT_NM_PMEM_CHECK_ENABLED); - boolean vmemEnabled = - conf.getBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, - YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED); - if (pmemEnabled || vmemEnabled) { - String msg = "The default YARN physical and/or virtual memory health" - + " checkers as well as the CGroups memory controller are enabled. 
" - + "If you wish to use the Cgroups memory controller, please turn off" - + " the default physical/virtual memory checkers by setting " - + YarnConfiguration.NM_PMEM_CHECK_ENABLED + " and " - + YarnConfiguration.NM_VMEM_CHECK_ENABLED + " to false."; - throw new ResourceHandlerException(msg); - } this.cGroupsHandler.initializeCGroupController(MEMORY); enforce = conf.getBoolean( YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java new file mode 100644 index 0000000..c690225 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DefaultOOMHandler.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;

import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_FILE_TASKS;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_OOM_CONTROL;
import static org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupsHandler.CGROUP_PARAM_MEMORY_USAGE_BYTES;

/**
 * A very basic OOM handler implementation.
 * See the javadoc on the run() method for details.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class DefaultOOMHandler implements Runnable {
  protected static final Log LOG = LogFactory
      .getLog(DefaultOOMHandler.class);
  // Node manager context; source of the live container map and the executor
  private Context context;
  // true: compare against virtual (physical+swap) usage; false: physical only
  private boolean virtual;
  private CGroupsHandler cgroups;

  /**
   * Create an OOM handler.
   * This has to be public to be able to construct through reflection.
   * @param context node manager context to work with
   * @param testVirtual Test virtual memory or physical
   */
  public DefaultOOMHandler(Context context, boolean testVirtual) {
    this.context = context;
    this.virtual = testVirtual;
    this.cgroups = ResourceHandlerModule.getCGroupsHandler();
  }

  @VisibleForTesting
  void setCGroupsHandler(CGroupsHandler handler) {
    cgroups = handler;
  }

  /**
   * Kill the container, if it has exceeded its request.
   *
   * @param container Container to check
   * @param fileName CGroup filename (physical or swap/virtual)
   * @return true, if the container was preempted
   */
  private boolean killContainerIfOOM(Container container, String fileName) {
    String value = null;
    try {
      value = cgroups.getCGroupParam(CGroupsHandler.CGroupController.MEMORY,
          container.getContainerId().toString(),
          fileName);
      long usage = Long.parseLong(value);
      // getMemorySize() is MB; request is compared in bytes
      long request = container.getResource().getMemorySize() * 1024 * 1024;

      // Check if the container has exceeded its limits.
      if (usage > request) {
        // Kill the container.
        // We could call the regular cleanup but that sends a
        // SIGTERM first that cannot be handled by frozen processes.
        // Walk through the cgroup tasks file
        // and kill all processes in it instead.
        sigKill(container);
        String message = String.format(
            "Container %s was killed by elastic cgroups OOM handler using %d "
                + "when requested only %d",
            container.getContainerId(), usage, request);
        LOG.warn(message);
        return true;
      }
    } catch (ResourceHandlerException ex) {
      LOG.warn(String.format("Could not access memory resource for %s",
          container.getContainerId()), ex);
    } catch (NumberFormatException ex) {
      LOG.warn(String.format("Could not parse %s in %s",
          value, container.getContainerId()), ex);
    }
    return false;
  }

  /**
   * SIGKILL the specified container. We do this not using the standard
   * container logic. The reason is that the processes are frozen by
   * the cgroups OOM handler, so they cannot respond to SIGTERM.
   * On the other hand we have to be as fast as possible.
   * We walk through the list of active processes in the container.
   * This is needed because frozen parents cannot signal their children.
   * We kill each process and then try again until the whole cgroup
   * is cleaned up. This logic avoids leaking processes in a cgroup.
   * Currently the killing only succeeds for PGIDs.
   *
   * @param container Container to clean up
   */
  private void sigKill(Container container) {
    boolean finished = false;
    try {
      while (!finished) {
        String[] pids =
            cgroups.getCGroupParam(
                CGroupsHandler.CGroupController.MEMORY,
                container.getContainerId().toString(),
                CGROUP_FILE_TASKS)
                .split("\n");
        finished = true;
        for (String pid : pids) {
          // Note: this kills only PGIDs currently
          if (pid != null && !pid.isEmpty()) {
            LOG.debug(String.format(
                "Terminating container %s Sending SIGKILL to -%s",
                container.getContainerId().toString(),
                pid));
            finished = false;
            try {
              context.getContainerExecutor().signalContainer(
                  new ContainerSignalContext.Builder().setContainer(container)
                      .setUser(container.getUser())
                      .setPid(pid).setSignal(ContainerExecutor.Signal.KILL)
                      .build());
            } catch (IOException ex) {
              LOG.warn(String.format("Cannot kill container %s pid -%s.",
                  container.getContainerId(), pid), ex);
            }
          }
        }
        try {
          // Give the kernel a moment to reap the signalled processes
          // before re-reading the tasks file.
          Thread.sleep(10);
        } catch (InterruptedException e) {
          LOG.debug("Interrupted while waiting for processes to disappear");
        }
      }
    } catch (ResourceHandlerException ex) {
      LOG.warn(String.format(
          "Cannot list more tasks in container %s to kill.",
          container.getContainerId()));
    }
  }

  /**
   * It is called when the node is under an OOM condition. All processes in
   * all sub-cgroups are suspended. We need to act fast, so that we do not
   * affect the overall system utilization.
   * In general we try to find a newly run container that exceeded its limits.
   * The justification is cost, since probably this is the one that has
   * accumulated the least amount of uncommitted data so far.
   * We continue the process until the OOM is resolved.
   */
  @Override
  public void run() {
    try {
      // Newest container first. Long.compare avoids the subtraction
      // idiom, which can overflow for large timestamp differences.
      Comparator<Container> comparator =
          Comparator.comparingLong(Container::getContainerStartTime)
              .reversed();

      // We kill containers until the kernel reports the OOM situation resolved
      // Note: If the kernel has a delay this may kill more than necessary
      while (true) {
        String status = cgroups.getCGroupParam(
            CGroupsHandler.CGroupController.MEMORY,
            "",
            CGROUP_PARAM_MEMORY_OOM_CONTROL);
        if (!status.contains(CGroupsHandler.UNDER_OOM)) {
          break;
        }

        // The first pass kills a recent container
        // that uses more than its request
        ArrayList<Container> containers = new ArrayList<>();
        containers.addAll(context.getContainers().values());
        // Note: Sorting may take a long time with 10K+ containers
        // but it is acceptable now with low number of containers per node
        containers.sort(comparator);

        // Kill the latest container that exceeded its request
        boolean found = false;
        for (Container container : containers) {
          String usageFile = virtual
              ? CGROUP_PARAM_MEMORY_MEMSW_USAGE_BYTES
              : CGROUP_PARAM_MEMORY_USAGE_BYTES;
          if (killContainerIfOOM(container, usageFile)) {
            found = true;
            break;
          }
        }
        if (found) {
          continue;
        }

        // We have not found any containers that ran out of their limit,
        // so we will kill the latest one. This can happen, if all use
        // close to their request and one of them requests a big block
        // triggering the OOM freeze.
        // Currently there is no other way to identify the outstanding one.
        if (containers.size() > 0) {
          Container container = containers.get(0);
          sigKill(container);
          String message = String.format(
              "Newest container %s killed by elastic cgroups OOM handler",
              container.getContainerId());
          LOG.warn(message);
          continue;
        }

        // This can happen, if SIGKILL did not clean up
        // non-PGID processes, containers launched by other users,
        // or if a process was put to the root YARN cgroup.
        throw new YarnRuntimeException(
            "Could not find any containers but CGroups " +
                "reserved for containers ran out of memory. " +
                "I am giving up");
      }
    } catch (ResourceHandlerException ex) {
      LOG.warn("Could not fetch OOM status. " +
          "This is expected at shutdown. Exiting.", ex);
    }
  }
}
implements private long monitoringInterval; private MonitoringThread monitoringThread; + private CGroupElasticMemoryController oomListenerThread; private boolean containerMetricsEnabled; private long containerMetricsPeriodMs; private long containerMetricsUnregisterDelayMs; @@ -85,6 +89,8 @@ public class ContainersMonitorImpl extends AbstractService implements private boolean pmemCheckEnabled; private boolean vmemCheckEnabled; + private boolean elasticMemoryEnforcement; + private boolean strictMemoryEnforcement; private boolean containersMonitorEnabled; private long maxVCoresAllottedForContainers; @@ -173,8 +179,37 @@ public class ContainersMonitorImpl extends AbstractService implements vmemCheckEnabled = this.conf.getBoolean( YarnConfiguration.NM_VMEM_CHECK_ENABLED, YarnConfiguration.DEFAULT_NM_VMEM_CHECK_ENABLED); + elasticMemoryEnforcement = this.conf.getBoolean( + YarnConfiguration.NM_ELASTIC_MEMORY_CONTROL_ENABLED, + YarnConfiguration.DEFAULT_NM_ELASTIC_MEMORY_CONTROL_ENABLED); + strictMemoryEnforcement = conf.getBoolean( + YarnConfiguration.NM_MEMORY_RESOURCE_ENFORCED, + YarnConfiguration.DEFAULT_NM_MEMORY_RESOURCE_ENFORCED); LOG.info("Physical memory check enabled: " + pmemCheckEnabled); LOG.info("Virtual memory check enabled: " + vmemCheckEnabled); + LOG.info("Elastic memory control enabled: " + elasticMemoryEnforcement); + LOG.info("Strict memory control enabled: " + strictMemoryEnforcement); + + if (elasticMemoryEnforcement) { + if (!CGroupElasticMemoryController.isAvailable()) { + // Test for availability outside the constructor + // to be able to write non-Linux unit tests for + // CGroupElasticMemoryController + throw new YarnException( + "CGroup Elastic Memory controller enabled but " + + "it is not available. Exiting."); + } else { + this.oomListenerThread = new CGroupElasticMemoryController( + conf, + context, + ResourceHandlerModule.getCGroupsHandler(), + pmemCheckEnabled, + vmemCheckEnabled, + pmemCheckEnabled ? 
+ maxPmemAllottedForContainers : maxVmemAllottedForContainers + ); + } + } containersMonitorEnabled = isContainerMonitorEnabled() && monitoringInterval > 0; @@ -246,6 +281,9 @@ public class ContainersMonitorImpl extends AbstractService implements if (containersMonitorEnabled) { this.monitoringThread.start(); } + if (oomListenerThread != null) { + oomListenerThread.start(); + } super.serviceStart(); } @@ -259,6 +297,14 @@ public class ContainersMonitorImpl extends AbstractService implements } catch (InterruptedException e) { LOG.info("ContainersMonitorImpl monitoring thread interrupted"); } + if (this.oomListenerThread != null) { + this.oomListenerThread.stopListening(); + try { + this.oomListenerThread.join(); + } finally { + this.oomListenerThread = null; + } + } } super.serviceStop(); } @@ -651,6 +697,10 @@ public class ContainersMonitorImpl extends AbstractService implements ProcessTreeInfo ptInfo, long currentVmemUsage, long currentPmemUsage) { + if (elasticMemoryEnforcement || strictMemoryEnforcement) { + // We enforce the overall memory usage instead of individual containers + return; + } boolean isMemoryOverLimit = false; long vmemLimit = ptInfo.getVmemLimit(); long pmemLimit = ptInfo.getPmemLimit(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java index 56b571b..5b911b8 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/executor/ContainerSignalContext.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.executor; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal; @@ -93,4 +94,44 @@ public final class ContainerSignalContext { public Signal getSignal() { return this.signal; } + + /** + * Retrun true if we are trying to signal the same process. + * @param obj compare to this object + * @return whether we try to signal the same process id + */ + @Override + public boolean equals(Object obj) { + if (obj instanceof ContainerSignalContext) { + ContainerSignalContext other = (ContainerSignalContext)obj; + boolean ret = + (other.getPid() == null && getPid() == null) || + (other.getPid() != null && getPid() != null && + other.getPid().equals(getPid())); + ret = ret && + (other.getSignal() == null && getSignal() == null) || + (other.getSignal() != null && getSignal() != null && + other.getSignal().equals(getSignal())); + ret = ret && + (other.getContainer() == null && getContainer() == null) || + (other.getContainer() != null && getContainer() != null && + other.getContainer().equals(getContainer())); + ret = ret && + (other.getUser() == null && getUser() == null) || + (other.getUser() != null && getUser() != null && + other.getUser().equals(getUser())); + return ret; + } + return super.equals(obj); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(getPid()). + append(getSignal()). + append(getContainer()). 
+ append(getUser()). + toHashCode(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c new file mode 100644 index 0000000..0086b26 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.c @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +#include <sys/param.h> +#include <poll.h> +#include "oom_listener.h" + +/* + * Print an error. +*/ +static inline void print_error(const char *file, const char *message, + ...) 
{ + fprintf(stderr, "%s ", file); + va_list arguments; + va_start(arguments, message); + vfprintf(stderr, message, arguments); + va_end(arguments); +} + +/* + * Listen to OOM events in a memory cgroup. See declaration for details. + */ +int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd) { + const char *pattern = + cgroup[MAX(strlen(cgroup), 1) - 1] == '/' + ? "%s%s" :"%s/%s"; + + /* Create an event handle, if we do not have one already*/ + if (descriptors->event_fd == -1 && + (descriptors->event_fd = eventfd(0, 0)) == -1) { + print_error(descriptors->command, "eventfd() failed. errno:%d %s\n", + errno, strerror(errno)); + return EXIT_FAILURE; + } + + /* + * open the file to listen to (memory.oom_control) + * and write the event handle and the file handle + * to cgroup.event_control + */ + if (snprintf(descriptors->event_control_path, + sizeof(descriptors->event_control_path), + pattern, + cgroup, + "cgroup.event_control") < 0) { + print_error(descriptors->command, "path too long %s\n", cgroup); + return EXIT_FAILURE; + } + + if ((descriptors->event_control_fd = open( + descriptors->event_control_path, + O_WRONLY|O_CREAT, 0600)) == -1) { + print_error(descriptors->command, "Could not open %s. errno:%d %s\n", + descriptors->event_control_path, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + if (snprintf(descriptors->oom_control_path, + sizeof(descriptors->oom_control_path), + pattern, + cgroup, + "memory.oom_control") < 0) { + print_error(descriptors->command, "path too long %s\n", cgroup); + return EXIT_FAILURE; + } + + if ((descriptors->oom_control_fd = open( + descriptors->oom_control_path, + O_RDONLY)) == -1) { + print_error(descriptors->command, "Could not open %s. 
errno:%d %s\n", + descriptors->oom_control_path, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + if ((descriptors->oom_command_len = (size_t) snprintf( + descriptors->oom_command, + sizeof(descriptors->oom_command), + "%d %d", + descriptors->event_fd, + descriptors->oom_control_fd)) < 0) { + print_error(descriptors->command, "Could print %d %d\n", + descriptors->event_control_fd, + descriptors->oom_control_fd); + return EXIT_FAILURE; + } + + if (write(descriptors->event_control_fd, + descriptors->oom_command, + descriptors->oom_command_len) == -1) { + print_error(descriptors->command, "Could not write to %s errno:%d\n", + descriptors->event_control_path, errno); + return EXIT_FAILURE; + } + + if (close(descriptors->event_control_fd) == -1) { + print_error(descriptors->command, "Could not close %s errno:%d\n", + descriptors->event_control_path, errno); + return EXIT_FAILURE; + } + descriptors->event_control_fd = -1; + + /* + * Listen to events as long as the cgroup exists + * and forward them to the fd in the argument. 
+ */ + for (;;) { + uint64_t u; + ssize_t ret = 0; + struct stat stat_buffer = {0}; + struct pollfd poll_fd = { + .fd = descriptors->event_fd, + .events = POLLIN + }; + + ret = poll(&poll_fd, 1, descriptors->watch_timeout); + if (ret < 0) { + /* Error calling poll */ + print_error(descriptors->command, + "Could not poll eventfd %d errno:%d %s\n", ret, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + if (ret > 0) { + /* Event counter values are always 8 bytes */ + if ((ret = read(descriptors->event_fd, &u, sizeof(u))) != sizeof(u)) { + print_error(descriptors->command, + "Could not read from eventfd %d errno:%d %s\n", ret, + errno, strerror(errno)); + return EXIT_FAILURE; + } + + /* Forward the value to the caller, typically stdout */ + if ((ret = write(fd, &u, sizeof(u))) != sizeof(u)) { + print_error(descriptors->command, + "Could not write to pipe %d errno:%d %s\n", ret, + errno, strerror(errno)); + return EXIT_FAILURE; + } + } else if (ret == 0) { + /* Timeout has elapsed*/ + + /* Quit, if the cgroup is deleted */ + if (stat(cgroup, &stat_buffer) != 0) { + break; + } + } + } + return EXIT_SUCCESS; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h new file mode 100644 index 0000000..aa77cb6 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/impl/oom_listener.h @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
#ifndef OOM_LISTENER_H
#define OOM_LISTENER_H

#if __linux

#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <sys/eventfd.h>
#include <sys/stat.h>

#include <linux/limits.h>

/*
This file implements a standard cgroups out of memory listener.
*/

typedef struct _oom_listener_descriptors {
  /*
   * Command line that was called to run this process.
   */
  const char *command;
  /*
   * Event descriptor to watch.
   * It is filled in by the function,
   * if not specified, yet.
   */
  int event_fd;
  /*
   * cgroup.event_control file handle
   */
  int event_control_fd;
  /*
   * memory.oom_control file handle
   */
  int oom_control_fd;
  /*
   * cgroup.event_control path
   */
  char event_control_path[PATH_MAX];
  /*
   * memory.oom_control path
   */
  char oom_control_path[PATH_MAX];
  /*
   * Control command to write to
   * cgroup.event_control
   * Filled by the function.
   */
  char oom_command[25];
  /*
   * Length of oom_command filled by the function.
   */
  size_t oom_command_len;
  /*
   * Directory watch timeout
   */
  int watch_timeout;
} _oom_listener_descriptors;

/*
  Clean up allocated resources in a descriptor structure.
  Declared static inline: a plain "inline" definition in a header that is
  included by multiple translation units has no external definition in C99
  and can fail to link when the compiler chooses not to inline it.
*/
static inline void cleanup(_oom_listener_descriptors *descriptors) {
  close(descriptors->event_fd);
  descriptors->event_fd = -1;
  close(descriptors->event_control_fd);
  descriptors->event_control_fd = -1;
  close(descriptors->oom_control_fd);
  descriptors->oom_control_fd = -1;
  descriptors->watch_timeout = 1000;
}

/*
 * Enable an OOM listener on the memory cgroup cgroup
 * descriptors: Structure that holds state for testing purposes
 * cgroup: cgroup path to watch. It has to be a memory cgroup
 * fd: File to forward events to. Normally this is stdout
 */
int oom_listener(_oom_listener_descriptors *descriptors, const char *cgroup, int fd);

#endif

#endif /* OOM_LISTENER_H */
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/types.h> + +#include "oom_listener.h" + +void print_usage(void) { + fprintf(stderr, "oom-listener"); + fprintf(stderr, "Listen to OOM events in a cgroup"); + fprintf(stderr, "usage to listen: oom-listener <cgroup directory>\n"); + fprintf(stderr, "usage to test: oom-listener oom [<pgid>]\n"); + fprintf(stderr, "example listening: oom-listener /sys/fs/cgroup/memory/hadoop-yarn | xxd -c 8\n"); + fprintf(stderr, "example oom to test: bash -c 'echo $$ >/sys/fs/cgroup/memory/hadoop-yarn/tasks;oom-listener oom'\n"); + fprintf(stderr, "example container overload: sudo -u <user> bash -c 'echo $$ && oom-listener oom 0' >/sys/fs/cgroup/memory/hadoop-yarn/<container>/tasks\n"); + exit(EXIT_FAILURE); +} + +/* + Test an OOM situation adding the pid + to the group pgid and calling malloc in a loop + This can be used to test OOM listener. See examples above. +*/ +void test_oom_infinite(char* pgids) { + if (pgids != NULL) { + int pgid = atoi(pgids); + setpgid(0, pgid); + } + while(1) { + char* p = (char*)malloc(4096); + if (p != NULL) { + p[0] = 0xFF; + } else { + exit(1); + } + } +} + +/* + A command that receives a memory cgroup directory and + listens to the events in the directory. + It will print a new line on every out of memory event + to the standard output. 
+ usage: + oom-listener <cgroup> +*/ +int main(int argc, char *argv[]) { + if (argc >= 2 && + strcmp(argv[1], "oom") == 0) + test_oom_infinite(argc < 3 ? NULL : argv[2]); + + if (argc != 2) + print_usage(); + + _oom_listener_descriptors descriptors = { + .command = argv[0], + .event_fd = -1, + .event_control_fd = -1, + .oom_control_fd = -1, + .event_control_path = {0}, + .oom_control_path = {0}, + .oom_command = {0}, + .oom_command_len = 0, + .watch_timeout = 1000 + }; + + int ret = oom_listener(&descriptors, argv[1], STDOUT_FILENO); + + cleanup(&descriptors); + + return ret; +} + +#else + +/* + This tool uses Linux specific functionality, + so it is not available for other operating systems +*/ +int main() { + return 1; +} + +#endif \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc new file mode 100644 index 0000000..9627632 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/oom-listener/test/oom_listener_test_main.cc @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#if __linux + +extern "C" { +#include "oom_listener.h" +} + +#include <gtest/gtest.h> +#include <fstream> +#include <mutex> + +#define CGROUP_ROOT "/sys/fs/cgroup/memory/" +#define TEST_ROOT "/tmp/test-oom-listener/" +#define CGROUP_TASKS "tasks" +#define CGROUP_OOM_CONTROL "memory.oom_control" +#define CGROUP_LIMIT_PHYSICAL "memory.limit_in_bytes" +#define CGROUP_LIMIT_SWAP "memory.memsw.limit_in_bytes" +#define CGROUP_EVENT_CONTROL "cgroup.event_control" +#define CGROUP_LIMIT (5 * 1024 * 1024) + +// We try multiple cgroup directories +// We try first the official path to test +// in production +// If we are running as a user we fall back +// to mock cgroup +static const char *cgroup_candidates[] = { CGROUP_ROOT, TEST_ROOT }; + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +class OOMListenerTest : public ::testing::Test { +private: + char cgroup[PATH_MAX] = {}; + const char* cgroup_root = nullptr; +public: + OOMListenerTest() = default; + + virtual ~OOMListenerTest() = default; + virtual const char* GetCGroup() { return cgroup; } + virtual void SetUp() { + struct stat cgroup_memory = {}; + for (unsigned int i = 0; i < GTEST_ARRAY_SIZE_(cgroup_candidates); ++i) { + cgroup_root = cgroup_candidates[i]; + + // Try to create the root. + // We might not have permission and + // it may already exist + mkdir(cgroup_root, 0700); + + if (0 != stat(cgroup_root, &cgroup_memory)) { + printf("%s missing. 
Skipping test\n", cgroup_root); + continue; + } + + timespec timespec1 = {}; + if (0 != clock_gettime(CLOCK_MONOTONIC, ×pec1)) { + ASSERT_TRUE(false) << " clock_gettime failed\n"; + } + + if (snprintf(cgroup, sizeof(cgroup), "%s%lx/", + cgroup_root, timespec1.tv_nsec) <= 0) { + cgroup[0] = '\0'; + printf("%s snprintf failed\n", cgroup_root); + continue; + } + + // Create a cgroup named the current timestamp + // to make it quasi unique + if (0 != mkdir(cgroup, 0700)) { + printf("%s not writable.\n", cgroup); + continue; + } + break; + } + + ASSERT_EQ(0, stat(cgroup, &cgroup_memory)) + << "Cannot use or simulate cgroup " << cgroup; + } + virtual void TearDown() { + if (cgroup[0] != '\0') { + rmdir(cgroup); + } + if (cgroup_root != nullptr && + cgroup_root != cgroup_candidates[0]) { + rmdir(cgroup_root); + } + } +}; + +/* + Unit test for cgroup testing. There are two modes. + If the unit test is run as root and we have cgroups + we try to crate a cgroup and generate an OOM. + If we are not running as root we just sleep instead of + hogging memory and simulate the OOM by sending + an event in a mock event fd mock_oom_event_as_user. 
+*/ +TEST_F(OOMListenerTest, test_oom) { + // Disable OOM killer + std::ofstream oom_control; + std::string oom_control_file = + std::string(GetCGroup()).append(CGROUP_OOM_CONTROL); + oom_control.open(oom_control_file.c_str(), oom_control.out); + oom_control << 1 << std::endl; + oom_control.close(); + + // Set a low enough limit for physical + std::ofstream limit; + std::string limit_file = + std::string(GetCGroup()).append(CGROUP_LIMIT_PHYSICAL); + limit.open(limit_file.c_str(), limit.out); + limit << CGROUP_LIMIT << std::endl; + limit.close(); + + // Set a low enough limit for physical + swap + std::ofstream limitSwap; + std::string limit_swap_file = + std::string(GetCGroup()).append(CGROUP_LIMIT_SWAP); + limitSwap.open(limit_swap_file.c_str(), limitSwap.out); + limitSwap << CGROUP_LIMIT << std::endl; + limitSwap.close(); + + // Event control file to set + std::string memory_control_file = + std::string(GetCGroup()).append(CGROUP_EVENT_CONTROL); + + // Tasks file to check + std::string tasks_file = + std::string(GetCGroup()).append(CGROUP_TASKS); + + int mock_oom_event_as_user = -1; + struct stat stat1 = {}; + if (0 != stat(memory_control_file.c_str(), &stat1)) { + // We cannot tamper with cgroups + // running as a user, so simulate an + // oom event + mock_oom_event_as_user = eventfd(0, 0); + } + const int simulate_cgroups = + mock_oom_event_as_user != -1; + + __pid_t mem_hog_pid = fork(); + if (!mem_hog_pid) { + // Child process to consume too much memory + if (simulate_cgroups) { + std::cout << "Simulating cgroups OOM" << std::endl; + for (;;) { + sleep(1); + } + } else { + // Wait until we are added to the cgroup + // so that it is accounted for our mem + // usage + __pid_t cgroupPid; + do { + std::ifstream tasks; + tasks.open(tasks_file.c_str(), tasks.in); + tasks >> cgroupPid; + tasks.close(); + } while (cgroupPid != getpid()); + + // Start consuming as much memory as we can. 
+ // cgroup will stop us at CGROUP_LIMIT + const int bufferSize = 1024 * 1024; + std::cout << "Consuming too much memory" << std::endl; + for (;;) { + auto buffer = (char *) malloc(bufferSize); + if (buffer != nullptr) { + for (int i = 0; i < bufferSize; ++i) { + buffer[i] = (char) std::rand(); + } + } + } + } + } else { + // Parent test + ASSERT_GE(mem_hog_pid, 1) << "Fork failed " << errno; + + // Put child into cgroup + std::ofstream tasks; + tasks.open(tasks_file.c_str(), tasks.out); + tasks << mem_hog_pid << std::endl; + tasks.close(); + + // Create pipe to get forwarded eventfd + int test_pipe[2]; + ASSERT_EQ(0, pipe(test_pipe)); + + // Launch OOM listener + // There is no race condition with the process + // running out of memory. If oom is 1 at startup + // oom_listener will send an initial notification + __pid_t listener = fork(); + if (listener == 0) { + // child listener forwarding cgroup events + _oom_listener_descriptors descriptors = { + .command = "test", + .event_fd = mock_oom_event_as_user, + .event_control_fd = -1, + .oom_control_fd = -1, + .event_control_path = {0}, + .oom_control_path = {0}, + .oom_command = {0}, + .oom_command_len = 0, + .watch_timeout = 100 + }; + int ret = oom_listener(&descriptors, GetCGroup(), test_pipe[1]); + cleanup(&descriptors); + close(test_pipe[0]); + close(test_pipe[1]); + exit(ret); + } else { + // Parent test + uint64_t event_id = 1; + if (simulate_cgroups) { + // We cannot tamper with cgroups + // running as a user, so simulate an + // oom event + ASSERT_EQ(sizeof(event_id), + write(mock_oom_event_as_user, + &event_id, + sizeof(event_id))); + } + ASSERT_EQ(sizeof(event_id), + read(test_pipe[0], + &event_id, + sizeof(event_id))) + << "The event has not arrived"; + close(test_pipe[0]); + close(test_pipe[1]); + + // Simulate OOM killer + ASSERT_EQ(0, kill(mem_hog_pid, SIGKILL)); + + // Verify that process was killed + __WAIT_STATUS mem_hog_status = {}; + __pid_t exited0 = wait(mem_hog_status); + 
ASSERT_EQ(mem_hog_pid, exited0) + << "Wrong process exited"; + ASSERT_EQ(nullptr, mem_hog_status) + << "Test process killed with invalid status"; + + if (mock_oom_event_as_user != -1) { + ASSERT_EQ(0, unlink(oom_control_file.c_str())); + ASSERT_EQ(0, unlink(limit_file.c_str())); + ASSERT_EQ(0, unlink(limit_swap_file.c_str())); + ASSERT_EQ(0, unlink(tasks_file.c_str())); + ASSERT_EQ(0, unlink(memory_control_file.c_str())); + } + // Once the cgroup is empty delete it + ASSERT_EQ(0, rmdir(GetCGroup())) + << "Could not delete cgroup " << GetCGroup(); + + // Check that oom_listener exited on the deletion of the cgroup + __WAIT_STATUS oom_listener_status = {}; + __pid_t exited1 = wait(oom_listener_status); + ASSERT_EQ(listener, exited1) + << "Wrong process exited"; + ASSERT_EQ(nullptr, oom_listener_status) + << "Listener process exited with invalid status"; + } + } +} + +#else +/* +This tool covers Linux specific functionality, +so it is not available for other operating systems +*/ +int main() { + return 1; +} +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9964799/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java new file mode 100644 index 0000000..54bcb13 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/linux/resources/DummyRunnableWithContext.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources; + +import org.apache.hadoop.yarn.server.nodemanager.Context; + +/** + * Runnable that does not do anything. + */ +public class DummyRunnableWithContext implements Runnable { + public DummyRunnableWithContext(Context context, boolean virtual) { + } + @Override + public void run() { + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org