Repository: samza Updated Branches: refs/heads/master dfdc35e7e -> b14da282c
SAMZA-792: SamzaAppMaster Java code needs to pass the requested container memory size to RM Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b14da282 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b14da282 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b14da282 Branch: refs/heads/master Commit: b14da282cd92d28252a77adb80aa038ba0a66bc9 Parents: dfdc35e Author: Navina <navi.trin...@gmail.com> Authored: Thu Oct 8 16:16:26 2015 -0700 Committer: Navina <navi.trin...@gmail.com> Committed: Thu Oct 8 16:16:26 2015 -0700 ---------------------------------------------------------------------- .../job/yarn/AbstractContainerAllocator.java | 23 +++++++---- .../samza/job/yarn/ContainerAllocator.java | 7 ++-- .../job/yarn/HostAwareContainerAllocator.java | 10 ++--- .../samza/job/yarn/SamzaContainerRequest.java | 1 + .../apache/samza/job/yarn/SamzaTaskManager.java | 6 +-- .../samza/job/yarn/TestContainerAllocator.java | 5 ++- .../yarn/TestHostAwareContainerAllocator.java | 8 ++-- .../samza/job/yarn/TestSamzaTaskManager.java | 43 ++++++++++++++++---- .../job/yarn/util/MockContainerAllocator.java | 7 ++-- 9 files changed, 68 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java index eec1708..6edd477 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java @@ -21,11 +21,7 @@ package org.apache.samza.job.yarn; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.samza.util.Util; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; +import org.apache.samza.config.YarnConfig; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -46,6 +42,8 @@ public abstract class AbstractContainerAllocator implements Runnable { protected final AMRMClientAsync<AMRMClient.ContainerRequest> amClient; protected final int ALLOCATOR_SLEEP_TIME; protected final ContainerUtil containerUtil; + protected final int containerMaxMemoryMb; + protected final int containerMaxCpuCore; @Override public abstract void run(); @@ -58,12 +56,14 @@ public abstract class AbstractContainerAllocator implements Runnable { public AbstractContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient, ContainerUtil containerUtil, - int allocatorSleepTime, - ContainerRequestState containerRequestState) { + ContainerRequestState containerRequestState, + YarnConfig yarnConfig) { this.amClient = amClient; this.containerUtil = containerUtil; - this.ALLOCATOR_SLEEP_TIME = allocatorSleepTime; + this.ALLOCATOR_SLEEP_TIME = yarnConfig.getAllocatorSleepTime(); this.containerRequestState = containerRequestState; + this.containerMaxMemoryMb = yarnConfig.getContainerMaxMemoryMb(); + this.containerMaxCpuCore = yarnConfig.getContainerMaxCpuCores(); } @@ -93,7 +93,12 @@ public abstract class AbstractContainerAllocator implements Runnable { * @param preferredHost Name of the host that you prefer to run the container on */ public final void requestContainer(int expectedContainerId, String preferredHost) { - SamzaContainerRequest request = new SamzaContainerRequest(expectedContainerId, preferredHost); + SamzaContainerRequest request = new SamzaContainerRequest( + containerMaxMemoryMb, + containerMaxCpuCore, + DEFAULT_PRIORITY, + expectedContainerId, + preferredHost); containerRequestState.updateRequestState(request); } http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java index 9911540..7c57a86 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerAllocator.java @@ -21,11 +21,10 @@ package org.apache.samza.job.yarn; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.samza.config.YarnConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.List; -import java.util.Map; /** * This is the default allocator thread that will be used by SamzaTaskManager. @@ -38,8 +37,8 @@ public class ContainerAllocator extends AbstractContainerAllocator { public ContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient, ContainerUtil containerUtil, - int allocatorSleepTime) { - super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, false)); + YarnConfig yarnConfig) { + super(amClient, containerUtil, new ContainerRequestState(amClient, false), yarnConfig); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java index e3b5868..ad1587d 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/HostAwareContainerAllocator.java @@ -21,11 +21,10 @@ package org.apache.samza.job.yarn; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.samza.config.YarnConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.List; -import java.util.Map; /** * This is the allocator thread that will be used by SamzaTaskManager when host-affinity is enabled for a job. It is similar to {@link org.apache.samza.job.yarn.ContainerAllocator}, except that it considers container locality for allocation. @@ -43,10 +42,9 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator { public HostAwareContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amClient, ContainerUtil containerUtil, - int allocatorSleepTime, - int containerRequestTimeout) { - super(amClient, containerUtil, allocatorSleepTime, new ContainerRequestState(amClient, true)); - this.CONTAINER_REQUEST_TIMEOUT = containerRequestTimeout; + YarnConfig yarnConfig) { + super(amClient, containerUtil, new ContainerRequestState(amClient, true), yarnConfig); + this.CONTAINER_REQUEST_TIMEOUT = yarnConfig.getContainerRequestTimeout(); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java index 9441d77..a84e53f 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaContainerRequest.java @@ -63,6 +63,7 @@ public class SamzaContainerRequest implements Comparable<SamzaContainerRequest> this.requestTimestamp = System.currentTimeMillis(); } + // Convenience class for unit testing public SamzaContainerRequest(int expectedContainerId, String preferredHost) { this( AbstractContainerAllocator.DEFAULT_CONTAINER_MEM, http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java index 12f2f2c..d17ffe0 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java @@ -32,7 +32,6 @@ import org.apache.samza.config.YarnConfig; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.HashMap; import java.util.Map; @@ -84,14 +83,13 @@ class SamzaTaskManager implements YarnAppMasterListener { this.containerAllocator = new HostAwareContainerAllocator( amClient, new ContainerUtil(config, state, conf), - yarnConfig.getAllocatorSleepTime(), - yarnConfig.getContainerRequestTimeout() + yarnConfig ); } else { this.containerAllocator = new ContainerAllocator( amClient, new ContainerUtil(config, state, conf), - yarnConfig.getAllocatorSleepTime()); + yarnConfig); } this.allocatorThread = new Thread(this.containerAllocator, "Container Allocator Thread"); http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java index 01f32a4..b20e351 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.YarnConfig; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.server.HttpServer; @@ -53,7 +54,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class TestContainerAllocator { - private final int ALLOCATOR_SLEEP_TIME = 10; private static final String ANY_HOST = ContainerRequestState.ANY_HOST; private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); @@ -74,6 +74,7 @@ public class TestContainerAllocator { put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); put("yarn.container.retry.count", "1"); put("yarn.container.retry.window.ms", "1999999999"); + put("yarn.allocator.sleep.ms", "10"); } }); @@ -107,7 +108,7 @@ public class TestContainerAllocator { containerAllocator = new ContainerAllocator( amRmClientAsync, TestUtil.getContainerUtil(config, state), - ALLOCATOR_SLEEP_TIME + new YarnConfig(config) ); Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); requestStateField.setAccessible(true); http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java index 663ea25..08e53aa 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.YarnConfig; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.server.HttpServer; @@ -54,8 +55,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class TestHostAwareContainerAllocator { - private static final int ALLOCATOR_SLEEP_TIME = 1; - private static final int CONTAINER_REQUEST_TIMEOUT = 3; private static final String ANY_HOST = ContainerRequestState.ANY_HOST; private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); @@ -79,6 +78,8 @@ public class TestHostAwareContainerAllocator { put("yarn.container.retry.count", "1"); put("yarn.container.retry.window.ms", "1999999999"); put("yarn.samza.host-affinity.enabled", "true"); + put("yarn.container.request.timeout.ms", "3"); + put("yarn.allocator.sleep.ms", "1"); } }); @@ -121,8 +122,7 @@ public class TestHostAwareContainerAllocator { containerAllocator = new HostAwareContainerAllocator( amRmClientAsync, containerUtil, - ALLOCATOR_SLEEP_TIME, - CONTAINER_REQUEST_TIMEOUT + new YarnConfig(config) ); Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); requestStateField.setAccessible(true); http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java index 4c1eaa9..b12ae5c 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestSamzaTaskManager.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.YarnConfig; import org.apache.samza.container.LocalityManager; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.JobCoordinator; @@ -134,8 +135,13 @@ public class TestSamzaTaskManager { @Test public void testSamzaTaskManager() throws Exception { + Map<String, String> conf = new HashMap<>(); + conf.putAll(getConfig()); + conf.put("yarn.container.memory.mb", "500"); + conf.put("yarn.container.cpu.cores", "5"); + SamzaTaskManager taskManager = new SamzaTaskManager( - getConfig(), + new MapConfig(conf), state, amRmClientAsync, new YarnConfiguration() @@ -143,9 +149,17 @@ public class TestSamzaTaskManager { AbstractContainerAllocator allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager); assertEquals(ContainerAllocator.class, allocator.getClass()); + // Asserts that samza exposed container configs is honored by allocator thread + assertEquals(500, allocator.containerMaxMemoryMb); + assertEquals(5, allocator.containerMaxCpuCore); + + conf.clear(); + conf.putAll(getConfigWithHostAffinity()); + conf.put("yarn.container.memory.mb", "500"); + conf.put("yarn.container.cpu.cores", "5"); taskManager = new SamzaTaskManager( - getConfigWithHostAffinity(), + new MapConfig(conf), state, amRmClientAsync, new YarnConfiguration() @@ -153,12 +167,21 @@ public class TestSamzaTaskManager { allocator = (AbstractContainerAllocator) getPrivateFieldFromTaskManager("containerAllocator", taskManager).get(taskManager); assertEquals(HostAwareContainerAllocator.class, allocator.getClass()); + // Asserts that samza exposed container configs is honored by allocator thread + assertEquals(500, allocator.containerMaxMemoryMb); + assertEquals(5, allocator.containerMaxCpuCore); + } + + @Test + public void testContainerConfigsAreHonoredInAllocator() { + } @Test public void testOnInit() throws Exception { + Config conf = getConfig(); SamzaTaskManager taskManager = new SamzaTaskManager( - getConfig(), + conf, state, amRmClientAsync, new YarnConfiguration() @@ -167,7 +190,7 @@ public class TestSamzaTaskManager { MockContainerAllocator allocator = new MockContainerAllocator( amRmClientAsync, TestUtil.getContainerUtil(getConfig(), state), - 1); + new YarnConfig(conf)); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); getPrivateFieldFromTaskManager("allocatorThread", taskManager).set(taskManager, new Thread() { @@ -238,8 +261,9 @@ public class TestSamzaTaskManager { */ @Test public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception { + Config conf = getConfig(); SamzaTaskManager taskManager = new SamzaTaskManager( - getConfig(), + conf, state, amRmClientAsync, new YarnConfiguration() @@ -247,7 +271,7 @@ public class TestSamzaTaskManager { MockContainerAllocator allocator = new MockContainerAllocator( amRmClientAsync, TestUtil.getContainerUtil(getConfig(), state), - 1); + new YarnConfig(conf)); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); Thread thread = new Thread(allocator); @@ -303,8 +327,9 @@ public class TestSamzaTaskManager { */ @Test public void testSameContainerRequestedOnFailureWithUnknownCode() throws Exception { + Config conf = getConfigWithHostAffinity(); SamzaTaskManager taskManager = new SamzaTaskManager( - getConfigWithHostAffinity(), + conf, state, amRmClientAsync, new YarnConfiguration() @@ -312,7 +337,7 @@ public class TestSamzaTaskManager { MockContainerAllocator allocator = new MockContainerAllocator( amRmClientAsync, TestUtil.getContainerUtil(getConfig(), state), - 1); + new YarnConfig(conf)); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); Thread thread = new Thread(allocator); @@ -381,7 +406,7 @@ public class TestSamzaTaskManager { MockContainerAllocator allocator = new MockContainerAllocator( amRmClientAsync, TestUtil.getContainerUtil(getConfig(), state), - 1); + new YarnConfig(new MapConfig(config))); getPrivateFieldFromTaskManager("containerAllocator", taskManager).set(taskManager, allocator); Thread thread = new Thread(allocator); http://git-wip-us.apache.org/repos/asf/samza/blob/b14da282/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java index 85f871a..5fcad82 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerAllocator.java @@ -20,9 +20,8 @@ package org.apache.samza.job.yarn.util; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.samza.job.yarn.AbstractContainerAllocator; +import org.apache.samza.config.YarnConfig; import org.apache.samza.job.yarn.ContainerAllocator; -import org.apache.samza.job.yarn.ContainerRequestState; import org.apache.samza.job.yarn.ContainerUtil; import java.util.Map; @@ -32,8 +31,8 @@ public class MockContainerAllocator extends ContainerAllocator { public MockContainerAllocator(AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync, ContainerUtil containerUtil, - int allocatorSleepTime) { - super(amrmClientAsync, containerUtil, allocatorSleepTime); + YarnConfig yarnConfig) { + super(amrmClientAsync, containerUtil, yarnConfig); } @Override