Repository: hadoop
Updated Branches:
  refs/heads/trunk 56b1ff80d -> 40b0045eb


YARN-7610. Extend Distributed Shell to support launching job with opportunistic 
containers. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/40b0045e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/40b0045e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/40b0045e

Branch: refs/heads/trunk
Commit: 40b0045ebe0752cd3d1d09be00acbabdea983799
Parents: 56b1ff8
Author: Weiwei Yang <[email protected]>
Authored: Wed Dec 6 17:52:41 2017 +0800
Committer: Weiwei Yang <[email protected]>
Committed: Wed Dec 6 17:52:41 2017 +0800

----------------------------------------------------------------------
 .../distributedshell/ApplicationMaster.java     |  22 +-
 .../applications/distributedshell/Client.java   |  19 ++
 .../distributedshell/TestDistributedShell.java  |  56 +++++
 .../site/markdown/OpportunisticContainers.md    | 212 -----------------
 .../site/markdown/OpportunisticContainers.md.vm | 233 +++++++++++++++++++
 5 files changed, 329 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b0045e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index 91dbc00..926de50 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -41,6 +41,7 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -90,6 +91,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
@@ -241,6 +244,9 @@ public class ApplicationMaster {
   private int containerVirtualCores = DEFAULT_CONTAINER_VCORES;
   // Priority of the request
   private int requestPriority;
+  // Execution type of the containers.
+  // Default GUARANTEED.
+  private ExecutionType containerType = ExecutionType.GUARANTEED;
 
   // Resource profile for the container
   private String containerResourceProfile = "";
@@ -412,6 +418,8 @@ public class ApplicationMaster {
         "App Attempt ID. Not to be used unless for testing purposes");
     opts.addOption("shell_env", true,
         "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("container_type", true,
+        "Container execution type, GUARANTEED or OPPORTUNISTIC");
     opts.addOption("container_memory", true,
         "Amount of memory in MB to be requested to run the shell command");
     opts.addOption("container_vcores", true,
@@ -558,6 +566,16 @@ public class ApplicationMaster {
       domainId = envs.get(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN);
     }
 
+    if (cliParser.hasOption("container_type")) {
+      String containerTypeStr = cliParser.getOptionValue("container_type");
+      if (Arrays.stream(ExecutionType.values()).noneMatch(
+          executionType -> executionType.toString()
+              .equals(containerTypeStr))) {
+        throw new IllegalArgumentException("Invalid container_type: "
+            + containerTypeStr);
+      }
+      containerType = ExecutionType.valueOf(containerTypeStr);
+    }
     containerMemory = Integer.parseInt(cliParser.getOptionValue(
         "container_memory", "-1"));
     containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(
@@ -1242,7 +1260,9 @@ public class ApplicationMaster {
 
     // Set up resource type requirements
     ContainerRequest request =
-        new ContainerRequest(createProfileCapability(), null, null, pri);
+        new ContainerRequest(createProfileCapability(), null, null,
+            pri, 0, true, null,
+            ExecutionTypeRequest.newInstance(containerType));
     LOG.info("Requested container ask: " + request.toString());
     return request;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b0045e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 0582afe..16bf0fd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Vector;
+import java.util.Arrays;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -171,6 +173,8 @@ public class Client {
   // No. of containers in which the shell script needs to be executed
   private int numContainers = 1;
   private String nodeLabelExpression = null;
+  // Container type, default GUARANTEED.
+  private ExecutionType containerType = ExecutionType.GUARANTEED;
 
   // log4j.properties file 
   // if available, add to local resources and set into classpath 
@@ -282,6 +286,8 @@ public class Client {
     opts.addOption("shell_env", true,
         "Environment for shell script. Specified as env_key=env_val pairs");
     opts.addOption("shell_cmd_priority", true, "Priority for the shell command 
containers");
+    opts.addOption("container_type", true,
+        "Container execution type, GUARANTEED or OPPORTUNISTIC");
     opts.addOption("container_memory", true, "Amount of memory in MB to be 
requested to run the shell command");
     opts.addOption("container_vcores", true, "Amount of virtual cores to be 
requested to run the shell command");
     opts.addOption("container_resource_profile", true, "Resource profile for 
the shell command");
@@ -433,6 +439,16 @@ public class Client {
     }
     shellCmdPriority = 
Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
 
+    if (cliParser.hasOption("container_type")) {
+      String containerTypeStr = cliParser.getOptionValue("container_type");
+      if (Arrays.stream(ExecutionType.values()).noneMatch(
+          executionType -> executionType.toString()
+          .equals(containerTypeStr))) {
+        throw new IllegalArgumentException("Invalid container_type: "
+            + containerTypeStr);
+      }
+      containerType = ExecutionType.valueOf(containerTypeStr);
+    }
     containerMemory =
         Integer.parseInt(cliParser.getOptionValue("container_memory", "-1"));
     containerVirtualCores =
@@ -740,6 +756,9 @@ public class Client {
     // Set class name 
     vargs.add(appMasterMainClass);
     // Set params for Application Master
+    if (containerType != null) {
+      vargs.add("--container_type " + String.valueOf(containerType));
+    }
     if (containerMemory > 0) {
       vargs.add("--container_memory " + String.valueOf(containerMemory));
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b0045e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 5cf884b..d6bb8d6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -1179,6 +1179,33 @@ public class TestDistributedShell {
           e.getMessage().contains("No shell command or shell script specified 
" +
           "to be executed by application master"));
     }
+
+    LOG.info("Initializing DS Client with invalid container_type argument");
+    try {
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "2",
+          "--master_memory",
+          "512",
+          "--master_vcores",
+          "2",
+          "--container_memory",
+          "128",
+          "--container_vcores",
+          "1",
+          "--shell_command",
+          "date",
+          "--container_type",
+          "UNSUPPORTED_TYPE"
+      };
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue("The throw exception is not expected",
+          e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE"));
+    }
   }
 
   @Test
@@ -1377,4 +1404,33 @@ public class TestDistributedShell {
       }
     }
   }
+
+  @Test
+  public void testDSShellWithOpportunisticContainers() throws Exception {
+    Client client = new Client(new Configuration(yarnCluster.getConfig()));
+    try {
+      String[] args = {
+          "--jar",
+          APPMASTER_JAR,
+          "--num_containers",
+          "2",
+          "--master_memory",
+          "512",
+          "--master_vcores",
+          "2",
+          "--container_memory",
+          "128",
+          "--container_vcores",
+          "1",
+          "--shell_command",
+          "date",
+          "--container_type",
+          "OPPORTUNISTIC"
+      };
+      client.init(args);
+      client.run();
+    } catch (Exception e) {
+      Assert.fail("Job execution with opportunistic containers failed.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b0045e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md
deleted file mode 100644
index 83beb07..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md
+++ /dev/null
@@ -1,212 +0,0 @@
-<!---
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-   http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-Opportunistic Containers
-========================
-
-<!-- MACRO{toc|fromDepth=0|toDepth=3} -->
-
-
-<a name="Purpose"></a>Purpose
------------------------------
-
-This document introduces the notion of **opportunistic** container execution, 
and discusses how opportunistic containers are allocated and executed.
-
-
-<a name="Quick_Guide"></a>Quick Guide
---------------------------------------------------------------------
-
-We start by providing a brief overview of opportunistic containers, including 
how a user can enable this feature and run a sample job using such containers.
-
-###<a name="Main_Goal"></a>Main Goal
-
-Unlike existing YARN containers that are scheduled in a node only if there are 
unallocated resources, opportunistic containers can be dispatched to an NM, 
even if their execution at that node cannot start immediately. In such a case, 
opportunistic containers will be queued at that NM until resources become 
available. 
-The main goal of opportunistic container execution is to improve cluster 
resource utilization, and therefore increase task throughput. Resource 
utilization and task throughput improvements are more pronounced for workloads 
that include relatively short tasks (in the order of seconds).
-
-
-###<a name="Enabling_Opportunistic_Containers"></a>Enabling Opportunistic 
Containers
-
-To enable opportunistic container allocation, the following two properties 
have to be present in **conf/yarn-site.xml**:
-
-| Property | Description | Default value |
-|:-------- |:----- |:----- |
-| `yarn.resourcemanager.opportunistic-container-allocation.enabled` | Enables 
opportunistic container allocation. | `false` |
-| `yarn.nodemanager.opportunistic-containers-max-queue-length` | Determines 
the max number of opportunistic containers that can be queued at an NM. | `0` |
-
-The first parameter above has to be set to `true`. The second one has to be 
set to a positive value to allow queuing of opportunistic containers at the NM. 
A value of `10` can be used to start experimenting with opportunistic 
containers. The optimal value depends on the jobs characteristics, the cluster 
configuration and the target utilization.
-
-By default, allocation of opportunistic containers is performed centrally 
through the RM. However, a user can choose to enable distributed allocation of 
opportunistic containers, which can further improve allocation latency for 
short tasks. Distributed scheduling can be enabling by setting to `true` the 
following parameter (note that non-opportunistic containers will continue being 
scheduled through the RM):
-
-| Property | Description | Default value |
-|:-------- |:----- |:----- |
-| `yarn.nodemanager.distributed-scheduling.enabled` | Enables distributed 
scheduling. | `false` |
-
-
-###<a name="Running_a_Sample_Job"></a>Running a Sample Job
-
-The following command can be used to run a sample pi map-reduce job, executing 
40% of mappers using opportunistic containers (substitute 
`3.0.0-alpha2-SNAPSHOT` below with the version of Hadoop you are using):
-
-```
-$ hadoop jar 
hadoop-3.0.0-alpha2-SNAPSHOT/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-alpha2-SNAPSHOT.jar
 pi -Dmapreduce.job.num-opportunistic-maps-percent="40" 50 100
-```
-
-By changing the value of `mapreduce.job.num-opportunistic-maps-percent` in the 
above command, we can specify the percentage of mappers that can be executed 
through opportunistic containers.
-
-
-###<a name="Opportunistic_Containers_in_Web_UI"></a>Opportunistic Containers 
in Web UI
-
-When opportunistic container allocation is enabled, the following new columns 
can be observed in the Nodes page of the Web UI 
(`rm-address:8088/cluster/nodes`):
-
-* Running Containers (O): number of running opportunistic containers on each 
node;
-* Mem Used (O): Total memory used by opportunistic containers on each node;
-* VCores Used (O): Total CPU virtual cores used by opportunistic containers on 
each node;
-* Queued Containers: Number of containers queued at each node.
-
-When clicking on a specific container running on a node, the execution type of 
the container is also shown.
-
-In the rest of the document, we provide an in-depth description of 
opportunistic containers, including details about their allocation and 
execution.
-
-
-Overview <a name="Overview"></a>
---------------------------------
-
-The existing schedulers in YARN (Fair and Capacity Scheduler) allocate 
containers to a node only if there are unallocated resources at that node at 
the moment of scheduling the containers. This **guaranteed** type of execution 
has the advantage that once the AM dispatches a container to a node, the 
container execution will start immediately, since it is guaranteed that there 
will be available resources. Moreover, unless fairness or capacity constraints 
are violated, containers are guaranteed to run to completion without being 
preempted. 
-
-Although this design offers a more predictable task execution, it has two main 
drawbacks that can lead to suboptimal cluster resource utilization:
-
-* **Feedback delays.** When a container finishes its execution at a node, the 
RM gets notified that there are available resources through the next NM-RM 
heartbeat, then the RM schedules a new container at that node, the AM gets 
notified through the next AM-RM heartbeat, and finally the AM launches the new 
container at the node. These delays result in idle node resources, which in 
turn lead to lower resource utilization, especially when workloads involve 
tasks whose duration is relatively short.
-* **Allocated vs. utilized resources.** The RM allocates containers based on 
the *allocated* resources at each node, which might be significantly higher 
than the actually *utilized* resources (e.g., think of a container for which 
4GB memory have been allocated, but only 2GB are being utilized). This lowers 
effective resource utilization, and can be avoided if the RM takes into account 
the utilized resources during scheduling. However, this has to be done in a way 
that allows resources to be reclaimed in case the utilized resources of a 
running container increase.
-
-To mitigate the above problems, in addition to the existing containers (which 
we term **guaranteed** containers hereafter), we introduce the notion of 
**opportunistic** containers. An opportunistic container can be dispatched to 
an NM, even if there are no available (unallocated) resources for it at the 
moment of scheduling. In such a case, the opportunistic container will be 
queued at the NM, waiting for resources to become available for its execution 
to start. The opportunistic containers are of lower priority than the 
guaranteed ones, which means that they can be preempted for guaranteed 
containers to start their execution. Therefore, they can be used to improve 
cluster resource utilization without impacting the execution of existing 
guaranteed containers.
-
-An additional advantage of opportunistic containers is that they introduce a 
notion of **execution priority at the NMs**. For instance, a lower priority job 
that does not require strict execution guarantees can use opportunistic 
containers or a mix of container execution types for its tasks.
-
-We have introduced two ways of allocating opportunistic containers: a 
**centralized** and a **distributed** one. In the centralized scheduling, 
opportunistic containers are allocated through the YARN RM, whereas in the 
distributed one, through local schedulers that reside at each NM. Centralized 
allocation allows for higher quality placement decisions and for implementing 
more involved sharing policies across applications (e.g., fairness). On the 
other hand, distributed scheduling can offer faster container allocation, which 
is useful for short tasks, as it avoids the round-trip to the RM. In both 
cases, the scheduling of guaranteed containers remains intact and happens 
through the YARN RM (using the existing Fair or Capacity Scheduler).
-
-Note that in the current implementation, we are allocating containers based on 
allocated (and not utilized) resources. Therefore, we tackle the "feedback 
delays" problem mentioned above, but not the "allocated vs. utilized resources" 
one. There is ongoing work (`YARN-1011`) that employs opportunistic containers 
to address the latter problem too.
-
-Below, we describe in more detail the [container execution 
types](#Container_Execution_Types), as well as the 
[execution](#Execution_of_Opportunistic_Containers) (including the container 
queuing at the NMs) and [allocation](#Allocation_of_Opportunistic_Containers) 
of opportunistic containers. Then we discuss how to fine-tune opportunistic 
containers through some [advanced configuration 
parameters](#Advanced_Configuration). Finally, we discuss open items for 
[future work](#Items_for_Future_Work).
-
-
-<a name="Container_Execution_Types"></a>Container Execution Types
------------------------------------------------------------------
-
-We introduce the following two types of containers:
-
-* **Guaranteed containers** correspond to the existing YARN containers. They 
are allocated by the Fair or Capacity Scheduler, and once dispatched to a node, 
it is guaranteed that there are available resources for their execution to 
start immediately. Moreover, these containers run to completion (as long as 
there are no failures). They can be preempted only in case the scheduler's 
queue to which they belong, violates fairness or capacity constraints.
-* **Opportunistic containers** are not guaranteed to have resources for their 
execution to start when they get dispatched to a node. Instead, they might be 
queued at the NM until resources become available. In case a guaranteed 
container arrives at a node and there are no resources available for it, one or 
more opportunistic containers will be preempted to execute the guaranteed one.
-
-When an AM submits its resource requests to the RM, it specifies the type for 
each container (default is guaranteed), determining the way the container will 
be [allocated](#Allocation_of_Opportunistic_Containers). Subsequently, when the 
container is launched by the AM at an NM, its type determines how it will be 
[executed](#Execution_of_Opportunistic_Containers) by the NM.
-
-
-<a name="Execution_of_Opportunistic_Containers"></a>Execution of Opportunistic 
Containers
----------------------------------------------------------------------------
-
-When a container arrives at an NM, its execution is determined by the 
available resources at the NM and the container type. Guaranteed containers 
start their execution immediately, and if needed, the NM will kill running 
opportunistic containers to ensure there are sufficient resources for the 
guaranteed ones to start. On the other hand, opportunistic containers can be 
queued at the NM, if there are no resources available to start their execution 
when they arrive at the NM. To enable this, we extended the NM by allowing 
queuing of containers at each node. The NM monitors the local resources, and 
when there are sufficient resources available, it starts the execution of the 
opportunistic container that is at the head of the queue.
-
-In particular, when a container arrives at an NM, localization is performed 
(i.e., all required resources are downloaded), and then the container moves to 
a `SCHEDULED` state, in which the container is queued, waiting for its 
execution to begin:
-
-* If there are available resources, the execution of the container starts 
immediately, irrespective of its execution type.
-* If there are no available resources:
-    * If the container is guaranteed, we kill as many running opportunistic 
containers as required for the guaranteed container to be executed, and then 
start its execution.
-    * If the container is opportunistic, it remains at the queue until 
resources become available.
-* When a container (guaranteed or opportunistic) finishes its execution and 
resources get freed up, we examine the queued containers and if there are 
available resources we start their execution. We pick containers from the queue 
in a FIFO order.
-
-In the [future work items](#Items_for_Future_Work) below, we discuss different 
ways of prioritizing task execution (queue reordering) and of killing 
opportunistic containers to make space for guaranteed ones.
-
-
-<a name="Allocation_of_Opportunistic_Containers"></a>Allocation of 
Opportunistic Containers
------------------------------------------------------------------------------
-
-As mentioned above, we provide both a centralized and a distributed way of 
allocating opportunistic containers, which we describe below.
-
-###<a name="Centralized_Allocation"></a>Centralized Allocation
-
-We have introduced a new service at the RM, namely the 
`OpportunisticContainerAllocatorAMService`, which extends the 
`ApplicationMasterService`. When the centralized opportunistic allocation is 
enabled, the resource requests from the AMs are served at the RM side by the 
`OpportunisticContainerAllocatorAMService`, which splits them into two sets of 
resource requests: 
-
-* The guaranteed set is forwarded to the existing `ApplicationMasterService` 
and is subsequently handled by the Fair or Capacity Scheduler.
-* The opportunistic set is handled by the new 
`OpportunisticContainerAllocator`, which performs the scheduling of 
opportunistic containers to nodes.
-
-The `OpportunisticContainerAllocator` maintains a list with the [least loaded 
nodes](#Determining_Nodes_for_Allocation) of the cluster at each moment, and 
assigns containers to them in a round-robin fashion. Note that in the current 
implementation, we purposely do not take into account node locality 
constraints. Since an opportunistic container (unlike the guaranteed ones) 
might wait at the queue of an NM before its execution starts, it is more 
important to allocate it at a node that is less loaded (i.e., where queuing 
delay will be smaller) rather than respect its locality constraints. Moreover, 
we do not take into account sharing (fairness/capacity) constraints for 
opportunistic containers at the moment. Support for both locality and sharing 
constraints can be added in the future if required.
-
-
-###<a name="Distributed_Allocation"></a>Distributed Allocation
-
-In order to enable distributed scheduling of opportunistic containers, we have 
introduced a new service at each NM, called `AMRMProxyService`. The 
`AMRMProxyService` implements the `ApplicationMasterService` protocol, and acts 
as a proxy between the AMs running at that node and the RM. When the 
`AMRMProxyService` is enabled (through a parameter), we force all AMs running 
at a particular node to communicate with the `AMRMProxyService` of the same 
node, instead of going directly to the RM. Moreover, to ensure that the AMs 
will not talk directly with the RM, when a new AM gets initialized, we replace 
its `AMRMToken` with a token signed by the `AMRMProxyService`.
-
-A chain of interceptors can be registered with the `AMRMProxyService`. One of 
these interceptors is the `DistributedScheduler` that is responsible for 
allocating opportunistic containers in a distributed way, without needing to 
contact the RM. This modular design makes the `AMRMProxyService` instrumental 
in other scenarios too, such as YARN federation (`YARN-2915`) or throttling 
down misbehaving AMs, which can be enabled simply by adding additional 
interceptors at the interceptor chain.
-
-When distributed opportunistic scheduling is enabled, each AM sends its 
resource requests to the `AMRMProxyService` running at the same node. The 
`AMRMProxyService` splits the resource requests into two sets:
-
-* The guaranteed set is forwarded to the RM. In this case the 
`AMRMProxyService` simply acts as a proxy between the AM and the RM, and the 
container allocation remains intact (using the Fair or Capacity Scheduler).
-* The opportunistic set is not forwarded to the RM. Instead, it is handled by 
the `DistributedScheduler` that is running locally at the node. In particular, 
the `DistributedScheduler` maintains a list with the least loaded nodes in the 
cluster, and allocates containers to them in a round-robin fashion. The RM 
informs the `DistributedScheduler` about the least loaded nodes at regular 
intervals through the NM-RM heartbeats.
-
-The above procedure is similar to the one performed by the 
`OpportunisticContainerAllocatorAMService` in the case of centralized 
opportunistic scheduling described above. The main difference is that in the 
distributed case, the splitting of requests into guaranteed and opportunistic 
happens locally at the node, and only the guaranteed requests are forwarded to 
the RM, while the opportunistic ones are handled without contacting the RM.
-
-
-###<a name="Determining_Nodes_for_Allocation"></a>Determining Nodes for 
Allocation
-
-Each NM informs the RM periodically through the NM-RM heartbeats about the 
number of running guaranteed and opportunistic containers, as well as the 
number of queued opportunistic containers. The RM gathers this information from 
all nodes and determines the least loaded ones.
-
-In the case of centralized allocation of opportunistic containers, this 
information is immediately available, since the allocation happens centrally. 
In the case of distributed scheduling, the list with the least loaded nodes is 
propagated to all NMs (and thus becomes available to the 
`DistributedSchedulers`) through the heartbeat responses from the RM to the 
NMs. The number of least loaded nodes sent to the NMs is configurable.
-
-At the moment, we take into account only the number of queued opportunistic 
containers at each node in order to estimate the time an opportunistic 
container would have to wait if sent to that node and, thus, determine the 
least loaded nodes. If the AM provided us with information about the estimated 
task durations, we could take them into account in order to have better 
estimates of the queue waiting times.
-
-
-###<a name="Rebalancing_Node_Load"></a>Rebalancing Node Load
-
-Occasionally poor placement choices for opportunistic containers may be made 
(due to stale queue length estimates), which can lead to load imbalance between 
nodes. The problem is more pronounced under high cluster load, and also in the 
case of distributed scheduling (multiple `DistributedSchedulers` may place 
containers at the same NM, since they do not coordinate with each other). To 
deal with this load imbalance between the NM queues, we perform load shedding 
to dynamically re-balance the load between NMs. In particular, while 
aggregating at the RM the queue time estimates published by each NM, we 
construct a distribution and find a targeted maximal value for the length of 
the NM queues (based on the mean and standard deviation of the distribution). 
Then the RM disseminates this value to the various NMs through the heartbeat 
responses. Subsequently, using this information, an NM on a node whose queue 
length is above the threshold discards opportunistic containers to meet this 
maxi
 mal value. This forces the associated individual AMs to reschedule those 
containers elsewhere.
-
-
-<a name="Advanced_Configuration"></a>Advanced Configuration
---------------------------------------------------
-
-The main properties for enabling opportunistic container allocation and 
choosing between centralized and distributed allocation were described in the 
[quick guide](#Quick_Guide) in the beginning of this document. Here we present 
more advanced configuration. Note that using default values for those 
parameters should be sufficient in most cases. All parameters below have to be 
defined in the **conf/yarn-site.xml** file.
-
-To determine the number of [least loaded 
nodes](#Determining_Nodes_for_Allocation) that will be used when scheduling 
opportunistic containers and how often this list will be refreshed, we use the 
following parameters:
-
-| Property | Description | Default value |
-|:-------- |:----- |:----- |
-| `yarn.resourcemanager.opportunistic-container-allocation.nodes-used` | 
Number of least loaded nodes to be used by the Opportunistic Container 
allocator for dispatching containers during container allocation. A higher 
value can improve load balance in large clusters. | `10` |
-| `yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms` | 
Frequency for computing least loaded nodes. | `1000` |
-
-
-As discussed in the [node load rebalancing](#Rebalancing_Node_Load) section 
above, at regular intervals, the RM gathers all NM queue lengths and computes 
their mean value (`avg`) and standard deviation (`stdev`), as well as the value 
`avg + k*stdev` (where `k` a float). This value gets propagated through the 
NM-RM heartbeats to all NMs, who should respect that value by dequeuing 
containers (if required), as long as their current queue length is between a 
`queue_min_length` and a `queue_max_length` value (these values are used to 
avoid dequeuing tasks from very short queues and to aggressively dequeue tasks 
from long queues, respectively). 
-The parameters `k`, `queue_min_length` and `queue_max_length` can be specified 
as follows:
-
-| Property | Description | Default value |
-|:-------- |:----- |:----- |
-| `yarn.resourcemanager.nm-container-queuing.queue-limit-stdev` | The `k` 
parameter. | `1.0f` |
-| `yarn.resourcemanager.nm-container-queuing.min-queue-length` | The 
`queue_min_length` parameter. | `5` |
-| `yarn.resourcemanager.nm-container-queuing.max-queue-length` | The 
`queue_max_length` parameter. | `15` |
-
-
-Finally, two more properties can further tune the `AMRMProxyService` in case 
distributed scheduling is used:
-
-| Property | Description | Default value |
-|:-------- |:----- |:----- |
-| `yarn.nodemanager.amrmproxy.address` | The address/port to which the 
`AMRMProxyService` is bound to. | `0.0.0.0:8049` |
-| `yarn.nodemanager.amrmproxy.client.thread-count` | The number of threads 
that are used at each NM for serving the interceptors register to the 
`AMRMProxyService` by different jobs. | `3` |
-
-
-<a name="Items_for_Future_Work"></a>Items for Future Work
------------------------------------------------
-
-Here we describe multiple ways in which we can extend/enhance the allocation 
and execution of opportunistic containers. We also provide the JIRAs that track 
each item.
-
-* **Resource overcommitment** (`YARN-1011`). As already discussed, in order to 
further improve the cluster resource utilization, we can schedule containers 
not based on the allocated resources but on the actually utilized ones. When 
over-committing resources, there is the risk of running out of resources in 
case we have an increase in the utilized resources of the already running 
containers. Therefore, opportunistic execution should be used for containers 
whose allocation goes beyond the capacity of a node. This way, we can choose 
opportunistic containers to kill for reclaiming resources.
-* **NM Queue reordering** (`YARN-5886`). Instead of executing queued 
containers in a FIFO order, we can employ reordering strategies that 
dynamically determine which opportunistic container will be executed next. For 
example, we can prioritize containers that are expected to be short-running or 
which belong to applications that are close to completion.
-* **Out of order killing at NMs** (`YARN-5887`). As described above, when we 
need to free up resources for a guaranteed container to start its execution, we 
kill opportunistic containers in reverse order of arrival (first the most 
recently started ones). This might not always be the right decision. For 
example, we might want to minimize the number of containers killed or to 
refrain from killing containers of jobs that are very close to completion.
-* **Container pausing** (`YARN-5292`): At the moment we kill opportunistic 
containers to make room for guaranteed in case of resource contention. In busy 
clusters this can lower the effective cluster utilization: whenever we kill a 
running opportunistic container, it has to be restarted, and thus we lose work. 
To this end, we can instead pause running opportunistic containers. Note that 
this will require support from the container executor (e.g., the container 
technology used) and from the application.
-* **Container promotion** (`YARN-5085`). There are cases where changing the 
execution type of a container during its execution can be beneficial. For 
instance, an application might submit a container as opportunistic, and when 
its execution starts, it can request its promotion to a guaranteed container to 
avoid it getting killed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/40b0045e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm
new file mode 100644
index 0000000..7882b87
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/OpportunisticContainers.md.vm
@@ -0,0 +1,233 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+#set ( $H3 = '###' )
+#set ( $H4 = '####' )
+#set ( $H5 = '#####' )
+
+Opportunistic Containers
+========================
+
+<!-- MACRO{toc|fromDepth=0|toDepth=3} -->
+
+
+Purpose
+-------
+
+This document introduces the notion of **opportunistic** container execution, 
and discusses how opportunistic containers are allocated and executed.
+
+
+Quick Guide
+-----------
+
+We start by providing a brief overview of opportunistic containers, including 
how a user can enable this feature and run a sample job using such containers.
+
+$H3 Main Goal
+
+Unlike existing YARN containers that are scheduled in a node only if there are 
unallocated resources, opportunistic containers can be dispatched to an NM, 
even if their execution at that node cannot start immediately. In such a case, 
opportunistic containers will be queued at that NM until resources become 
available. 
+The main goal of opportunistic container execution is to improve cluster 
resource utilization, and therefore increase task throughput. Resource 
utilization and task throughput improvements are more pronounced for workloads 
that include relatively short tasks (in the order of seconds).
+
+
+$H3 Enabling Opportunistic Containers
+
+To enable opportunistic container allocation, the following two properties 
have to be present in **conf/yarn-site.xml**:
+
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanager.opportunistic-container-allocation.enabled` | Enables 
opportunistic container allocation. | `false` |
+| `yarn.nodemanager.opportunistic-containers-max-queue-length` | Determines 
the max number of opportunistic containers that can be queued at an NM. | `0` |
+
+The first parameter above has to be set to `true`. The second one has to be 
set to a positive value to allow queuing of opportunistic containers at the NM. 
A value of `10` can be used to start experimenting with opportunistic 
containers. The optimal value depends on the jobs characteristics, the cluster 
configuration and the target utilization.
+
+By default, allocation of opportunistic containers is performed centrally 
through the RM. However, a user can choose to enable distributed allocation of 
opportunistic containers, which can further improve allocation latency for 
short tasks. Distributed scheduling can be enabling by setting to `true` the 
following parameter (note that non-opportunistic containers will continue being 
scheduled through the RM):
+
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.nodemanager.distributed-scheduling.enabled` | Enables distributed 
scheduling. | `false` |
+
+In order to submit jobs to a cluster that has AMRMProxy turned on, one must 
create a separate set of configs for the client from which jobs will be 
submitted. In these, the **conf/yarn-site.xml** should have the following 
additional configurations:
+
+| Property | Value | Description |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanger.scheduler.address` | `localhost:8049` | Redirects jobs 
to the Node Manager's AMRMProxy port.|
+
+
+$H3 Running a Sample Job
+
+$H4 MapReduce PI
+
+The following command can be used to run a sample pi map-reduce job, executing 
40% of mappers using opportunistic containers:
+
+```
+$ hadoop jar 
share/hadoop/mapreduce/hadoop-mapreduce-examples-${project.version}.jar pi 
-Dmapreduce.job.num-opportunistic-maps-percent="40" 50 100
+```
+
+By changing the value of `mapreduce.job.num-opportunistic-maps-percent` in the 
above command, we can specify the percentage of mappers that can be executed 
through opportunistic containers.
+
+$H4 Distributed Shell
+
+Another sample job is the distributed shell, it allows us to run a given shell 
command on a set of containers. The following command can be used to run `sleep 
10` command in 10 opportunistic containers:
+
+```
+$ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar 
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-${project.version}.jar.jar
 -shell_command sleep -shell_args 10 -num_containers 10 -container_type 
OPPORTUNISTIC
+```
+
+By change the value of `container_type` to `OPPORTUNISTIC` or `GUARANTEED` in 
the above command, we can specify the tasks to be running in opportunistic or 
guaranteed containers. The default type is `GUARANTEED`.
+
+$H3 Opportunistic Containers in Web UI
+
+When opportunistic container allocation is enabled, the following new columns 
can be observed in the Nodes page of the Web UI 
(`rm-address:8088/cluster/nodes`):
+
+* Running Containers (O): number of running opportunistic containers on each 
node;
+* Mem Used (O): Total memory used by opportunistic containers on each node;
+* VCores Used (O): Total CPU virtual cores used by opportunistic containers on 
each node;
+* Queued Containers: Number of containers queued at each node.
+
+When clicking on a specific container running on a node, the execution type of 
the container is also shown.
+
+In the rest of the document, we provide an in-depth description of 
opportunistic containers, including details about their allocation and 
execution.
+
+
+Overview
+--------
+
+The existing schedulers in YARN (Fair and Capacity Scheduler) allocate 
containers to a node only if there are unallocated resources at that node at 
the moment of scheduling the containers. This **guaranteed** type of execution 
has the advantage that once the AM dispatches a container to a node, the 
container execution will start immediately, since it is guaranteed that there 
will be available resources. Moreover, unless fairness or capacity constraints 
are violated, containers are guaranteed to run to completion without being 
preempted. 
+
+Although this design offers a more predictable task execution, it has two main 
drawbacks that can lead to suboptimal cluster resource utilization:
+
+* **Feedback delays.** When a container finishes its execution at a node, the 
RM gets notified that there are available resources through the next NM-RM 
heartbeat, then the RM schedules a new container at that node, the AM gets 
notified through the next AM-RM heartbeat, and finally the AM launches the new 
container at the node. These delays result in idle node resources, which in 
turn lead to lower resource utilization, especially when workloads involve 
tasks whose duration is relatively short.
+* **Allocated vs. utilized resources.** The RM allocates containers based on 
the *allocated* resources at each node, which might be significantly higher 
than the actually *utilized* resources (e.g., think of a container for which 
4GB memory have been allocated, but only 2GB are being utilized). This lowers 
effective resource utilization, and can be avoided if the RM takes into account 
the utilized resources during scheduling. However, this has to be done in a way 
that allows resources to be reclaimed in case the utilized resources of a 
running container increase.
+
+To mitigate the above problems, in addition to the existing containers (which 
we term **guaranteed** containers hereafter), we introduce the notion of 
**opportunistic** containers. An opportunistic container can be dispatched to 
an NM, even if there are no available (unallocated) resources for it at the 
moment of scheduling. In such a case, the opportunistic container will be 
queued at the NM, waiting for resources to become available for its execution 
to start. The opportunistic containers are of lower priority than the 
guaranteed ones, which means that they can be preempted for guaranteed 
containers to start their execution. Therefore, they can be used to improve 
cluster resource utilization without impacting the execution of existing 
guaranteed containers.
+
+An additional advantage of opportunistic containers is that they introduce a 
notion of **execution priority at the NMs**. For instance, a lower priority job 
that does not require strict execution guarantees can use opportunistic 
containers or a mix of container execution types for its tasks.
+
+We have introduced two ways of allocating opportunistic containers: a 
**centralized** and a **distributed** one. In the centralized scheduling, 
opportunistic containers are allocated through the YARN RM, whereas in the 
distributed one, through local schedulers that reside at each NM. Centralized 
allocation allows for higher quality placement decisions and for implementing 
more involved sharing policies across applications (e.g., fairness). On the 
other hand, distributed scheduling can offer faster container allocation, which 
is useful for short tasks, as it avoids the round-trip to the RM. In both 
cases, the scheduling of guaranteed containers remains intact and happens 
through the YARN RM (using the existing Fair or Capacity Scheduler).
+
+Note that in the current implementation, we are allocating containers based on 
allocated (and not utilized) resources. Therefore, we tackle the "feedback 
delays" problem mentioned above, but not the "allocated vs. utilized resources" 
one. There is ongoing work (`YARN-1011`) that employs opportunistic containers 
to address the latter problem too.
+
+Below, we describe in more detail the [container execution 
types](#Container_Execution_Types), as well as the 
[execution](#Execution_of_Opportunistic_Containers) (including the container 
queuing at the NMs) and [allocation](#Allocation_of_Opportunistic_Containers) 
of opportunistic containers. Then we discuss how to fine-tune opportunistic 
containers through some [advanced configuration 
parameters](#Advanced_Configuration). Finally, we discuss open items for 
[future work](#Items_for_Future_Work).
+
+
+Container Execution Types
+-------------------------
+
+We introduce the following two types of containers:
+
+* **Guaranteed containers** correspond to the existing YARN containers. They 
are allocated by the Fair or Capacity Scheduler, and once dispatched to a node, 
it is guaranteed that there are available resources for their execution to 
start immediately. Moreover, these containers run to completion (as long as 
there are no failures). They can be preempted only in case the scheduler's 
queue to which they belong, violates fairness or capacity constraints.
+* **Opportunistic containers** are not guaranteed to have resources for their 
execution to start when they get dispatched to a node. Instead, they might be 
queued at the NM until resources become available. In case a guaranteed 
container arrives at a node and there are no resources available for it, one or 
more opportunistic containers will be preempted to execute the guaranteed one.
+
+When an AM submits its resource requests to the RM, it specifies the type for 
each container (default is guaranteed), determining the way the container will 
be [allocated](#Allocation_of_Opportunistic_Containers). Subsequently, when the 
container is launched by the AM at an NM, its type determines how it will be 
[executed](#Execution_of_Opportunistic_Containers) by the NM.
+
+
+Execution of Opportunistic Containers
+-------------------------------------
+
+When a container arrives at an NM, its execution is determined by the 
available resources at the NM and the container type. Guaranteed containers 
start their execution immediately, and if needed, the NM will kill running 
opportunistic containers to ensure there are sufficient resources for the 
guaranteed ones to start. On the other hand, opportunistic containers can be 
queued at the NM, if there are no resources available to start their execution 
when they arrive at the NM. To enable this, we extended the NM by allowing 
queuing of containers at each node. The NM monitors the local resources, and 
when there are sufficient resources available, it starts the execution of the 
opportunistic container that is at the head of the queue.
+
+In particular, when a container arrives at an NM, localization is performed 
(i.e., all required resources are downloaded), and then the container moves to 
a `SCHEDULED` state, in which the container is queued, waiting for its 
execution to begin:
+
+* If there are available resources, the execution of the container starts 
immediately, irrespective of its execution type.
+* If there are no available resources:
+    * If the container is guaranteed, we kill as many running opportunistic 
containers as required for the guaranteed container to be executed, and then 
start its execution.
+    * If the container is opportunistic, it remains at the queue until 
resources become available.
+* When a container (guaranteed or opportunistic) finishes its execution and 
resources get freed up, we examine the queued containers and if there are 
available resources we start their execution. We pick containers from the queue 
in a FIFO order.
+
+In the [future work items](#Items_for_Future_Work) below, we discuss different 
ways of prioritizing task execution (queue reordering) and of killing 
opportunistic containers to make space for guaranteed ones.
+
+
+Allocation of Opportunistic Containers
+--------------------------------------
+
+As mentioned above, we provide both a centralized and a distributed way of 
allocating opportunistic containers, which we describe below.
+
+$H3 Centralized Allocation
+
+We have introduced a new service at the RM, namely the 
`OpportunisticContainerAllocatorAMService`, which extends the 
`ApplicationMasterService`. When the centralized opportunistic allocation is 
enabled, the resource requests from the AMs are served at the RM side by the 
`OpportunisticContainerAllocatorAMService`, which splits them into two sets of 
resource requests: 
+
+* The guaranteed set is forwarded to the existing `ApplicationMasterService` 
and is subsequently handled by the Fair or Capacity Scheduler.
+* The opportunistic set is handled by the new 
`OpportunisticContainerAllocator`, which performs the scheduling of 
opportunistic containers to nodes.
+
+The `OpportunisticContainerAllocator` maintains a list with the [least loaded 
nodes](#Determining_Nodes_for_Allocation) of the cluster at each moment, and 
assigns containers to them in a round-robin fashion. Note that in the current 
implementation, we purposely do not take into account node locality 
constraints. Since an opportunistic container (unlike the guaranteed ones) 
might wait at the queue of an NM before its execution starts, it is more 
important to allocate it at a node that is less loaded (i.e., where queuing 
delay will be smaller) rather than respect its locality constraints. Moreover, 
we do not take into account sharing (fairness/capacity) constraints for 
opportunistic containers at the moment. Support for both locality and sharing 
constraints can be added in the future if required.
+
+
+$H3 Distributed Allocation
+
+In order to enable distributed scheduling of opportunistic containers, we have 
introduced a new service at each NM, called `AMRMProxyService`. The 
`AMRMProxyService` implements the `ApplicationMasterService` protocol, and acts 
as a proxy between the AMs running at that node and the RM. When the 
`AMRMProxyService` is enabled (through a parameter), we force all AMs running 
at a particular node to communicate with the `AMRMProxyService` of the same 
node, instead of going directly to the RM. Moreover, to ensure that the AMs 
will not talk directly with the RM, when a new AM gets initialized, we replace 
its `AMRMToken` with a token signed by the `AMRMProxyService`.
+
+A chain of interceptors can be registered with the `AMRMProxyService`. One of 
these interceptors is the `DistributedScheduler` that is responsible for 
allocating opportunistic containers in a distributed way, without needing to 
contact the RM. This modular design makes the `AMRMProxyService` instrumental 
in other scenarios too, such as YARN federation (`YARN-2915`) or throttling 
down misbehaving AMs, which can be enabled simply by adding additional 
interceptors at the interceptor chain.
+
+When distributed opportunistic scheduling is enabled, each AM sends its 
resource requests to the `AMRMProxyService` running at the same node. The 
`AMRMProxyService` splits the resource requests into two sets:
+
+* The guaranteed set is forwarded to the RM. In this case the 
`AMRMProxyService` simply acts as a proxy between the AM and the RM, and the 
container allocation remains intact (using the Fair or Capacity Scheduler).
+* The opportunistic set is not forwarded to the RM. Instead, it is handled by 
the `DistributedScheduler` that is running locally at the node. In particular, 
the `DistributedScheduler` maintains a list with the least loaded nodes in the 
cluster, and allocates containers to them in a round-robin fashion. The RM 
informs the `DistributedScheduler` about the least loaded nodes at regular 
intervals through the NM-RM heartbeats.
+
+The above procedure is similar to the one performed by the 
`OpportunisticContainerAllocatorAMService` in the case of centralized 
opportunistic scheduling described above. The main difference is that in the 
distributed case, the splitting of requests into guaranteed and opportunistic 
happens locally at the node, and only the guaranteed requests are forwarded to 
the RM, while the opportunistic ones are handled without contacting the RM.
+
+
+$H3 Determining Nodes for Allocation
+
+Each NM informs the RM periodically through the NM-RM heartbeats about the 
number of running guaranteed and opportunistic containers, as well as the 
number of queued opportunistic containers. The RM gathers this information from 
all nodes and determines the least loaded ones.
+
+In the case of centralized allocation of opportunistic containers, this 
information is immediately available, since the allocation happens centrally. 
In the case of distributed scheduling, the list with the least loaded nodes is 
propagated to all NMs (and thus becomes available to the 
`DistributedSchedulers`) through the heartbeat responses from the RM to the 
NMs. The number of least loaded nodes sent to the NMs is configurable.
+
+At the moment, we take into account only the number of queued opportunistic 
containers at each node in order to estimate the time an opportunistic 
container would have to wait if sent to that node and, thus, determine the 
least loaded nodes. If the AM provided us with information about the estimated 
task durations, we could take them into account in order to have better 
estimates of the queue waiting times.
+
+
+$H3 Rebalancing Node Load
+
+Occasionally poor placement choices for opportunistic containers may be made 
(due to stale queue length estimates), which can lead to load imbalance between 
nodes. The problem is more pronounced under high cluster load, and also in the 
case of distributed scheduling (multiple `DistributedSchedulers` may place 
containers at the same NM, since they do not coordinate with each other). To 
deal with this load imbalance between the NM queues, we perform load shedding 
to dynamically re-balance the load between NMs. In particular, while 
aggregating at the RM the queue time estimates published by each NM, we 
construct a distribution and find a targeted maximal value for the length of 
the NM queues (based on the mean and standard deviation of the distribution). 
Then the RM disseminates this value to the various NMs through the heartbeat 
responses. Subsequently, using this information, an NM on a node whose queue 
length is above the threshold discards opportunistic containers to meet this 
maxi
 mal value. This forces the associated individual AMs to reschedule those 
containers elsewhere.
+
+
+Advanced Configuration
+----------------------
+
+The main properties for enabling opportunistic container allocation and 
choosing between centralized and distributed allocation were described in the 
[quick guide](#Quick_Guide) in the beginning of this document. Here we present 
more advanced configuration. Note that using default values for those 
parameters should be sufficient in most cases. All parameters below have to be 
defined in the **conf/yarn-site.xml** file.
+
+To determine the number of [least loaded 
nodes](#Determining_Nodes_for_Allocation) that will be used when scheduling 
opportunistic containers and how often this list will be refreshed, we use the 
following parameters:
+
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanager.opportunistic-container-allocation.nodes-used` | 
Number of least loaded nodes to be used by the Opportunistic Container 
allocator for dispatching containers during container allocation. A higher 
value can improve load balance in large clusters. | `10` |
+| `yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms` | 
Frequency for computing least loaded nodes. | `1000` |
+
+
+As discussed in the [node load rebalancing](#Rebalancing_Node_Load) section 
above, at regular intervals, the RM gathers all NM queue lengths and computes 
their mean value (`avg`) and standard deviation (`stdev`), as well as the value 
`avg + k*stdev` (where `k` a float). This value gets propagated through the 
NM-RM heartbeats to all NMs, who should respect that value by dequeuing 
containers (if required), as long as their current queue length is between a 
`queue_min_length` and a `queue_max_length` value (these values are used to 
avoid dequeuing tasks from very short queues and to aggressively dequeue tasks 
from long queues, respectively). 
+The parameters `k`, `queue_min_length` and `queue_max_length` can be specified 
as follows:
+
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.resourcemanager.nm-container-queuing.queue-limit-stdev` | The `k` 
parameter. | `1.0f` |
+| `yarn.resourcemanager.nm-container-queuing.min-queue-length` | The 
`queue_min_length` parameter. | `5` |
+| `yarn.resourcemanager.nm-container-queuing.max-queue-length` | The 
`queue_max_length` parameter. | `15` |
+
+
+Finally, two more properties can further tune the `AMRMProxyService` in case 
distributed scheduling is used:
+
+| Property | Description | Default value |
+|:-------- |:----- |:----- |
+| `yarn.nodemanager.amrmproxy.address` | The address/port to which the 
`AMRMProxyService` is bound to. | `0.0.0.0:8049` |
+| `yarn.nodemanager.amrmproxy.client.thread-count` | The number of threads 
that are used at each NM for serving the interceptors register to the 
`AMRMProxyService` by different jobs. | `3` |
+
+
+Items for Future Work
+---------------------
+
+Here we describe multiple ways in which we can extend/enhance the allocation 
and execution of opportunistic containers. We also provide the JIRAs that track 
each item.
+
+* **Resource overcommitment** (`YARN-1011`). As already discussed, in order to 
further improve the cluster resource utilization, we can schedule containers 
not based on the allocated resources but on the actually utilized ones. When 
over-committing resources, there is the risk of running out of resources in 
case we have an increase in the utilized resources of the already running 
containers. Therefore, opportunistic execution should be used for containers 
whose allocation goes beyond the capacity of a node. This way, we can choose 
opportunistic containers to kill for reclaiming resources.
+* **NM Queue reordering** (`YARN-5886`). Instead of executing queued 
containers in a FIFO order, we can employ reordering strategies that 
dynamically determine which opportunistic container will be executed next. For 
example, we can prioritize containers that are expected to be short-running or 
which belong to applications that are close to completion.
+* **Out of order killing at NMs** (`YARN-5887`). As described above, when we 
need to free up resources for a guaranteed container to start its execution, we 
kill opportunistic containers in reverse order of arrival (first the most 
recently started ones). This might not always be the right decision. For 
example, we might want to minimize the number of containers killed or to 
refrain from killing containers of jobs that are very close to completion.
+* **Container pausing** (`YARN-5292`): At the moment we kill opportunistic 
containers to make room for guaranteed in case of resource contention. In busy 
clusters this can lower the effective cluster utilization: whenever we kill a 
running opportunistic container, it has to be restarted, and thus we lose work. 
To this end, we can instead pause running opportunistic containers. Note that 
this will require support from the container executor (e.g., the container 
technology used) and from the application.
+* **Container promotion** (`YARN-5085`). There are cases where changing the 
execution type of a container during its execution can be beneficial. For 
instance, an application might submit a container as opportunistic, and when 
its execution starts, it can request its promotion to a guaranteed container to 
avoid it getting killed.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to