Repository: samza Updated Branches: refs/heads/master ed5be4f92 -> 1497bf6c2
SAMZA-910 - Host Affinity - Fix sporadic failures in Container Allocator tests Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1497bf6c Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1497bf6c Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1497bf6c Branch: refs/heads/master Commit: 1497bf6c2629dd165f02352480584d3b3a9e5c18 Parents: ed5be4f Author: Jacob Maes <jacob.m...@gmail.com> Authored: Fri Apr 15 14:21:16 2016 -0700 Committer: Navina Ramesh <nram...@linkedin.com> Committed: Fri Apr 15 14:21:16 2016 -0700 ---------------------------------------------------------------------- .../samza/job/yarn/TestContainerAllocator.java | 235 +++----------- .../job/yarn/TestContainerAllocatorCommon.java | 225 ++++++++++++++ .../yarn/TestHostAwareContainerAllocator.java | 306 +++++-------------- .../job/yarn/util/MockContainerListener.java | 146 ++++++--- .../yarn/util/MockContainerRequestState.java | 19 +- .../samza/job/yarn/util/MockContainerUtil.java | 12 + 6 files changed, 464 insertions(+), 479 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java index b253f98..e21aded 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocator.java @@ -19,35 +19,15 @@ package org.apache.samza.job.yarn; -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.YarnConfig; -import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.JobCoordinator; -import org.apache.samza.coordinator.server.HttpServer; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; -import org.apache.samza.job.yarn.util.MockContainerListener; import org.apache.samza.job.yarn.util.MockContainerRequestState; -import org.apache.samza.job.yarn.util.MockContainerUtil; -import org.apache.samza.job.yarn.util.MockHttpServer; -import org.apache.samza.job.yarn.util.TestAMRMClientImpl; import org.apache.samza.job.yarn.util.TestUtil; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -55,18 +35,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class TestContainerAllocator { - private static final String ANY_HOST = ContainerRequestState.ANY_HOST; - private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); +public class TestContainerAllocator extends TestContainerAllocatorCommon { - private AMRMClientAsyncImpl amRmClientAsync; - private TestAMRMClientImpl testAMRMClient; - private MockContainerRequestState requestState; - private ContainerAllocator containerAllocator; - private ContainerUtil containerUtil; - private Thread allocatorThread; - - private Config config = new MapConfig(new HashMap<String, String>() { + private final Config config = new MapConfig(new HashMap<String, String>() { { put("yarn.container.count", "1"); put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); @@ -77,68 +48,57 @@ public class TestContainerAllocator { put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); put("yarn.container.retry.count", "1"); put("yarn.container.retry.window.ms", "1999999999"); - put("yarn.allocator.sleep.ms", "10"); + put("yarn.container.request.timeout.ms", "3"); + put("yarn.allocator.sleep.ms", "1"); } }); - private SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2); + @Override + protected Config getConfig() { + return config; + } - private JobCoordinator getCoordinator(int containerCount) { - Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); - for (int i = 0; i < containerCount; i++) { - ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); - containers.put(i, container); - } - JobModel jobModel = new JobModel(config, containers); - return new JobCoordinator(jobModel, server, null); + @Override + protected MockContainerRequestState createContainerRequestState( + AMRMClientAsync<AMRMClient.ContainerRequest> amClient) { + return new MockContainerRequestState(amClient, false); } - @Before - public void setup() throws Exception { - // Create AMRMClient - testAMRMClient = new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new ArrayList<Container>(), - new ArrayList<ContainerStatus>() - )); - amRmClientAsync = TestUtil.getAMClient(testAMRMClient); + /** + * Test request containers with no containerToHostMapping makes the right number of requests + */ + @Test + public void testRequestContainersWithNoMapping() throws Exception { + int containerCount = 4; + Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>(); + for (int i = 0; i < containerCount; i++) { + containersToHostMapping.put(i, null); + } + allocatorThread.start(); - // Initialize certain state variables (mostly to avoid NPE) - state.coordinatorUrl = new URL("http://localhost:7778/"); + containerAllocator.requestContainers(containersToHostMapping); - requestState = new MockContainerRequestState(amRmClientAsync, false); - containerUtil = TestUtil.getContainerUtil(config, state); - containerAllocator = new ContainerAllocator( - amRmClientAsync, - containerUtil, - new YarnConfig(config) - ); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - requestStateField.set(containerAllocator, requestState); + assertNotNull(requestState); - allocatorThread = new Thread(containerAllocator); - } + assertNotNull(requestState.getRequestsQueue()); + assertTrue(requestState.getRequestsQueue().size() == 4); - @After - public void teardown() throws Exception { - containerAllocator.setIsRunning(false); - allocatorThread.join(); + // If host-affinty is not enabled, it doesn't update the requestMap + assertNotNull(requestState.getRequestsToCountMap()); + assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0); } - /** * Adds all containers returned to ANY_HOST only */ @Test public void testAddContainer() throws Exception { - assertNull(requestState.getContainersOnAHost("abc")); + assertNull(requestState.getContainersOnAHost("host1")); assertNull(requestState.getContainersOnAHost(ANY_HOST)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123)); + containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123)); + containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123)); - assertNull(requestState.getContainersOnAHost("abc")); + assertNull(requestState.getContainersOnAHost("host1")); assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2); } @@ -150,10 +110,10 @@ public class TestContainerAllocator { public void testRequestContainers() throws Exception { Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { { - put(0, "abc"); - put(1, "def"); + put(0, "host1"); + put(1, "host2"); put(2, null); - put(3, "abc"); + put(3, "host1"); } }; @@ -174,121 +134,4 @@ public class TestContainerAllocator { assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0); } - /** - * Test request containers with no containerToHostMapping makes the right number of requests - */ - @Test - public void testRequestContainersWithNoMapping() throws Exception { - int containerCount = 4; - Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>(); - for (int i = 0; i < containerCount; i++) { - containersToHostMapping.put(i, null); - } - allocatorThread.start(); - - containerAllocator.requestContainers(containersToHostMapping); - - assertNotNull(requestState); - - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 4); - - // If host-affinty is not enabled, it doesn't update the requestMap - assertNotNull(requestState.getRequestsToCountMap()); - assertTrue(requestState.getRequestsToCountMap().keySet().size() == 0); - } - - /** - * If the container fails to start e.g because it fails to connect to a NM on a host that - * is down, the allocator should request a new container on a different host. - */ - @Test - public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { - final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "2", 123); - final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "1", 123); - - ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!"); - - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(2, 1, 2, null, new Runnable() { - @Override - public void run() { - // The failed container should be released. The successful one should not. - assertNotNull(testAMRMClient.getRelease()); - assertEquals(1, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container.getId())); - } - }, - new Runnable() { - @Override - public void run() { - // Test that the first request assignment had a preferred host and the retry didn't - assertEquals(2, requestState.assignedRequests.size()); - - SamzaContainerRequest request = requestState.assignedRequests.remove(); - assertEquals(0, request.expectedContainerId); - assertEquals("2", request.getPreferredHost()); - - request = requestState.assignedRequests.remove(); - assertEquals(0, request.expectedContainerId); - assertEquals("ANY_HOST", request.getPreferredHost()); - - // This routine should be called after the retry is assigned, but before it's started. - // So there should still be 1 container needed. - assertEquals(1, state.neededContainers.get()); - } - } - ); - requestState.registerContainerListener(listener); - - allocatorThread.start(); - - // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry) - containerAllocator.requestContainer(0, "2"); - containerAllocator.addContainer(container); - containerAllocator.addContainer(container1); - - listener.verify(); - } - - /** - * Extra allocated containers that are returned by the RM and unused by the AM should be released. - * Containers are considered "extra" only when there are no more pending requests to fulfill - * @throws Exception - */ - @Test - public void testAllocatorReleasesExtraContainers() throws Exception { - final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); - final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123); - - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(3, 2, 0, null, new Runnable() { - @Override - public void run() { - assertNotNull(testAMRMClient.getRelease()); - assertEquals(2, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container1.getId())); - assertTrue(testAMRMClient.getRelease().contains(container2.getId())); - - // Test that state is cleaned up - assertEquals(0, requestState.getRequestsQueue().size()); - assertEquals(0, requestState.getRequestsToCountMap().size()); - assertNull(requestState.getContainersOnAHost("abc")); - assertNull(requestState.getContainersOnAHost("def")); - } - }, null); - requestState.registerContainerListener(listener); - - allocatorThread.start(); - - containerAllocator.requestContainer(0, "abc"); - - containerAllocator.addContainer(container); - containerAllocator.addContainer(container1); - containerAllocator.addContainer(container2); - - listener.verify(); - } - } http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java new file mode 100644 index 0000000..5badd29 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestContainerAllocatorCommon.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.job.yarn; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.samza.config.Config; +import org.apache.samza.config.YarnConfig; +import org.apache.samza.container.TaskName; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.server.HttpServer; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.JobModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.job.yarn.util.MockContainerListener; +import org.apache.samza.job.yarn.util.MockContainerRequestState; +import org.apache.samza.job.yarn.util.MockContainerUtil; +import org.apache.samza.job.yarn.util.MockHttpServer; +import org.apache.samza.job.yarn.util.TestAMRMClientImpl; +import org.apache.samza.job.yarn.util.TestUtil; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + + +/** + * Handles all common fields/tests for ContainerAllocators. + */ +public abstract class TestContainerAllocatorCommon { + protected static final String ANY_HOST = ContainerRequestState.ANY_HOST; + + protected final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); + + protected AMRMClientAsyncImpl amRmClientAsync; + protected TestAMRMClientImpl testAMRMClient; + protected MockContainerRequestState requestState; + protected AbstractContainerAllocator containerAllocator; + protected Thread allocatorThread; + protected ContainerUtil containerUtil; + + protected SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2); + + protected abstract Config getConfig(); + protected abstract MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient); + + private JobCoordinator getCoordinator(int containerCount) { + Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); + for (int i = 0; i < containerCount; i++) { + ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); + containers.put(i, container); + } + JobModel jobModel = new JobModel(getConfig(), containers); + return new JobCoordinator(jobModel, server, null); + } + + + @Before + public void setup() throws Exception { + // Create AMRMClient + testAMRMClient = new TestAMRMClientImpl( + TestUtil.getAppMasterResponse( + false, + new ArrayList<Container>(), + new ArrayList<ContainerStatus>() + )); + amRmClientAsync = TestUtil.getAMClient(testAMRMClient); + + // Initialize certain state variables + state.coordinatorUrl = new URL("http://localhost:7778/"); + + containerUtil = TestUtil.getContainerUtil(getConfig(), state); + + requestState = createContainerRequestState(amRmClientAsync); + containerAllocator = new HostAwareContainerAllocator( + amRmClientAsync, + containerUtil, + new YarnConfig(getConfig()) + ); + Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); + requestStateField.setAccessible(true); + requestStateField.set(containerAllocator, requestState); + + allocatorThread = new Thread(containerAllocator); + } + + @After + public void teardown() throws Exception { + containerAllocator.setIsRunning(false); + allocatorThread.join(); + } + + /** + * If the container fails to start e.g because it fails to connect to a NM on a host that + * is down, the allocator should request a new container on a different host. + */ + @Test + public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { + final Container container = TestUtil + .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host2", 123); + final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123); + + ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!"); + + Runnable releasedContainerAssertions = new Runnable() { + @Override + public void run() { + // The failed container should be released. The successful one should not. + assertNotNull(testAMRMClient.getRelease()); + assertEquals(1, testAMRMClient.getRelease().size()); + assertTrue(testAMRMClient.getRelease().contains(container.getId())); + } + }; + + Runnable assignedContainerAssertions = new Runnable() { + @Override + public void run() { + // Test that the first request assignment had a preferred host and the retry didn't + assertEquals(2, requestState.assignedRequests.size()); + + SamzaContainerRequest request = requestState.assignedRequests.remove(); + assertEquals(0, request.expectedContainerId); + assertEquals("host2", request.getPreferredHost()); + + request = requestState.assignedRequests.remove(); + assertEquals(0, request.expectedContainerId); + assertEquals("ANY_HOST", request.getPreferredHost()); + + // This routine should be called after the retry is assigned, but before it's started. + // So there should still be 1 container needed because neededContainers should not be decremented for a failed start. + assertEquals(1, state.neededContainers.get()); + } + }; + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(2, 1, 2, 0, null, releasedContainerAssertions, assignedContainerAssertions, null); + requestState.registerContainerListener(listener); + state.neededContainers.set(1); // Normally this would be done in the SamzaTaskManager + + // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry) + containerAllocator.requestContainer(0, "host2"); + containerAllocator.addContainer(container); + containerAllocator.addContainer(container1); + + allocatorThread.start(); + + listener.verify(); + } + + + /** + * Extra allocated containers that are returned by the RM and unused by the AM should be released. + * Containers are considered "extra" only when there are no more pending requests to fulfill + * @throws Exception + */ + @Test + public void testAllocatorReleasesExtraContainers() throws Exception { + final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "host1", 123); + final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123); + final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host2", 123); + + Runnable releasedContainerAssertions = new Runnable() { + @Override + public void run() { + assertNotNull(testAMRMClient.getRelease()); + assertEquals(2, testAMRMClient.getRelease().size()); + assertTrue(testAMRMClient.getRelease().contains(container1.getId())); + assertTrue(testAMRMClient.getRelease().contains(container2.getId())); + + // Test that state is cleaned up + assertEquals(0, requestState.getRequestsQueue().size()); + assertEquals(0, requestState.getRequestsToCountMap().size()); + assertNull(requestState.getContainersOnAHost("host1")); + assertNull(requestState.getContainersOnAHost("host2")); + } + }; + + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedContainerAssertions, null, null); + requestState.registerContainerListener(listener); + + containerAllocator.requestContainer(0, "host1"); + + containerAllocator.addContainer(container); + containerAllocator.addContainer(container1); + containerAllocator.addContainer(container2); + + allocatorThread.start(); + + listener.verify(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java index 93e430b..ead7200 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestHostAwareContainerAllocator.java @@ -18,36 +18,20 @@ */ package org.apache.samza.job.yarn; -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.URL; -import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; -import org.apache.samza.config.YarnConfig; -import org.apache.samza.container.TaskName; -import org.apache.samza.coordinator.JobCoordinator; -import org.apache.samza.coordinator.server.HttpServer; -import org.apache.samza.job.model.ContainerModel; -import org.apache.samza.job.model.JobModel; -import org.apache.samza.job.model.TaskModel; import org.apache.samza.job.yarn.util.MockContainerListener; import org.apache.samza.job.yarn.util.MockContainerRequestState; import org.apache.samza.job.yarn.util.MockContainerUtil; -import org.apache.samza.job.yarn.util.MockHttpServer; -import org.apache.samza.job.yarn.util.TestAMRMClientImpl; import org.apache.samza.job.yarn.util.TestUtil; -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.servlet.ServletHolder; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -55,19 +39,9 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -public class TestHostAwareContainerAllocator { - private static final String ANY_HOST = ContainerRequestState.ANY_HOST; +public class TestHostAwareContainerAllocator extends TestContainerAllocatorCommon { - private final HttpServer server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); - - private AMRMClientAsyncImpl amRmClientAsync; - private TestAMRMClientImpl testAMRMClient; - private MockContainerRequestState requestState; - private HostAwareContainerAllocator containerAllocator; - private Thread allocatorThread; - private ContainerUtil containerUtil; - - private Config config = new MapConfig(new HashMap<String, String>() { + private final Config config = new MapConfig(new HashMap<String, String>() { { put("yarn.container.count", "1"); put("systems.test-system.samza.factory", "org.apache.samza.job.yarn.MockSystemFactory"); @@ -84,147 +58,14 @@ public class TestHostAwareContainerAllocator { } }); - private Config getConfig() { - Map<String, String> map = new HashMap<>(); - map.putAll(config); - return new MapConfig(map); - } - - private SamzaAppState state = new SamzaAppState(getCoordinator(1), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2); - - private JobCoordinator getCoordinator(int containerCount) { - Map<Integer, ContainerModel> containers = new java.util.HashMap<>(); - for (int i = 0; i < containerCount; i++) { - ContainerModel container = new ContainerModel(i, new HashMap<TaskName, TaskModel>()); - containers.put(i, container); - } - JobModel jobModel = new JobModel(getConfig(), containers); - return new JobCoordinator(jobModel, server, null); - } - - - @Before - public void setup() throws Exception { - // Create AMRMClient - testAMRMClient = new TestAMRMClientImpl( - TestUtil.getAppMasterResponse( - false, - new ArrayList<Container>(), - new ArrayList<ContainerStatus>() - )); - amRmClientAsync = TestUtil.getAMClient(testAMRMClient); - - // Initialize certain state variables - state.coordinatorUrl = new URL("http://localhost:7778/"); - - containerUtil = TestUtil.getContainerUtil(getConfig(), state); - - requestState = new MockContainerRequestState(amRmClientAsync, true); - containerAllocator = new HostAwareContainerAllocator( - amRmClientAsync, - containerUtil, - new YarnConfig(config) - ); - Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("containerRequestState"); - requestStateField.setAccessible(true); - requestStateField.set(containerAllocator, requestState); - - allocatorThread = new Thread(containerAllocator); + @Override + protected Config getConfig() { + return config; } - @After - public void teardown() throws Exception { - containerAllocator.setIsRunning(false); - allocatorThread.join(); - } - - /** - * If the container fails to start e.g because it fails to connect to a NM on a host that - * is down, the allocator should request a new container on a different host. - */ - @Test - public void testRerequestOnAnyHostIfContainerStartFails() throws Exception { - final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "2", 123); - final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "1", 123); - - ((MockContainerUtil) containerUtil).containerStartException = new IOException("You shall not... connect to the NM!"); - - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(2, 1, 2, null, new Runnable() { - @Override - public void run() { - // The failed container should be released. The successful one should not. - assertNotNull(testAMRMClient.getRelease()); - assertEquals(1, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container.getId())); - } - }, - new Runnable() { - @Override - public void run() { - // Test that the first request assignment had a preferred host and the retry didn't - assertEquals(2, requestState.assignedRequests.size()); - - SamzaContainerRequest request = requestState.assignedRequests.remove(); - assertEquals(0, request.expectedContainerId); - assertEquals("2", request.getPreferredHost()); - - request = requestState.assignedRequests.remove(); - assertEquals(0, request.expectedContainerId); - assertEquals("ANY_HOST", request.getPreferredHost()); - - // This routine should be called after the retry is assigned, but before it's started. - // So there should still be 1 container needed. - assertEquals(1, state.neededContainers.get()); - } - } - ); - requestState.registerContainerListener(listener); - - // Only request 1 container and we should see 2 assignments in the assertions above (because of the retry) - containerAllocator.requestContainer(0, "2"); - containerAllocator.addContainer(container1); - containerAllocator.addContainer(container); - - allocatorThread.start(); - - listener.verify(); - } - - @Test - public void testAllocatorReleasesExtraContainers() throws Exception { - final Container container = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "abc", 123); - final Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123); - final Container container2 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "def", 123); - - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(3, 2, 0, null, new Runnable() { - @Override - public void run() { - assertNotNull(testAMRMClient.getRelease()); - assertEquals(2, testAMRMClient.getRelease().size()); - assertTrue(testAMRMClient.getRelease().contains(container1.getId())); - assertTrue(testAMRMClient.getRelease().contains(container2.getId())); - - // Test that state is cleaned up - assertEquals(0, requestState.getRequestsQueue().size()); - assertEquals(0, requestState.getRequestsToCountMap().size()); - assertNull(requestState.getContainersOnAHost("abc")); - assertNull(requestState.getContainersOnAHost("def")); - } - }, - null); - requestState.registerContainerListener(listener); - - allocatorThread.start(); - - containerAllocator.requestContainer(0, "abc"); - - containerAllocator.addContainer(container); - containerAllocator.addContainer(container1); - containerAllocator.addContainer(container2); - - listener.verify(); + @Override + protected MockContainerRequestState createContainerRequestState(AMRMClientAsync<AMRMClient.ContainerRequest> amClient) { + return new MockContainerRequestState(amClient, true); } /** @@ -259,28 +100,28 @@ public class TestHostAwareContainerAllocator { public void testAddContainerWithHostAffinity() throws Exception { containerAllocator.requestContainers(new HashMap<Integer, String>() { { - put(0, "abc"); - put(1, "xyz"); + put(0, "host1"); + put(1, "host3"); } }); - assertNotNull(requestState.getContainersOnAHost("abc")); - assertEquals(0, requestState.getContainersOnAHost("abc").size()); + assertNotNull(requestState.getContainersOnAHost("host1")); + assertEquals(0, requestState.getContainersOnAHost("host1").size()); - assertNotNull(requestState.getContainersOnAHost("xyz")); - assertEquals(0, requestState.getContainersOnAHost("xyz").size()); + assertNotNull(requestState.getContainersOnAHost("host3")); + assertEquals(0, requestState.getContainersOnAHost("host3").size()); assertNull(requestState.getContainersOnAHost(ANY_HOST)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "abc", 123)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "def", 123)); - containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "xyz", 123)); + containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "host1", 123)); + containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000004"), "host2", 123)); + containerAllocator.addContainer(TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "host3", 123)); - assertNotNull(requestState.getContainersOnAHost("abc")); - assertEquals(1, requestState.getContainersOnAHost("abc").size()); + assertNotNull(requestState.getContainersOnAHost("host1")); + assertEquals(1, requestState.getContainersOnAHost("host1").size()); - assertNotNull(requestState.getContainersOnAHost("xyz")); - assertEquals(1, requestState.getContainersOnAHost("xyz").size()); + assertNotNull(requestState.getContainersOnAHost("host3")); + assertEquals(1, requestState.getContainersOnAHost("host3").size()); assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 1); @@ -293,10 +134,10 @@ public class TestHostAwareContainerAllocator { public void testRequestContainers() throws Exception { Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { { - put(0, "abc"); - put(1, "def"); + put(0, "host1"); + put(1, "host2"); put(2, null); - put(3, "abc"); + put(3, "host1"); } }; allocatorThread.start(); @@ -314,11 +155,11 @@ public class TestHostAwareContainerAllocator { assertNotNull(requestState.getRequestsToCountMap()); Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap(); - assertNotNull(requestsMap.get("abc")); - assertEquals(2, requestsMap.get("abc").get()); + assertNotNull(requestsMap.get("host1")); + assertEquals(2, requestsMap.get("host1").get()); - assertNotNull(requestsMap.get("def")); - assertEquals(1, requestsMap.get("def").get()); + assertNotNull(requestsMap.get("host2")); + assertEquals(1, requestsMap.get("host2").get()); assertNotNull(requestsMap.get(ANY_HOST)); assertEquals(1, requestsMap.get(ANY_HOST).get()); @@ -331,8 +172,8 @@ public class TestHostAwareContainerAllocator { public void testExpiredRequestHandling() throws Exception { Map<Integer, String> containersToHostMapping = new HashMap<Integer, String>() { { - put(0, "abc"); - put(1, "def"); + put(0, "requestedHost1"); + put(1, "requestedHost2"); } }; containerAllocator.requestContainers(containersToHostMapping); @@ -341,51 +182,72 @@ public class TestHostAwareContainerAllocator { assertTrue(requestState.getRequestsQueue().size() == 2); assertNotNull(requestState.getRequestsToCountMap()); - assertNotNull(requestState.getRequestsToCountMap().get("abc")); - assertTrue(requestState.getRequestsToCountMap().get("abc").get() == 1); + assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1")); + assertTrue(requestState.getRequestsToCountMap().get("requestedHost1").get() == 1); - assertNotNull(requestState.getRequestsToCountMap().get("def")); - assertTrue(requestState.getRequestsToCountMap().get("def").get() == 1); + assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2")); + assertTrue(requestState.getRequestsToCountMap().get("requestedHost2").get() == 1); - // Set up our final asserts before starting the allocator thread - MockContainerListener listener = new MockContainerListener(2, 0, 0, new Runnable() { + final Container container0 = TestUtil + .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "availableHost1", 123); + final Container container1 = TestUtil + .getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "availableHost2", 123); + + Runnable addedContainerAssertions = new Runnable() { @Override public void run() { - assertNull(requestState.getContainersOnAHost("xyz")); - assertNull(requestState.getContainersOnAHost("zzz")); + assertNotNull(requestState.getRequestsToCountMap()); + assertNull(requestState.getContainersOnAHost("availableHost1")); + assertNull(requestState.getContainersOnAHost("availableHost2")); assertNotNull(requestState.getContainersOnAHost(ANY_HOST)); assertTrue(requestState.getContainersOnAHost(ANY_HOST).size() == 2); } - }, null, null); - requestState.registerContainerListener(listener); - - allocatorThread.start(); - - Container container0 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "xyz", 123); - Container container1 = TestUtil.getContainer(ConverterUtils.toContainerId("container_1350670447861_0003_01_000003"), "zzz", 123); - containerAllocator.addContainer(container0); - containerAllocator.addContainer(container1); + }; - listener.verify(); + Runnable assignedContainerAssertions = new Runnable() { + @Override + public void run() { + List<Container> anyHostContainers = requestState.getContainersOnAHost(ANY_HOST); + assertTrue(anyHostContainers == null || anyHostContainers.isEmpty()); - Thread.sleep(1000); + assertNotNull(requestState.getRequestsQueue()); + assertTrue(requestState.getRequestsQueue().size() == 0); - MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil; + assertNotNull(requestState.getRequestsToCountMap()); + assertNotNull(requestState.getRequestsToCountMap().get("requestedHost1")); + assertNotNull(requestState.getRequestsToCountMap().get("requestedHost2")); + } + }; - assertNotNull(mockContainerUtil.runningContainerList.get("xyz")); - assertTrue(mockContainerUtil.runningContainerList.get("xyz").contains(container0)); + Runnable runningContainerAssertions = new Runnable() { + @Override + public void run() { + MockContainerUtil mockContainerUtil = (MockContainerUtil) containerUtil; - assertNotNull(mockContainerUtil.runningContainerList.get("zzz")); - assertTrue(mockContainerUtil.runningContainerList.get("zzz").contains(container1)); + assertNotNull(mockContainerUtil.runningContainerList.get("availableHost1")); + assertTrue(mockContainerUtil.runningContainerList.get("availableHost1").contains(container0)); - assertNull(requestState.getContainersOnAHost(ANY_HOST)); + assertNotNull(mockContainerUtil.runningContainerList.get("availableHost2")); + assertTrue(mockContainerUtil.runningContainerList.get("availableHost2").contains(container1)); + } + }; + // Set up our final asserts before starting the allocator thread + MockContainerListener listener = new MockContainerListener( + 2, 0, 2, 2, + addedContainerAssertions, + null, + assignedContainerAssertions, + runningContainerAssertions); + requestState.registerContainerListener(listener); + ((MockContainerUtil) containerUtil).registerContainerListener(listener); - assertNotNull(requestState.getRequestsQueue()); - assertTrue(requestState.getRequestsQueue().size() == 0); + containerAllocator.addContainer(container0); + containerAllocator.addContainer(container1); - assertNotNull(requestState.getRequestsToCountMap()); - assertNull(requestState.getRequestsToCountMap().get("abc")); + // Start after adding containers to avoid a race condition between the allocator thread + // using the containers and the assertions after the containers are added. + allocatorThread.start(); - assertNull(requestState.getRequestsToCountMap().get("def")); + listener.verify(); } } http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java index cb82ccc..43bda8f 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerListener.java @@ -19,82 +19,130 @@ package org.apache.samza.job.yarn.util; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.yarn.api.records.Container; -import org.apache.samza.job.yarn.SamzaContainerRequest; -import scala.tools.nsc.Global; import static org.junit.Assert.assertTrue; public class MockContainerListener { - private static final int NUM_CONDITIONS = 3; - private boolean allContainersAdded = false; - private boolean allContainersReleased = false; - private final int numExpectedContainersAdded; - private final int numExpectedContainersReleased; - private final int numExpectedContainersAssigned; - private final Runnable addContainerAssertions; - private final Runnable releaseContainerAssertions; - private final Runnable assignContainerAssertions; + private final CountDownLatch conditionLatch; + + + private final AsyncCountableCondition containersAdded; + private final AsyncCountableCondition containersReleased; + private final AsyncCountableCondition containersAssigned; + private final AsyncCountableCondition containersRunning; + + private final AsyncCountableCondition[] allConditions; public MockContainerListener(int numExpectedContainersAdded, int numExpectedContainersReleased, int numExpectedContainersAssigned, + int numExpectedContainersRunning, Runnable addContainerAssertions, Runnable releaseContainerAssertions, - Runnable assignContainerAssertions) { - this.numExpectedContainersAdded = numExpectedContainersAdded; - this.numExpectedContainersReleased = numExpectedContainersReleased; - this.numExpectedContainersAssigned = numExpectedContainersAssigned; - this.addContainerAssertions = addContainerAssertions; - this.releaseContainerAssertions = releaseContainerAssertions; - this.assignContainerAssertions = assignContainerAssertions; - } + Runnable assignContainerAssertions, + Runnable runContainerAssertions) { + containersAdded = new AsyncCountableCondition("containers added", numExpectedContainersAdded, addContainerAssertions); + containersReleased = new AsyncCountableCondition("containers released", numExpectedContainersReleased, releaseContainerAssertions); + containersAssigned = new AsyncCountableCondition("containers assigned", numExpectedContainersAssigned, assignContainerAssertions); + containersRunning = new AsyncCountableCondition("containers running", numExpectedContainersRunning, runContainerAssertions); - public synchronized void postAddContainer(Container container, int totalAddedContainers) { - if (totalAddedContainers == numExpectedContainersAdded) { - if (addContainerAssertions != null) { - addContainerAssertions.run(); + allConditions = new AsyncCountableCondition[] {containersAdded, containersReleased, containersAssigned, containersRunning}; + + int unsatisfiedConditions = 0; + for (AsyncCountableCondition condition : allConditions) { + if (!condition.isSatisfied()) { + unsatisfiedConditions++; } + } + + conditionLatch = new CountDownLatch(unsatisfiedConditions); + } - allContainersAdded = true; - this.notifyAll(); + public void postAddContainer(int totalAddedContainers) { + if (containersAdded.update(totalAddedContainers)) { + conditionLatch.countDown(); } } - public synchronized void postReleaseContainers(int totalReleasedContainers) { - if (totalReleasedContainers == numExpectedContainersReleased) { - if (releaseContainerAssertions != null) { - releaseContainerAssertions.run(); - } + public void postReleaseContainers(int totalReleasedContainers) { + if (containersReleased.update(totalReleasedContainers)) { + conditionLatch.countDown(); + } + } - allContainersReleased = true; - this.notifyAll(); + public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) { + if (containersAssigned.update(totalAssignedContainers)) { + conditionLatch.countDown(); } } - public synchronized void verify() { - // There could be 1 notifyAll() for each condition, so we must wait up to that many times - for (int i = 0; i < NUM_CONDITIONS && !(allContainersAdded && allContainersReleased); i++) { - try { - this.wait(5000); - } catch (InterruptedException e) { - // Do nothing - } + public void postRunContainer(int totalRunningContainers) { + if (containersRunning.update(totalRunningContainers)) { + conditionLatch.countDown(); } + } - assertTrue("Not all containers were added.", allContainersAdded); - assertTrue("Not all containers were released.", allContainersReleased); + /** + * This method should be called in the main thread. It waits for all the conditions to occur in the other + * threads and then verifies that they were in fact satisfied. + */ + public void verify() + throws InterruptedException { + conditionLatch.await(5, TimeUnit.SECONDS); + + for (AsyncCountableCondition condition : allConditions) { + condition.verify(); + } } - public void postUpdateRequestStateAfterAssignment(int totalAssignedContainers) { - if (totalAssignedContainers == numExpectedContainersAssigned) { - if (assignContainerAssertions != null) { - assignContainerAssertions.run(); + private static class AsyncCountableCondition { + private boolean satisfied = false; + private final int expectedCount; + private final Runnable postConditionAssertions; + private final String name; + private AssertionError assertionError = null; + + private AsyncCountableCondition(String name, int expectedCount, Runnable postConditionAssertions) { + this.name = name; + this.expectedCount = expectedCount; + if (expectedCount == 0) satisfied = true; + this.postConditionAssertions = postConditionAssertions; + } + + public boolean update(int latestCount) { + if (!satisfied && latestCount == expectedCount) { + if (postConditionAssertions != null) { + try { + postConditionAssertions.run(); + } catch (Throwable t) { + assertionError = new AssertionError(String.format("Assertion for '%s' failed", name), t); + } + } + + satisfied = true; + return true; } + return false; + } + + public boolean isSatisfied() { + return satisfied; + } + + public void verify() { + assertTrue(String.format("Condition '%s' was not satisfied", name), isSatisfied()); + + if (assertionError != null) { + throw assertionError; + } + } - this.notifyAll(); + @Override + public String toString() { + return name; } } } http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java index 879a7d0..7c0b504 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerRequestState.java @@ -30,7 +30,7 @@ import org.apache.samza.job.yarn.SamzaContainerRequest; public class MockContainerRequestState extends ContainerRequestState { - private final List<MockContainerListener> _mockContainerListeners = new ArrayList<MockContainerListener>(); + private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>(); private int numAddedContainers = 0; private int numReleasedContainers = 0; private int numAssignedContainers = 0; @@ -48,7 +48,7 @@ public class MockContainerRequestState extends ContainerRequestState { numAssignedContainers++; assignedRequests.add(request); - for (MockContainerListener listener : _mockContainerListeners) { + for (MockContainerListener listener : mockContainerListeners) { listener.postUpdateRequestStateAfterAssignment(numAssignedContainers); } } @@ -58,8 +58,8 @@ public class MockContainerRequestState extends ContainerRequestState { super.addContainer(container); numAddedContainers++; - for (MockContainerListener listener : _mockContainerListeners) { - listener.postAddContainer(container, numAddedContainers); + for (MockContainerListener listener : mockContainerListeners) { + listener.postAddContainer(numAddedContainers); } } @@ -67,7 +67,7 @@ public class MockContainerRequestState extends ContainerRequestState { public synchronized int releaseExtraContainers() { numReleasedContainers += super.releaseExtraContainers(); - for (MockContainerListener listener : _mockContainerListeners) { + for (MockContainerListener listener : mockContainerListeners) { listener.postReleaseContainers(numReleasedContainers); } @@ -79,17 +79,12 @@ public class MockContainerRequestState extends ContainerRequestState { super.releaseUnstartableContainer(container); numReleasedContainers += 1; - for (MockContainerListener listener : _mockContainerListeners) { + for (MockContainerListener listener : mockContainerListeners) { listener.postReleaseContainers(numReleasedContainers); } } public void registerContainerListener(MockContainerListener listener) { - _mockContainerListeners.add(listener); + mockContainerListeners.add(listener); } - - public void clearContainerListeners() { - _mockContainerListeners.clear(); - } - } http://git-wip-us.apache.org/repos/asf/samza/blob/1497bf6c/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java index 2f9669f..cf3e143 100644 --- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/util/MockContainerUtil.java @@ -33,6 +33,7 @@ import org.apache.samza.job.yarn.SamzaContainerLaunchException; public class MockContainerUtil extends ContainerUtil { + private final List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>(); public final Map<String, List<Container>> runningContainerList = new HashMap<>(); public Exception containerStartException = null; @@ -54,6 +55,10 @@ public class MockContainerUtil extends ContainerUtil { runningContainerList.put(hostname, list); } super.runContainer(samzaContainerId, container); + + for (MockContainerListener listener : mockContainerListeners) { + listener.postRunContainer(runningContainerList.size()); + } } @Override @@ -64,4 +69,11 @@ public class MockContainerUtil extends ContainerUtil { } } + public void registerContainerListener(MockContainerListener listener) { + mockContainerListeners.add(listener); + } + + public void clearContainerListeners() { + mockContainerListeners.clear(); + } }