YARN-2190. Added CPU and memory limit options to the default container executor 
for Windows containers. Contributed by Chuan Liu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1752b659
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1752b659
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1752b659

Branch: refs/heads/HDFS-7285
Commit: 1752b65904f2a4f5dec33770a2924410d3e74295
Parents: 667c3fc
Author: Jian He <jia...@apache.org>
Authored: Fri Mar 6 14:17:57 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Mon Mar 9 13:11:26 2015 -0700

----------------------------------------------------------------------
 BUILDING.txt                                    |   9 +-
 .../hadoop-common/src/main/winutils/task.c      | 144 ++++++++++++++++---
 .../src/main/winutils/win8sdk.props             |  28 ++++
 .../src/main/winutils/winutils.vcxproj          |   3 +
 .../org/apache/hadoop/util/TestWinUtils.java    |  62 ++++++++
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  12 ++
 .../src/main/resources/yarn-default.xml         |  14 ++
 .../server/nodemanager/ContainerExecutor.java   |  49 ++++++-
 .../nodemanager/DefaultContainerExecutor.java   |   9 +-
 .../WindowsSecureContainerExecutor.java         |   9 +-
 .../nodemanager/TestContainerExecutor.java      |  53 +++++++
 12 files changed, 360 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 6e38ad3..b60da6c 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -209,7 +209,8 @@ Requirements:
 * Findbugs 1.3.9 (if running findbugs)
 * ProtocolBuffer 2.5.0
 * CMake 2.6 or newer
-* Windows SDK or Visual Studio 2010 Professional
+* Windows SDK 7.1 or Visual Studio 2010 Professional
+* Windows SDK 8.1 (if building CPU rate control for the container executor)
 * zlib headers (if building native code bindings for zlib)
 * Internet connection for first build (to fetch all Maven and Hadoop dependencies)
 * Unix command-line tools from GnuWin32: sh, mkdir, rm, cp, tar, gzip. These
@@ -220,11 +221,15 @@ can be downloaded from http://git-scm.com/download/win.
 
 If using Visual Studio, it must be Visual Studio 2010 Professional (not 2012).
 Do not use Visual Studio Express.  It does not support compiling for 64-bit,
-which is problematic if running a 64-bit system.  The Windows SDK is free to
+which is problematic if running a 64-bit system.  The Windows SDK 7.1 is free to
 download here:
 
 http://www.microsoft.com/en-us/download/details.aspx?id=8279
 
+The Windows SDK 8.1 is available to download at:
+
+http://msdn.microsoft.com/en-us/windows/bg162891.aspx
+
 Cygwin is neither required nor supported.
 
 
----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-common-project/hadoop-common/src/main/winutils/task.c
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/task.c b/hadoop-common-project/hadoop-common/src/main/winutils/task.c
index 21b1893..37c6ca1 100644
--- a/hadoop-common-project/hadoop-common/src/main/winutils/task.c
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/task.c
@@ -49,6 +49,31 @@ typedef enum TaskCommandOptionType
   TaskProcessList
 } TaskCommandOption;
 
+ //----------------------------------------------------------------------------
+// Function: GetLimit
+//
+// Description:
+//  Get the resource limit value in long type given the command line argument.
+//
+// Returns:
+// TRUE: If successfully get the value
+// FALSE: otherwise
+static BOOL GetLimit(__in const wchar_t *str, __out long *value)
+{
+  wchar_t *end = NULL;
+  if (str == NULL || value == NULL) return FALSE;
+  *value = wcstol(str, &end, 10);
+  if (end == NULL || *end != '\0')
+  {
+    *value = -1;
+    return FALSE;
+  }
+  else
+  {
+    return TRUE;
+  }
+}
+
 //----------------------------------------------------------------------------
 // Function: ParseCommandLine
 //
@@ -61,7 +86,9 @@ typedef enum TaskCommandOptionType
 // FALSE: otherwise
 static BOOL ParseCommandLine(__in int argc,
                              __in_ecount(argc) wchar_t *argv[],
-                             __out TaskCommandOption *command)
+                             __out TaskCommandOption *command,
+                             __out_opt long *memory,
+                             __out_opt long *vcore)
 {
   *command = TaskInvalid;
 
@@ -88,9 +115,44 @@ static BOOL ParseCommandLine(__in int argc,
     }
   }
 
-  if (argc == 4) {
+  if (argc >= 4 && argc <= 8) {
     if (wcscmp(argv[1], L"create") == 0)
     {
+      int i;
+      for (i = 2; i < argc - 3; i++)
+      {
+        if (wcscmp(argv[i], L"-c") == 0)
+        {
+          if (vcore != NULL && !GetLimit(argv[i + 1], vcore))
+          {
+            return FALSE;
+          }
+          else
+          {
+            i++;
+            continue;
+          }
+        }
+        else if (wcscmp(argv[i], L"-m") == 0)
+        {
+          if (memory != NULL && !GetLimit(argv[i + 1], memory))
+          {
+            return FALSE;
+          }
+          else
+          {
+            i++;
+            continue;
+          }
+        }
+        else
+        {
+          break;
+        }
+      }
+      if (argc - i != 2)
+        return FALSE;
+
       *command = TaskCreate;
       return TRUE;
     }
@@ -573,7 +635,7 @@ done:
 // ERROR_SUCCESS: On success
 // GetLastError: otherwise
 DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PCWSTR cmdLine, 
-  __in LPCWSTR userName) 
+  __in LPCWSTR userName, __in long memory, __in long cpuRate)
 {
   DWORD dwErrorCode = ERROR_SUCCESS;
   DWORD exitCode = EXIT_FAILURE;
@@ -616,6 +678,12 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
     return dwErrorCode;
   }
   jeli.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE;
+  if (memory > 0)
+  {
+    jeli.BasicLimitInformation.LimitFlags |= JOB_OBJECT_LIMIT_JOB_MEMORY;
+    jeli.ProcessMemoryLimit = ((SIZE_T) memory) * 1024 * 1024;
+    jeli.JobMemoryLimit = ((SIZE_T) memory) * 1024 * 1024;
+  }
   if(SetInformationJobObject(jobObject, 
                              JobObjectExtendedLimitInformation, 
                              &jeli, 
@@ -626,6 +694,24 @@ DWORD CreateTaskImpl(__in_opt HANDLE logonHandle, __in PCWSTR jobObjName,__in PC
     CloseHandle(jobObject);
     return dwErrorCode;
   }
+#ifdef NTDDI_WIN8
+  if (cpuRate > 0)
+  {
+    JOBOBJECT_CPU_RATE_CONTROL_INFORMATION jcrci = { 0 };
+    SYSTEM_INFO sysinfo;
+    GetSystemInfo(&sysinfo);
+    jcrci.ControlFlags = JOB_OBJECT_CPU_RATE_CONTROL_ENABLE |
+      JOB_OBJECT_CPU_RATE_CONTROL_HARD_CAP;
+    jcrci.CpuRate = min(10000, cpuRate);
+    if(SetInformationJobObject(jobObject, JobObjectCpuRateControlInformation,
+          &jcrci, sizeof(jcrci)) == 0)
+    {
+      dwErrorCode = GetLastError();
+      CloseHandle(jobObject);
+      return dwErrorCode;
+    }
+  }
+#endif
 
   if (logonHandle != NULL) {
     dwErrorCode = AddNodeManagerAndUserACEsToObject(jobObject, userName, JOB_OBJECT_ALL_ACCESS);
@@ -809,10 +895,10 @@ create_process_done:
 // Returns:
 // ERROR_SUCCESS: On success
 // GetLastError: otherwise
-DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine) 
+DWORD CreateTask(__in PCWSTR jobObjName,__in PWSTR cmdLine, __in long memory, __in long cpuRate)
 {
   // call with null logon in order to create tasks utilizing the current logon
-  return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL);
+  return CreateTaskImpl( NULL, jobObjName, cmdLine, NULL, memory, cpuRate);
 }
 
 //----------------------------------------------------------------------------
@@ -893,7 +979,7 @@ DWORD CreateTaskAsUser(__in PCWSTR jobObjName,
       goto done;
   }
 
-  err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user);
+  err = CreateTaskImpl(logonHandle, jobObjName, cmdLine, user, -1, -1);
 
 done: 
   if( profileIsLoaded ) {
@@ -1095,6 +1181,8 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
 {
   DWORD dwErrorCode = ERROR_SUCCESS;
   TaskCommandOption command = TaskInvalid;
+  long memory = -1;
+  long cpuRate = -1;
   wchar_t* cmdLine = NULL;
   wchar_t buffer[16*1024] = L""; // 32K max command line
   size_t charCountBufferLeft = sizeof(buffer)/sizeof(wchar_t);
@@ -1111,7 +1199,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
                ARGC_COMMAND_ARGS
        };
 
-  if (!ParseCommandLine(argc, argv, &command)) {
+  if (!ParseCommandLine(argc, argv, &command, &memory, &cpuRate)) {
     dwErrorCode = ERROR_INVALID_COMMAND_LINE;
 
     fwprintf(stderr, L"Incorrect command line arguments.\n\n");
@@ -1123,7 +1211,7 @@ int Task(__in int argc, __in_ecount(argc) wchar_t *argv[])
   {
     // Create the task jobobject
     //
-    dwErrorCode = CreateTask(argv[2], argv[3]);
+    dwErrorCode = CreateTask(argv[argc-2], argv[argc-1], memory, cpuRate);
     if (dwErrorCode != ERROR_SUCCESS)
     {
       ReportErrorCode(L"CreateTask", dwErrorCode);
@@ -1238,18 +1326,30 @@ void TaskUsage()
   // jobobject's are being used.
   // ProcessTree.isSetsidSupported()
   fwprintf(stdout, L"\
-    Usage: task create [TASKNAME] [COMMAND_LINE] |\n\
-          task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE] |\n\
-          task isAlive [TASKNAME] |\n\
-          task kill [TASKNAME]\n\
-          task processList [TASKNAME]\n\
-    Creates a new task jobobject with taskname\n\
-    Creates a new task jobobject with taskname as the user provided\n\
-    Checks if task jobobject is alive\n\
-    Kills task jobobject\n\
-    Prints to stdout a list of processes in the task\n\
-    along with their resource usage. One process per line\n\
-    and comma separated info per process\n\
-    ProcessId,VirtualMemoryCommitted(bytes),\n\
-    WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
+Usage: task create [OPTOINS] [TASKNAME] [COMMAND_LINE]\n\
+         Creates a new task job object with taskname and options to set CPU\n\
+         and memory limits on the job object\n\
+\n\
+         OPTIONS: -c [cup rate] set the cpu rate limit on the job object.\n\
+                  -m [memory] set the memory limit on the job object.\n\
+         The cpu limit is an integral value of percentage * 100. The memory\n\
+         limit is an integral number of memory in MB. \n\
+         The limit will not be set if 0 or negative value is passed in as\n\
+         parameter(s).\n\
+\n\
+       task createAsUser [TASKNAME] [USERNAME] [PIDFILE] [COMMAND_LINE]\n\
+         Creates a new task jobobject with taskname as the user provided\n\
+\n\
+       task isAlive [TASKNAME]\n\
+         Checks if task job object is alive\n\
+\n\
+       task kill [TASKNAME]\n\
+         Kills task job object\n\
+\n\
+       task processList [TASKNAME]\n\
+         Prints to stdout a list of processes in the task\n\
+         along with their resource usage. One process per line\n\
+         and comma separated info per process\n\
+         ProcessId,VirtualMemoryCommitted(bytes),\n\
+         WorkingSetSize(bytes),CpuTime(Millisec,Kernel+User)\n");
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props b/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
new file mode 100644
index 0000000..503b37a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/win8sdk.props
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <ImportGroup Label="PropertySheets" />
+ <PropertyGroup Label="UserMacros" />
+ <PropertyGroup>
+   <ExecutablePath>$(VCInstallDir)bin\x86_amd64;$(VCInstallDir)bin;$(WindowsSdkDir)bin\NETFX 4.0 Tools;$(MSBuildProgramFiles32)\Windows Kits\8.1\bin\x86;$(VSInstallDir)Common7\Tools\bin;$(VSInstallDir)Common7\tools;$(VSInstallDir)Common7\ide;$(MSBuildProgramFiles32)\HTML Help Workshop;$(FrameworkSDKDir)\bin;$(MSBuildToolsPath32);$(VSInstallDir);$(SystemRoot)\SysWow64;$(FxCopDir);$(PATH)</ExecutablePath>
+   <IncludePath>$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(FrameworkSDKDir)\include;</IncludePath>
+   <LibraryPath>$(VCInstallDir)lib\amd64;$(VCInstallDir)atlmfc\lib\amd64;$(MSBuildProgramFiles32)\Windows Kits\8.1\lib\win8\um\x64;$(MSBuildProgramFiles32)\Windows Kits\8.1\Lib\winv6.3\um\x64;$(FrameworkSDKDir)\lib\x64</LibraryPath>
+   <ExcludePath>$(VCInstallDir)include;$(VCInstallDir)atlmfc\include;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\um;$(MSBuildProgramFiles32)\Windows Kits\8.1\Include\shared;$(FrameworkSDKDir)\include;$(MSBuildToolsPath32);$(VCInstallDir)atlmfc\lib;$(VCInstallDir)lib;</ExcludePath>
+ </PropertyGroup>
+<ItemDefinitionGroup />
+</Project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
index 9ecba0a..76a7414 100644
--- a/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
+++ b/hadoop-common-project/hadoop-common/src/main/winutils/winutils.vcxproj
@@ -67,6 +67,9 @@
   </PropertyGroup>
   <ImportGroup Label="ExtensionSettings">
   </ImportGroup>
+  <ImportGroup Label="PropertySheets" Condition="exists('$(MSBuildProgramFiles32)\Windows Kits\8.1')">
+    <Import Project="win8sdk.props" />
+  </ImportGroup>
   <ImportGroup Condition="'$(Configuration)|$(Platform)'=='Debug|x64'" Label="PropertySheets">
     <Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
   </ImportGroup>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
index 8ac6e40..987c706 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestWinUtils.java
@@ -547,4 +547,66 @@ public class TestWinUtils {
     
     assertThat(outNumber, containsString(testNumber));
   }
+
+  @Test (timeout = 30000)
+  public void testTaskCreateWithLimits() throws IOException {
+    // Generate a unique job id
+    String jobId = String.format("%f", Math.random());
+
+    // Run a task without any options
+    String out = Shell.execCommand(Shell.WINUTILS, "task", "create",
+        "job" + jobId, "cmd /c echo job" + jobId);
+    assertTrue(out.trim().equals("job" + jobId));
+
+    // Run a task without any limits
+    jobId = String.format("%f", Math.random());
+    out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
+        "-1", "job" + jobId, "cmd /c echo job" + jobId);
+    assertTrue(out.trim().equals("job" + jobId));
+
+    // Run a task with limits (128MB should be enough for a cmd)
+    jobId = String.format("%f", Math.random());
+    out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "10000", "-m",
+        "128", "job" + jobId, "cmd /c echo job" + jobId);
+    assertTrue(out.trim().equals("job" + jobId));
+
+    // Run a task without enough memory
+    try {
+      jobId = String.format("%f", Math.random());
+      out = Shell.execCommand(Shell.WINUTILS, "task", "create", "-m", "128", "job"
+          + jobId, "java -Xmx256m -version");
+      fail("Failed to get Shell.ExitCodeException with insufficient memory");
+    } catch (Shell.ExitCodeException ece) {
+      assertThat(ece.getExitCode(), is(1));
+    }
+
+    // Run tasks with wrong parameters
+    //
+    try {
+      jobId = String.format("%f", Math.random());
+      Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-1", "-m",
+          "-1", "foo", "job" + jobId, "cmd /c echo job" + jobId);
+      fail("Failed to get Shell.ExitCodeException with bad parameters");
+    } catch (Shell.ExitCodeException ece) {
+      assertThat(ece.getExitCode(), is(1639));
+    }
+
+    try {
+      jobId = String.format("%f", Math.random());
+      Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "-m", "-1",
+          "job" + jobId, "cmd /c echo job" + jobId);
+      fail("Failed to get Shell.ExitCodeException with bad parameters");
+    } catch (Shell.ExitCodeException ece) {
+      assertThat(ece.getExitCode(), is(1639));
+    }
+
+    try {
+      jobId = String.format("%f", Math.random());
+      Shell.execCommand(Shell.WINUTILS, "task", "create", "-c", "foo",
+          "job" + jobId, "cmd /c echo job" + jobId);
+      fail("Failed to get Shell.ExitCodeException with bad parameters");
+    } catch (Shell.ExitCodeException ece) {
+      assertThat(ece.getExitCode(), is(1639));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d073169..c2aa2ef 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -363,6 +363,9 @@ Release 2.7.0 - UNRELEASED
     YARN-1809. Synchronize RM and TimeLineServer Web-UIs. (Zhijie Shen and
     Xuan Gong via jianhe)
 
+    YARN-2190. Added CPU and memory limit options to the default container
+    executor for Windows containers. (Chuan Liu via jianhe)
+
   OPTIMIZATIONS
 
     YARN-2990. FairScheduler's delay-scheduling always waits for node-local and 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 25b808e..8c83fea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1027,6 +1027,18 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_DELAY =
       20;
 
+  /**
+   * Indicates if memory and CPU limits will be set for the Windows Job
+   * Object for the containers launched by the default container executor.
+   */
+  public static final String NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED =
+      NM_PREFIX + "windows-container.memory-limit.enabled";
+  public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED = false;
+
+  public static final String NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED =
+      NM_PREFIX + "windows-container.cpu-limit.enabled";
+  public static final boolean DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED = false;
+
   /** 
   /* The Windows group that the windows-secure-container-executor should run as.
   */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index df730d5..66400c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1075,6 +1075,20 @@
   </property>
 
   <property>
+    <description>This flag determines whether memory limit will be set for the Windows Job
+    Object of the containers launched by the default container executor.</description>
+    <name>yarn.nodemanager.windows-container.memory-limit.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>This flag determines whether CPU limit will be set for the Windows Job
+    Object of the containers launched by the default container executor.</description>
+    <name>yarn.nodemanager.windows-container.cpu-limit.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
     <description>T-file compression types used to compress aggregated logs.</description>
     <name>yarn.nodemanager.log-aggregation.compression-type</name>
     <value>none</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
index 77193df..248a393 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerDiagnosticsUpdateEvent;
@@ -298,6 +299,11 @@ public abstract class ContainerExecutor implements Configurable {
       readLock.unlock();
     }
   }
+
+  protected String[] getRunCommand(String command, String groupId,
+      String userName, Path pidFile, Configuration conf) {
+    return getRunCommand(command, groupId, userName, pidFile, conf, null);
+  }
   
   /** 
    *  Return a command to execute the given command in OS shell.
@@ -306,7 +312,7 @@ public abstract class ContainerExecutor implements Configurable {
    *  non-Windows, groupId is ignored. 
    */
   protected String[] getRunCommand(String command, String groupId,
-      String userName, Path pidFile, Configuration conf) {
+      String userName, Path pidFile, Configuration conf, Resource resource) {
     boolean containerSchedPriorityIsSet = false;
     int containerSchedPriorityAdjustment = 
         YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY;
@@ -320,7 +326,46 @@ public abstract class ContainerExecutor implements Configurable {
     }
   
     if (Shell.WINDOWS) {
-      return new String[] { Shell.WINUTILS, "task", "create", groupId,
+      int cpuRate = -1;
+      int memory = -1;
+      if (resource != null) {
+        if (conf
+            .getBoolean(
+                YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED,
+                YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED)) {
+          memory = resource.getMemory();
+        }
+
+        if (conf.getBoolean(
+            YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED,
+            YarnConfiguration.DEFAULT_NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED)) {
+          int containerVCores = resource.getVirtualCores();
+          int nodeVCores = conf.getInt(YarnConfiguration.NM_VCORES,
+              YarnConfiguration.DEFAULT_NM_VCORES);
+          // cap overall usage to the number of cores allocated to YARN
+          int nodeCpuPercentage = Math
+              .min(
+                  conf.getInt(
+                      YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT,
+                      YarnConfiguration.DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT),
+                  100);
+          nodeCpuPercentage = Math.max(0, nodeCpuPercentage);
+          if (nodeCpuPercentage == 0) {
+            String message = "Illegal value for "
+                + YarnConfiguration.NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT
+                + ". Value cannot be less than or equal to 0.";
+            throw new IllegalArgumentException(message);
+          }
+          float yarnVCores = (nodeCpuPercentage * nodeVCores) / 100.0f;
+          // CPU should be set to a percentage * 100, e.g. 20% cpu rate limit
+          // should be set as 20 * 100. The following setting is equal to:
+          // 100 * (100 * (vcores / Total # of cores allocated to YARN))
+          cpuRate = Math.min(10000,
+              (int) ((containerVCores * 10000) / yarnVCores));
+        }
+      }
+      return new String[] { Shell.WINUTILS, "task", "create", "-m",
+          String.valueOf(memory), "-c", String.valueOf(cpuRate), groupId,
           "cmd /c " + command };
     } else {
       List<String> retCommand = new ArrayList<String>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
index f3d2121..e0ecea3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.util.Shell.CommandExecutor;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -202,7 +203,7 @@ public class DefaultContainerExecutor extends ContainerExecutor {
       setScriptExecutable(sb.getWrapperScriptPath(), user);
 
       shExec = buildCommandExecutor(sb.getWrapperScriptPath().toString(),
-          containerIdStr, user, pidFile,
+          containerIdStr, user, pidFile, container.getResource(),
           new File(containerWorkDir.toUri().getPath()),
           container.getLaunchContext().getEnvironment());
       
@@ -256,12 +257,12 @@ public class DefaultContainerExecutor extends ContainerExecutor {
   }
 
   protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, 
-      String containerIdStr, String user, Path pidFile, File wordDir, 
-      Map<String, String> environment) 
+      String containerIdStr, String user, Path pidFile, Resource resource,
+      File wordDir, Map<String, String> environment)
           throws IOException {
     
     String[] command = getRunCommand(wrapperScriptPath,
-        containerIdStr, user, pidFile, this.getConf());
+        containerIdStr, user, pidFile, this.getConf(), resource);
 
       LOG.info("launchContainer: " + Arrays.toString(command));
       return new ShellCommandExecutor(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
index cd3e71a..b7bec5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/WindowsSecureContainerExecutor.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.io.nativeio.NativeIOException;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.CommandExecutor;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@@ -727,11 +728,9 @@ public class WindowsSecureContainerExecutor extends DefaultContainerExecutor {
    }
  
    @Override
-   protected CommandExecutor buildCommandExecutor(String wrapperScriptPath, 
-       String containerIdStr,
-     String userName, Path pidFile,File wordDir, Map<String, String> environment) 
-     throws IOException {
-
+  protected CommandExecutor buildCommandExecutor(String wrapperScriptPath,
+      String containerIdStr, String userName, Path pidFile, Resource resource,
+      File wordDir, Map<String, String> environment) throws IOException {
      return new WintuilsProcessStubExecutor(
          wordDir.toString(),
          containerIdStr, userName, pidFile.toString(), 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1752b659/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
index fd3634b..dc3e941 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerExecutor.java
@@ -18,13 +18,21 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.util.Arrays;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 
+import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
+import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.junit.Assert;
 import org.junit.Test;
+
 import static org.junit.Assert.*;
+import static org.junit.Assume.assumeTrue;
 
 public class TestContainerExecutor {
   
@@ -69,4 +77,49 @@ public class TestContainerExecutor {
     }
   }
 
+  @Test (timeout = 5000)
+  public void testRunCommandWithNoResources() {
+    // Windows only test
+    assumeTrue(Shell.WINDOWS);
+    Configuration conf = new Configuration();
+    String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
+        conf, Resource.newInstance(1024, 1));
+    // Assert the cpu and memory limits are set correctly in the command
+    String[] expected = { Shell.WINUTILS, "task", "create", "-m", "-1", "-c",
+        "-1", "group1", "cmd /c " + "echo" };
+    Assert.assertTrue(Arrays.equals(expected, command));
+  }
+
+  @Test (timeout = 5000)
+  public void testRunCommandWithMemoryOnlyResources() {
+    // Windows only test
+    assumeTrue(Shell.WINDOWS);
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
+    String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
+        conf, Resource.newInstance(1024, 1));
+    // Assert the cpu and memory limits are set correctly in the command
+    String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
+        "-1", "group1", "cmd /c " + "echo" };
+    Assert.assertTrue(Arrays.equals(expected, command));
+  }
+
+  @Test (timeout = 5000)
+  public void testRunCommandWithCpuAndMemoryResources() {
+    // Windows only test
+    assumeTrue(Shell.WINDOWS);
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_CPU_LIMIT_ENABLED, "true");
+    conf.set(YarnConfiguration.NM_WINDOWS_CONTAINER_MEMORY_LIMIT_ENABLED, "true");
+    String[] command = containerExecutor.getRunCommand("echo", "group1", null, null,
+        conf, Resource.newInstance(1024, 1));
+    float yarnProcessors = NodeManagerHardwareUtils.getContainersCores(
+        ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, conf),
+        conf);
+    int cpuRate = Math.min(10000, (int) ((1 * 10000) / yarnProcessors));
+    // Assert the cpu and memory limits are set correctly in the command
+    String[] expected = { Shell.WINUTILS, "task", "create", "-m", "1024", "-c",
+        String.valueOf(cpuRate), "group1", "cmd /c " + "echo" };
+    Assert.assertTrue(Arrays.equals(expected, command));
+  }
 }

Reply via email to