Repository: samza Updated Branches: refs/heads/master 96c333469 -> b6219f181
SAMZA-1649: Improve host-aware allocation to account for strict locality Currently working on a doc for the behavior of the CapacityScheduler and further testing is needed on an actual cluster - but here's a summary of why we should set relax-locality = false: - Node-local requests are honored only when relax-locality = false - With relax-locality = true, the scheduler biases interactivity over data-locality for requests that ask for few resources relative to the size of the cluster. In addition to the above change, this PR also modifies the allocator algorithm to fall back to "ANY_HOST" requests so that we make progress when the node is unavailable. Author: Jagadish <jvenkatra...@linkedin.com> Reviewers: Prateek Maheshwari <pmahe...@linkedin.com> Closes #471 from vjagadish/relax-locality-fix Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b6219f18 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b6219f18 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b6219f18 Branch: refs/heads/master Commit: b6219f181ff54e79aedcf6f1b7fff264925a8af7 Parents: 96c3334 Author: Jagadish <jvenkatra...@linkedin.com> Authored: Thu Apr 12 19:23:44 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Thu Apr 12 19:23:44 2018 -0700 ---------------------------------------------------------------------- .../HostAwareContainerAllocator.java | 17 ++- .../clustermanager/ResourceRequestState.java | 18 +++ .../MockClusterResourceManager.java | 39 ++++-- .../TestHostAwareContainerAllocator.java | 124 +++++++++++++++++-- .../job/yarn/YarnClusterResourceManager.java | 2 +- 5 files changed, 169 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java index 66e2246..fe462e7 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java @@ -78,13 +78,18 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator { boolean expired = requestExpired(request); boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST); - if (expired && resourceAvailableOnAnyHost) { - log.info("Request expired. running on ANY_HOST"); - runStreamProcessor(request, ResourceRequestState.ANY_HOST); + if (expired) { + if (resourceAvailableOnAnyHost) { + log.info("Request for container: {} on {} has expired. Running on ANY_HOST", request.getContainerID(), request.getPreferredHost()); + runStreamProcessor(request, ResourceRequestState.ANY_HOST); + } else { + log.info("Request for container: {} on {} has expired. Requesting additional resources on ANY_HOST.", request.getContainerID(), request.getPreferredHost()); + resourceRequestState.cancelResourceRequest(request); + requestResource(containerID, ResourceRequestState.ANY_HOST); + } } else { - log.info("Either the request timestamp {} is greater than resource request timeout {}ms or we couldn't " - + "find any free allocated resources in the buffer. Breaking out of loop.", - request.getRequestTimestampMs(), requestTimeout); + log.info("Request for container: {} on {} has not yet expired. Request creation time: {}. 
Request timeout: {}", + new Object[]{request.getContainerID(), request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout}); break; } } http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java index fe2067c..51caa39 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java @@ -102,6 +102,24 @@ public class ResourceRequestState { } /** + * Cancels a {@link SamzaResourceRequest} previously submitted to the {@link ClusterResourceManager} + * + * @param request {@link SamzaResourceRequest} to cancel + */ + public void cancelResourceRequest(SamzaResourceRequest request) { + log.info("Canceling resource request on {} for {}", request.getPreferredHost(), request.getContainerID()); + synchronized (lock) { + requestsQueue.remove(request); + if (hostAffinityEnabled) { + // assignedHost may not always be the preferred host. + // Hence, we should safely decrement the counter for the preferredHost + requestsToCountMap.get(request.getPreferredHost()).decrementAndGet(); + } + manager.cancelResourceRequest(request); + } + } + + /** * Invoked each time a resource is returned from a {@link ClusterResourceManager}. 
* @param samzaResource The resource that was returned from the {@link ClusterResourceManager} */ http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java index 452cadc..471c7fe 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java @@ -23,20 +23,27 @@ import com.google.common.collect.ImmutableList; import org.apache.samza.job.CommandBuilder; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; public class MockClusterResourceManager extends ClusterResourceManager { - Set<SamzaResource> releasedResources = new HashSet<>(); - List<SamzaResource> resourceRequests = new ArrayList<>(); - List<SamzaResourceRequest> cancelledRequests = new ArrayList<>(); - List<SamzaResource> launchedResources = new ArrayList<>(); - List<MockContainerListener> mockContainerListeners = new ArrayList<MockContainerListener>(); + final Set<SamzaResource> releasedResources = Collections.synchronizedSet(new HashSet<>()); + final List<SamzaResource> resourceRequests = Collections.synchronizedList(new ArrayList<>()); + final List<SamzaResourceRequest> cancelledRequests = Collections.synchronizedList(new ArrayList<>()); + final List<SamzaResource> launchedResources = Collections.synchronizedList(new ArrayList<>()); + final List<MockContainerListener> mockContainerListeners = Collections.synchronizedList(new ArrayList<>()); + + private 
final Semaphore requestCountSemaphore = new Semaphore(0); + private final Semaphore launchCountSemaphore = new Semaphore(0); + Throwable nextException = null; - public MockClusterResourceManager(ClusterResourceManager.Callback callback) { + MockClusterResourceManager(ClusterResourceManager.Callback callback) { super(callback); } @@ -50,7 +57,7 @@ public class MockClusterResourceManager extends ClusterResourceManager { SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), resourceRequest.getMemoryMB(), resourceRequest.getPreferredHost(), UUID.randomUUID().toString()); resourceRequests.add(resource); - + requestCountSemaphore.release(); clusterManagerCallback.onResourcesAvailable(ImmutableList.of(resource)); } @@ -59,6 +66,14 @@ public class MockClusterResourceManager extends ClusterResourceManager { cancelledRequests.add(request); } + public boolean awaitResourceRequests(int numExpectedRequests, long val, TimeUnit unit) throws Exception { + return requestCountSemaphore.tryAcquire(numExpectedRequests, val, unit); + } + + public boolean awaitContainerLaunch(int numExpectedContainers, long val, TimeUnit unit) throws Exception { + return launchCountSemaphore.tryAcquire(numExpectedContainers, val, unit); + } + @Override public void releaseResources(SamzaResource resource) { releasedResources.add(resource); @@ -75,11 +90,7 @@ public class MockClusterResourceManager extends ClusterResourceManager { for (MockContainerListener listener : mockContainerListeners) { listener.postRunContainer(launchedResources.size()); } - } - - @Override - public void stop(SamzaApplicationState.SamzaAppStatus status) { - + launchCountSemaphore.release(); } public void registerContainerListener(MockContainerListener listener) { @@ -90,4 +101,8 @@ public class MockClusterResourceManager extends ClusterResourceManager { mockContainerListeners.clear(); } + @Override + public void stop(SamzaApplicationState.SamzaAppStatus status) { + + } } 
http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java index c26d727..490de51 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java @@ -21,8 +21,10 @@ package org.apache.samza.clustermanager; import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.ImmutableMap; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; import org.apache.samza.container.LocalityManager; @@ -33,6 +35,7 @@ import org.apache.samza.testUtils.MockHttpServer; import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.ServletHolder; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -46,7 +49,7 @@ import static org.mockito.Mockito.when; public class TestHostAwareContainerAllocator { private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback(); - private final MockClusterResourceManager manager = new MockClusterResourceManager(callback); + private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback); private final Config config = getConfig(); private final JobModelManager reader = initializeJobModelManager(config, 1); @@ -70,8 +73,8 @@ public class TestHostAwareContainerAllocator { @Before public void setup() throws Exception { - containerAllocator = new 
HostAwareContainerAllocator(manager, timeoutMillis, config, state); - requestState = new MockContainerRequestState(manager, true); + containerAllocator = new HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, state); + requestState = new MockContainerRequestState(clusterResourceManager, true); Field requestStateField = containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState"); requestStateField.setAccessible(true); requestStateField.set(containerAllocator, requestState); @@ -187,9 +190,9 @@ public class TestHostAwareContainerAllocator { Runnable releasedAssertions = new Runnable() { @Override public void run() { - assertEquals(2, manager.releasedResources.size()); - assertTrue(manager.releasedResources.contains(resource1)); - assertTrue(manager.releasedResources.contains(resource2)); + assertEquals(2, clusterResourceManager.releasedResources.size()); + assertTrue(clusterResourceManager.releasedResources.contains(resource1)); + assertTrue(clusterResourceManager.releasedResources.contains(resource2)); // Test that state is cleaned up assertEquals(0, requestState.numPendingRequests()); @@ -227,8 +230,8 @@ public class TestHostAwareContainerAllocator { containerAllocator.requestResources(containersToHostMapping); - assertNotNull(manager.resourceRequests); - assertEquals(manager.resourceRequests.size(), 4); + assertNotNull(clusterResourceManager.resourceRequests); + assertEquals(clusterResourceManager.resourceRequests.size(), 4); assertEquals(requestState.numPendingRequests(), 4); Map<String, AtomicInteger> requestsMap = requestState.getRequestsToCountMap(); @@ -247,7 +250,7 @@ public class TestHostAwareContainerAllocator { */ @Test - public void testExpiredRequestHandling() throws Exception { + public void testExpiredRequestAreAssignedToAnyHost() throws Exception { final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1"); final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2"); @@ 
-289,13 +292,13 @@ public class TestHostAwareContainerAllocator { Runnable runningContainerAssertions = new Runnable() { @Override public void run() { - assertTrue(manager.launchedResources.contains(resource0)); - assertTrue(manager.launchedResources.contains(resource1)); + assertTrue(clusterResourceManager.launchedResources.contains(resource0)); + assertTrue(clusterResourceManager.launchedResources.contains(resource1)); } }; MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null, assignContainerAssertions, runningContainerAssertions); requestState.registerContainerListener(listener); - ((MockClusterResourceManager) manager).registerContainerListener(listener); + ((MockClusterResourceManager) clusterResourceManager).registerContainerListener(listener); containerAllocator.addResource(resource0); containerAllocator.addResource(resource1); allocatorThread.start(); @@ -303,6 +306,103 @@ public class TestHostAwareContainerAllocator { listener.verify(); } + @Test + public void testExpiredRequestsAreCancelled() throws Exception { + // request one container each on host-1 and host-2 + containerAllocator.requestResources(ImmutableMap.of("0", "host-1", "1", "host-2")); + // assert that the requests made it to YARN + Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 2); + // allocate one resource from YARN on a different host (host-3) + SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1"); + containerAllocator.addResource(resource0); + // let the matching begin + allocatorThread.start(); + + // verify that a container is launched on host-3 after the request expires + if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) { + Assert.fail("Timed out waiting container launch"); + } + Assert.assertEquals(1, clusterResourceManager.launchedResources.size()); + Assert.assertEquals(clusterResourceManager.launchedResources.get(0).getHost(), "host-3"); + 
Assert.assertEquals(clusterResourceManager.launchedResources.get(0).getResourceID(), "id1"); + + // Now, there are no more resources left to run the 2nd container. Verify that we eventually issue another request + if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) { + Assert.fail("Timed out waiting for resource requests"); + } + // verify that we have cancelled previous requests and there's one outstanding request + Assert.assertEquals(clusterResourceManager.cancelledRequests.size(), 3); + } + + @Test + public void testExpiryWithNonResponsiveClusterManager() throws Exception { + + final SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1"); + final SamzaResource resource1 = new SamzaResource(1, 1000, "host-4", "id2"); + + Map<String, String> containersToHostMapping = ImmutableMap.of("0", "host-1", "1", "host-2"); + + Runnable addContainerAssertions = new Runnable() { + @Override + public void run() { + // verify that resources are buffered in the right queue. 
ie, only those resources on previously requested hosts + // in the preferred-host queue while other resources end up in the ANY_HOST queue + assertNull(requestState.getResourcesOnAHost("host-3")); + assertNull(requestState.getResourcesOnAHost("host-4")); + assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST)); + assertEquals(1, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size()); + } + }; + + Runnable assignContainerAssertions = new Runnable() { + // verify that we processed all requests + @Override + public void run() { + assertEquals(requestState.numPendingRequests(), 0); + } + }; + + Runnable runningContainerAssertions = new Runnable() { + // verify that the two containers were actually launched + @Override + public void run() { + assertTrue(clusterResourceManager.launchedResources.contains(resource0)); + assertTrue(clusterResourceManager.launchedResources.contains(resource1)); + } + }; + MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null, assignContainerAssertions, runningContainerAssertions); + requestState.registerContainerListener(listener); + clusterResourceManager.registerContainerListener(listener); + + // request for resources - one each on host-1 and host-2 + containerAllocator.requestResources(containersToHostMapping); + assertEquals(requestState.numPendingRequests(), 2); + assertNotNull(requestState.getRequestsToCountMap()); + assertNotNull(requestState.getRequestsToCountMap().get("host-1")); + assertTrue(requestState.getRequestsToCountMap().get("host-1").get() == 1); + assertNotNull(requestState.getRequestsToCountMap().get("host-2")); + assertTrue(requestState.getRequestsToCountMap().get("host-2").get() == 1); + + // verify that no containers have been launched yet (since, YARN has not provided any resources) + assertEquals(0, clusterResourceManager.launchedResources.size()); + + // provide a resource on host-3 + containerAllocator.addResource(resource0); + 
allocatorThread.start(); + // verify that the first preferred host request should expire and container-0 should launch on host-3 + if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) { + Assert.fail("Timed out waiting for container-0 to launch"); + } + // verify that the second preferred host request should expire and should trigger ANY_HOST requests + if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) { + Assert.fail("Timed out waiting for resource requests"); + } + // finally, provide a container from YARN after multiple requests + containerAllocator.addResource(resource1); + // verify all the test assertions + listener.verify(); + } + @After public void teardown() throws Exception { reader.stop(); http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 695b35a..f8c7d9b 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -238,7 +238,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement new String[]{preferredHost}, null, priority, - true, + false, containerLabel); } //ensure that updating the state and making the request are done atomically.