Repository: samza
Updated Branches:
  refs/heads/master 96c333469 -> b6219f181


SAMZA-1649: Improve host-aware allocation to account for strict locality

A doc describing the behavior of the CapacityScheduler is in progress, and 
further testing is needed on an actual cluster. In the meantime, here is a 
summary of why we should set relax-locality = false:
 - Node-local requests are honored only when relax-locality = false
 - With relax-locality = true, the scheduler favors interactivity over 
data-locality for requests that ask for few resources relative to the size of 
the cluster.

In addition to the above change, this PR also modifies the allocator algorithm 
to fall back to "ANY_HOST" requests so that we make progress when the preferred 
node is unavailable.

Author: Jagadish <jvenkatra...@linkedin.com>

Reviewers: Prateek Maheshwari <pmahe...@linkedin.com>

Closes #471 from vjagadish/relax-locality-fix


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b6219f18
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b6219f18
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b6219f18

Branch: refs/heads/master
Commit: b6219f181ff54e79aedcf6f1b7fff264925a8af7
Parents: 96c3334
Author: Jagadish <jvenkatra...@linkedin.com>
Authored: Thu Apr 12 19:23:44 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Thu Apr 12 19:23:44 2018 -0700

----------------------------------------------------------------------
 .../HostAwareContainerAllocator.java            |  17 ++-
 .../clustermanager/ResourceRequestState.java    |  18 +++
 .../MockClusterResourceManager.java             |  39 ++++--
 .../TestHostAwareContainerAllocator.java        | 124 +++++++++++++++++--
 .../job/yarn/YarnClusterResourceManager.java    |   2 +-
 5 files changed, 169 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
index 66e2246..fe462e7 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java
@@ -78,13 +78,18 @@ public class HostAwareContainerAllocator extends 
AbstractContainerAllocator {
         boolean expired = requestExpired(request);
         boolean resourceAvailableOnAnyHost = 
hasAllocatedResource(ResourceRequestState.ANY_HOST);
 
-        if (expired && resourceAvailableOnAnyHost) {
-          log.info("Request expired. running on ANY_HOST");
-          runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+        if (expired) {
+          if (resourceAvailableOnAnyHost) {
+            log.info("Request for container: {} on {} has expired. Running on 
ANY_HOST", request.getContainerID(), request.getPreferredHost());
+            runStreamProcessor(request, ResourceRequestState.ANY_HOST);
+          } else {
+            log.info("Request for container: {} on {} has expired. Requesting 
additional resources on ANY_HOST.", request.getContainerID(), 
request.getPreferredHost());
+            resourceRequestState.cancelResourceRequest(request);
+            requestResource(containerID, ResourceRequestState.ANY_HOST);
+          }
         } else {
-          log.info("Either the request timestamp {} is greater than resource 
request timeout {}ms or we couldn't "
-                  + "find any free allocated resources in the buffer. Breaking 
out of loop.",
-              request.getRequestTimestampMs(), requestTimeout);
+          log.info("Request for container: {} on {} has not yet expired. 
Request creation time: {}. Request timeout: {}",
+              new Object[]{request.getContainerID(), 
request.getPreferredHost(), request.getRequestTimestampMs(), requestTimeout});
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index fe2067c..51caa39 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -102,6 +102,24 @@ public class ResourceRequestState {
   }
 
   /**
+   * Cancels a {@link SamzaResourceRequest} previously submitted to the {@link 
ClusterResourceManager}
+   *
+   * @param request {@link SamzaResourceRequest} to cancel
+   */
+  public void cancelResourceRequest(SamzaResourceRequest request) {
+    log.info("Canceling resource request on {} for {}", 
request.getPreferredHost(), request.getContainerID());
+    synchronized (lock) {
+      requestsQueue.remove(request);
+      if (hostAffinityEnabled) {
+        // assignedHost may not always be the preferred host.
+        // Hence, we should safely decrement the counter for the preferredHost
+        requestsToCountMap.get(request.getPreferredHost()).decrementAndGet();
+      }
+      manager.cancelResourceRequest(request);
+    }
+  }
+
+  /**
    * Invoked each time a resource is returned from a {@link 
ClusterResourceManager}.
    * @param samzaResource The resource that was returned from the {@link 
ClusterResourceManager}
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index 452cadc..471c7fe 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -23,20 +23,27 @@ import com.google.common.collect.ImmutableList;
 import org.apache.samza.job.CommandBuilder;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 public class MockClusterResourceManager extends ClusterResourceManager {
-  Set<SamzaResource> releasedResources = new HashSet<>();
-  List<SamzaResource> resourceRequests = new ArrayList<>();
-  List<SamzaResourceRequest> cancelledRequests = new ArrayList<>();
-  List<SamzaResource> launchedResources = new ArrayList<>();
-  List<MockContainerListener> mockContainerListeners = new 
ArrayList<MockContainerListener>();
+  final Set<SamzaResource> releasedResources = Collections.synchronizedSet(new 
HashSet<>());
+  final List<SamzaResource> resourceRequests = 
Collections.synchronizedList(new ArrayList<>());
+  final List<SamzaResourceRequest> cancelledRequests = 
Collections.synchronizedList(new ArrayList<>());
+  final List<SamzaResource> launchedResources = 
Collections.synchronizedList(new ArrayList<>());
+  final List<MockContainerListener> mockContainerListeners = 
Collections.synchronizedList(new ArrayList<>());
+
+  private final Semaphore requestCountSemaphore = new Semaphore(0);
+  private final Semaphore launchCountSemaphore = new Semaphore(0);
+
   Throwable nextException = null;
 
-  public MockClusterResourceManager(ClusterResourceManager.Callback callback) {
+  MockClusterResourceManager(ClusterResourceManager.Callback callback) {
     super(callback);
   }
 
@@ -50,7 +57,7 @@ public class MockClusterResourceManager extends 
ClusterResourceManager {
     SamzaResource resource = new SamzaResource(resourceRequest.getNumCores(), 
resourceRequest.getMemoryMB(),
         resourceRequest.getPreferredHost(), UUID.randomUUID().toString());
     resourceRequests.add(resource);
-
+    requestCountSemaphore.release();
     clusterManagerCallback.onResourcesAvailable(ImmutableList.of(resource));
   }
 
@@ -59,6 +66,14 @@ public class MockClusterResourceManager extends 
ClusterResourceManager {
     cancelledRequests.add(request);
   }
 
+  public boolean awaitResourceRequests(int numExpectedRequests, long val, 
TimeUnit unit) throws Exception  {
+    return requestCountSemaphore.tryAcquire(numExpectedRequests, val, unit);
+  }
+
+  public boolean awaitContainerLaunch(int numExpectedContainers, long val, 
TimeUnit unit) throws Exception {
+    return launchCountSemaphore.tryAcquire(numExpectedContainers, val, unit);
+  }
+
   @Override
   public void releaseResources(SamzaResource resource) {
     releasedResources.add(resource);
@@ -75,11 +90,7 @@ public class MockClusterResourceManager extends 
ClusterResourceManager {
     for (MockContainerListener listener : mockContainerListeners) {
       listener.postRunContainer(launchedResources.size());
     }
-  }
-
-  @Override
-  public void stop(SamzaApplicationState.SamzaAppStatus status) {
-
+    launchCountSemaphore.release();
   }
 
   public void registerContainerListener(MockContainerListener listener) {
@@ -90,4 +101,8 @@ public class MockClusterResourceManager extends 
ClusterResourceManager {
     mockContainerListeners.clear();
   }
 
+  @Override
+  public void stop(SamzaApplicationState.SamzaAppStatus status) {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
index c26d727..490de51 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java
@@ -21,8 +21,10 @@ package org.apache.samza.clustermanager;
 import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
@@ -33,6 +35,7 @@ import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -46,7 +49,7 @@ import static org.mockito.Mockito.when;
 public class TestHostAwareContainerAllocator {
 
   private final MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
-  private final MockClusterResourceManager manager = new 
MockClusterResourceManager(callback);
+  private final MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback);
   private final Config config = getConfig();
   private final JobModelManager reader = initializeJobModelManager(config, 1);
 
@@ -70,8 +73,8 @@ public class TestHostAwareContainerAllocator {
 
   @Before
   public void setup() throws Exception {
-    containerAllocator = new HostAwareContainerAllocator(manager, 
timeoutMillis, config, state);
-    requestState = new MockContainerRequestState(manager, true);
+    containerAllocator = new 
HostAwareContainerAllocator(clusterResourceManager, timeoutMillis, config, 
state);
+    requestState = new MockContainerRequestState(clusterResourceManager, true);
     Field requestStateField = 
containerAllocator.getClass().getSuperclass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
     requestStateField.set(containerAllocator, requestState);
@@ -187,9 +190,9 @@ public class TestHostAwareContainerAllocator {
     Runnable releasedAssertions = new Runnable() {
       @Override
       public void run() {
-        assertEquals(2, manager.releasedResources.size());
-        assertTrue(manager.releasedResources.contains(resource1));
-        assertTrue(manager.releasedResources.contains(resource2));
+        assertEquals(2, clusterResourceManager.releasedResources.size());
+        
assertTrue(clusterResourceManager.releasedResources.contains(resource1));
+        
assertTrue(clusterResourceManager.releasedResources.contains(resource2));
 
         // Test that state is cleaned up
         assertEquals(0, requestState.numPendingRequests());
@@ -227,8 +230,8 @@ public class TestHostAwareContainerAllocator {
 
     containerAllocator.requestResources(containersToHostMapping);
 
-    assertNotNull(manager.resourceRequests);
-    assertEquals(manager.resourceRequests.size(), 4);
+    assertNotNull(clusterResourceManager.resourceRequests);
+    assertEquals(clusterResourceManager.resourceRequests.size(), 4);
     assertEquals(requestState.numPendingRequests(), 4);
 
     Map<String, AtomicInteger> requestsMap = 
requestState.getRequestsToCountMap();
@@ -247,7 +250,7 @@ public class TestHostAwareContainerAllocator {
    */
 
   @Test
-  public void testExpiredRequestHandling() throws Exception {
+  public void testExpiredRequestAreAssignedToAnyHost() throws Exception {
     final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");
     final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2");
 
@@ -289,13 +292,13 @@ public class TestHostAwareContainerAllocator {
     Runnable runningContainerAssertions = new Runnable() {
       @Override
       public void run() {
-        assertTrue(manager.launchedResources.contains(resource0));
-        assertTrue(manager.launchedResources.contains(resource1));
+        
assertTrue(clusterResourceManager.launchedResources.contains(resource0));
+        
assertTrue(clusterResourceManager.launchedResources.contains(resource1));
       }
     };
     MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, 
addContainerAssertions, null, assignContainerAssertions, 
runningContainerAssertions);
     requestState.registerContainerListener(listener);
-    ((MockClusterResourceManager) manager).registerContainerListener(listener);
+    ((MockClusterResourceManager) 
clusterResourceManager).registerContainerListener(listener);
     containerAllocator.addResource(resource0);
     containerAllocator.addResource(resource1);
     allocatorThread.start();
@@ -303,6 +306,103 @@ public class TestHostAwareContainerAllocator {
     listener.verify();
   }
 
+  @Test
+  public void testExpiredRequestsAreCancelled() throws Exception {
+    // request one container each on host-1 and host-2
+    containerAllocator.requestResources(ImmutableMap.of("0", "host-1", "1", 
"host-2"));
+    // assert that the requests made it to YARN
+    Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 2);
+    // allocate one resource from YARN on a different host (host-3)
+    SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1");
+    containerAllocator.addResource(resource0);
+    // let the matching begin
+    allocatorThread.start();
+
+    // verify that a container is launched on host-3 after the request expires
+    if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) 
{
+      Assert.fail("Timed out waiting container launch");
+    }
+    Assert.assertEquals(1, clusterResourceManager.launchedResources.size());
+    
Assert.assertEquals(clusterResourceManager.launchedResources.get(0).getHost(), 
"host-3");
+    
Assert.assertEquals(clusterResourceManager.launchedResources.get(0).getResourceID(),
 "id1");
+
+    // Now, there are no more resources left to run the 2nd container. Verify 
that we eventually issue another request
+    if (!clusterResourceManager.awaitResourceRequests(4, 20, 
TimeUnit.SECONDS)) {
+      Assert.fail("Timed out waiting for resource requests");
+    }
+    // verify that we have cancelled previous requests and there's one 
outstanding request
+    Assert.assertEquals(clusterResourceManager.cancelledRequests.size(), 3);
+  }
+
+  @Test
+  public void testExpiryWithNonResponsiveClusterManager() throws Exception {
+
+    final SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", 
"id1");
+    final SamzaResource resource1 = new SamzaResource(1, 1000, "host-4", 
"id2");
+
+    Map<String, String> containersToHostMapping = ImmutableMap.of("0", 
"host-1", "1", "host-2");
+
+    Runnable addContainerAssertions = new Runnable() {
+      @Override
+      public void run() {
+        // verify that resources are buffered in the right queue. ie, only 
those resources on previously requested hosts
+        // in the preferred-host queue while other resources end up in the 
ANY_HOST queue
+        assertNull(requestState.getResourcesOnAHost("host-3"));
+        assertNull(requestState.getResourcesOnAHost("host-4"));
+        
assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
+        assertEquals(1, 
requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
+      }
+    };
+
+    Runnable assignContainerAssertions = new Runnable() {
+      // verify that we processed all requests
+      @Override
+      public void run() {
+        assertEquals(requestState.numPendingRequests(), 0);
+      }
+    };
+
+    Runnable runningContainerAssertions = new Runnable() {
+      // verify that the two containers were actually launched
+      @Override
+      public void run() {
+        
assertTrue(clusterResourceManager.launchedResources.contains(resource0));
+        
assertTrue(clusterResourceManager.launchedResources.contains(resource1));
+      }
+    };
+    MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, 
addContainerAssertions, null, assignContainerAssertions, 
runningContainerAssertions);
+    requestState.registerContainerListener(listener);
+    clusterResourceManager.registerContainerListener(listener);
+
+    // request for resources - one each on host-1 and host-2
+    containerAllocator.requestResources(containersToHostMapping);
+    assertEquals(requestState.numPendingRequests(), 2);
+    assertNotNull(requestState.getRequestsToCountMap());
+    assertNotNull(requestState.getRequestsToCountMap().get("host-1"));
+    assertTrue(requestState.getRequestsToCountMap().get("host-1").get() == 1);
+    assertNotNull(requestState.getRequestsToCountMap().get("host-2"));
+    assertTrue(requestState.getRequestsToCountMap().get("host-2").get() == 1);
+
+    // verify that no containers have been launched yet (since, YARN has not 
provided any resources)
+    assertEquals(0, clusterResourceManager.launchedResources.size());
+
+    // provide a resource on host-3
+    containerAllocator.addResource(resource0);
+    allocatorThread.start();
+    // verify that the first preferred host request should expire and 
container-0 should launch on host-3
+    if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) 
{
+      Assert.fail("Timed out waiting for container-0 to launch");
+    }
+    // verify that the second preferred host request should expire and should 
trigger ANY_HOST requests
+    if (!clusterResourceManager.awaitResourceRequests(4, 20, 
TimeUnit.SECONDS)) {
+      Assert.fail("Timed out waiting for resource requests");
+    }
+    // finally, provide a container from YARN after multiple requests
+    containerAllocator.addResource(resource1);
+    // verify all the test assertions
+    listener.verify();
+  }
+
   @After
   public void teardown() throws Exception {
     reader.stop();

http://git-wip-us.apache.org/repos/asf/samza/blob/b6219f18/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
----------------------------------------------------------------------
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 695b35a..f8c7d9b 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -238,7 +238,7 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
               new String[]{preferredHost},
               null,
               priority,
-              true,
+              false,
               containerLabel);
     }
     //ensure that updating the state and making the request are done 
atomically.

Reply via email to