This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ae3450  Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled (#1170)
2ae3450 is described below

commit 2ae34502e3526cbb8e275d87c26c7bc4ce2d8ed4
Author: Sanil Jain <[email protected]>
AuthorDate: Tue Oct 8 13:41:06 2019 -0700

    Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled (#1170)
    
    * Adding expiry check for unresponsive Cluster Manager when host affinity is off
    
    * Fixing after rebase
    
    * Addressing Ray's feedback
    
    * Updating javadocs
    
    * Nitpick improvements
---
 .../versioned/jobs/samza-configurations.md         |  2 +-
 .../samza/clustermanager/ContainerAllocator.java   | 19 ++++----
 .../TestContainerAllocatorWithHostAffinity.java    | 12 ++---
 .../TestContainerAllocatorWithoutHostAffinity.java | 55 +++++++++++++++++++++-
 .../TestContainerProcessManager.java               |  2 +-
 5 files changed, 70 insertions(+), 20 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index a247460..4ccc02b 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -299,7 +299,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count`s are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use cases of setting this to false is to aid i [...]
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
-|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
+|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. If no resource is obtained after cluster-manager.container.request.timeout.ms the request is declared to be expired.. When a request expires, it gets allocated to any available container that was returned by the cluster manager if none is available the existing resource [...]
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
 |task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
 |yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 5df5cd7..361d1eb 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -56,10 +56,10 @@ import org.slf4j.LoggerFactory;
  *    When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
  *  </li>
  *  <li>
- *    When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
- *    milliseconds of the request being made, the resource is declared expired. The expired request are handled by
- *    allocating them to *ANY* allocated resource if available. If no surplus resources are available the current preferred
- *    resource-request is cancelled and resource-request for ANY_HOST is issued
+ *    When the preferred resource has not been obtained after {@code requestExpiryTimeout} milliseconds of the request
+ *    being made, the resource is declared expired. Expired request are handled by allocating them to *ANY*
+ *    allocated resource if available. If no surplus resources are available the current preferred resource-request
+ *    is cancelled and resource-request for ANY_HOST is issued
  *  </li>
  *  <li>
  *    When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
@@ -219,9 +219,7 @@ public class ContainerAllocator implements Runnable {
 
         if (expired) {
           updateExpiryMetrics(request);
-          if (hostAffinityEnabled) {
-            handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
-          }
+          handleExpiredRequest(processorId, preferredHost, request);
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -233,11 +231,12 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
-   * Handles an expired resource request when {@code hostAffinityEnabled} is true, in this case since the
-   * preferred host, we try to see if a surplus ANY_HOST is available in the request queue.
+   * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
+   * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
+   * issues an ANY_HOST request. This behavior holds regardless of host-affinity enabled or not.
    */
   @VisibleForTesting
-  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
+  void handleExpiredRequest(String processorId, String preferredHost,
       SamzaResourceRequest request) {
     boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
     if (standbyContainerManager.isPresent()) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index bb9cb79..6bc4e49 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -410,8 +410,8 @@ public class TestContainerAllocatorWithHostAffinity {
     // Request Preferred Resources
     spyAllocator.requestResources(new HashMap<String, String>() {
       {
-        put("0", "abc");
-        put("1", "def");
+        put("0", "hostname-0");
+        put("1", "hostname-1");
       }
     });
 
@@ -425,9 +425,9 @@ public class TestContainerAllocatorWithHostAffinity {
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("abc"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("def"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
         any(SamzaResourceRequest.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
@@ -470,9 +470,9 @@ public class TestContainerAllocatorWithHostAffinity {
 
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("abc"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("abc"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("def"),
+    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("def"),
         any(SamzaResourceRequest.class));
 
     // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 16eac0b..bbccbcb 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -87,6 +88,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
         put("cluster-manager.container.count", "1");
         put("cluster-manager.container.retry.count", "1");
         put("cluster-manager.container.retry.window.ms", "1999999999");
+        put("cluster-manager.container.request.timeout.ms", "3");
         put("cluster-manager.allocator.sleep.ms", "10");
         put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
@@ -282,12 +284,61 @@ public class TestContainerAllocatorWithoutHostAffinity {
     resourceRequestCaptor.getAllValues()
         .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
     assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
-    // Expiry currently is only handled for host affinity enabled cases
-    verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
+    // Expiry currently should not be invoked
+    verify(spyAllocator, never()).handleExpiredRequest(anyString(), anyString(),
         any(SamzaResourceRequest.class));
     // Only updated when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 0);
     spyAllocator.stop();
   }
+
+  @Test
+  public void testExpiredRequestAllocationOnAnyHost() throws Exception {
+    MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
+    spyAllocator = Mockito.spy(
+        new ContainerAllocator(spyManager, config, state, false, Optional.empty()));
+
+    // Request Resources
+    spyAllocator.requestResources(new HashMap<String, String>() {
+      {
+        put("0", "host-0");
+        put("1", "host-1");
+      }
+    });
+
+    spyThread = new Thread(spyAllocator);
+    // Start the container allocator thread periodic assignment
+    spyThread.start();
+
+    // Let the request expire, expiration timeout is 3 ms
+    Thread.sleep(20);
+
+    // Verify that all the request that were created as ANY_HOST host
+    // and all created requests expired
+    assertEquals(state.preferredHostRequests.get(), 0);
+    // Atleast 2 requests should expire & 2 ANY_HOST requests should be generated
+    assertTrue(state.anyHostRequests.get() >= 4);
+    assertTrue(state.expiredAnyHostRequests.get() >= 2);
+
+    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
+        any(SamzaResourceRequest.class));
+    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
+        any(SamzaResourceRequest.class));
+
+    // Verify that preferred host request were cancelled and since no surplus resources were available
+    // requestResource was invoked with ANY_HOST requests
+    ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
+    // At least 2 preferred host requests were cancelled
+    verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
+    // Verify all the request cancelled were ANY_HOST
+    assertTrue(cancelledRequestCaptor.getAllValues()
+        .stream()
+        .map(resourceRequest -> resourceRequest.getPreferredHost())
+        .collect(Collectors.toSet())
+        .size() == 1);
+    containerAllocator.stop();
+
+  }
+
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index d0ea463..827bddf 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -71,7 +71,7 @@ public class TestContainerProcessManager {
       put("cluster-manager.container.retry.count", "1");
       put("cluster-manager.container.retry.window.ms", "1999999999");
       put("cluster-manager.allocator.sleep.ms", "1");
-      put("cluster-manager.container.request.timeout.ms", "2");
+      put("cluster-manager.container.request.timeout.ms", "100");
       put("cluster-manager.container.memory.mb", "512");
       put("yarn.package.path", "/foo");
       put("task.inputs", "test-system.test-stream");
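
The expiry behavior described in the updated ContainerAllocator javadoc can be illustrated with a small standalone sketch. This is not the Samza implementation: the class ExpiredRequestSketch, its Request type, the surplus queue, and the strict "older than the timeout" comparison are all hypothetical and chosen only to show the decision flow. A request that has not expired keeps waiting; an expired request is launched on a surplus ANY_HOST resource if one exists; otherwise it is cancelled and re-issued as an ANY_HOST request.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: these types are illustrative and are not Samza classes.
final class ExpiredRequestSketch {
  static final String ANY_HOST = "ANY_HOST";

  /** A pending resource request and the time at which it was issued. */
  static final class Request {
    final String processorId;
    final String preferredHost;
    final long createdAtMs;

    Request(String processorId, String preferredHost, long createdAtMs) {
      this.processorId = processorId;
      this.preferredHost = preferredHost;
      this.createdAtMs = createdAtMs;
    }
  }

  final long timeoutMs;
  // Allocated ANY_HOST resources that no request has claimed yet.
  final Deque<String> surplusAnyHostResources = new ArrayDeque<>();
  // Records each action taken, in order, so the flow can be inspected.
  final List<String> actions = new ArrayList<>();

  ExpiredRequestSketch(long timeoutMs) {
    this.timeoutMs = timeoutMs;
  }

  /**
   * Returns the request that is still pending after this pass, or null if the
   * processor was launched. Treating a request as expired only when its age
   * strictly exceeds the timeout is an assumption of this sketch.
   */
  Request handle(Request request, long nowMs) {
    if (nowMs - request.createdAtMs <= timeoutMs) {
      actions.add("wait " + request.processorId);
      return request;
    }
    if (!surplusAnyHostResources.isEmpty()) {
      // A surplus resource exists: launch the processor on it.
      String resource = surplusAnyHostResources.poll();
      actions.add("launch " + request.processorId + " on " + resource);
      return null;
    }
    // No surplus: cancel the stale request and issue a fresh ANY_HOST request.
    actions.add("cancel " + request.processorId + " on " + request.preferredHost);
    actions.add("request " + request.processorId + " on " + ANY_HOST);
    return new Request(request.processorId, ANY_HOST, nowMs);
  }

  public static void main(String[] args) {
    ExpiredRequestSketch allocator = new ExpiredRequestSketch(3);
    Request pending = new Request("0", ANY_HOST, 0);
    pending = allocator.handle(pending, 2);   // not expired yet
    pending = allocator.handle(pending, 10);  // expired, no surplus: re-request
    allocator.surplusAnyHostResources.add("resource-1");
    pending = allocator.handle(pending, 20);  // expired, surplus: launch
    allocator.actions.forEach(System.out::println);
  }
}
```

With host affinity disabled the original request already targets ANY_HOST, so in this sketch the cancel and re-request steps both name ANY_HOST, which mirrors what the new test verifies about cancelled requests.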
