This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.3.0 by this push:
     new 5478bee  Revert "Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled"
5478bee is described below

commit 5478beeb811c599c0f8e1b849a5d6c8533a3b7ba
Author: Sanil Jain <[email protected]>
AuthorDate: Fri Oct 25 11:43:01 2019 -0700

    Revert "Samza-2330: Handle expired resource request for Container allocator when host affinity is disabled"
    
    This reverts commit 2ae34502e3526cbb8e275d87c26c7bc4ce2d8ed4.
---
 .../versioned/jobs/samza-configurations.md         |  2 +-
 .../samza/clustermanager/ContainerAllocator.java   | 16 ++++---
 .../TestContainerAllocatorWithHostAffinity.java    | 10 ++--
 .../TestContainerAllocatorWithoutHostAffinity.java | 55 +---------------------
 .../TestContainerProcessManager.java               |  2 +-
 5 files changed, 18 insertions(+), 67 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 4ccc02b..a247460 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -299,7 +299,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
 |cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count`s are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use cases of setting this to false is to aid i [...]
 |cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
 |cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
-|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. If no resource is obtained after cluster-manager.container.request.timeout.ms the request is declared to be expired.. When a request expires, it gets allocated to any available container that was returned by the cluster manager if none is available the existing resource [...]
+|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
 |task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
 |task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
 |yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 361d1eb..89855dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -56,10 +56,10 @@ import org.slf4j.LoggerFactory;
  *    When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
  *  </li>
  *  <li>
- *    When the preferred resource has not been obtained after {@code requestExpiryTimeout} milliseconds of the request
- *    being made, the resource is declared expired. Expired request are handled by allocating them to *ANY*
- *    allocated resource if available. If no surplus resources are available the current preferred resource-request
- *    is cancelled and resource-request for ANY_HOST is issued
+ *    When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
+ *    milliseconds of the request being made, the resource is declared expired. The expired request are handled by
+ *    allocating them to *ANY* allocated resource if available. If no surplus resources are available the current preferred
+ *    resource-request is cancelled and resource-request for ANY_HOST is issued
  *  </li>
  *  <li>
  *    When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
@@ -219,7 +219,9 @@ public class ContainerAllocator implements Runnable {
 
         if (expired) {
           updateExpiryMetrics(request);
-          handleExpiredRequest(processorId, preferredHost, request);
+          if (hostAffinityEnabled) {
+            handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
+          }
         } else {
           LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
                   + "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
@@ -233,10 +235,10 @@ public class ContainerAllocator implements Runnable {
   /**
    * Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
    * this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
-   * issues an ANY_HOST request. This behavior holds regardless of host-affinity enabled or not.
+   * issues an ANY_HOST request.
    */
   @VisibleForTesting
-  void handleExpiredRequest(String processorId, String preferredHost,
+  void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
       SamzaResourceRequest request) {
     boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
     if (standbyContainerManager.isPresent()) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 823191b..927df89 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -425,9 +425,9 @@ public class TestContainerAllocatorWithHostAffinity {
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.expiredPreferredHostRequests.get() == 2);
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
         any(SamzaResourceRequest.class));
 
     // Verify that preferred host request were cancelled and since no surplus resources were available
@@ -469,10 +469,10 @@ public class TestContainerAllocatorWithHostAffinity {
     Thread.sleep(100);
 
     // Verify that all the request that were created as preferred host requests expired
-    assertEquals(state.expiredPreferredHostRequests.get(), 2);
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+    assertTrue(state.expiredPreferredHostRequests.get() == 2);
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
         any(SamzaResourceRequest.class));
-    verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+    verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
         any(SamzaResourceRequest.class));
 
     // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index f30f800..16eac0b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.JobModelManager;
@@ -88,7 +87,6 @@ public class TestContainerAllocatorWithoutHostAffinity {
         put("cluster-manager.container.count", "1");
         put("cluster-manager.container.retry.count", "1");
         put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.container.request.timeout.ms", "3");
         put("cluster-manager.allocator.sleep.ms", "10");
         put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
@@ -284,61 +282,12 @@ public class TestContainerAllocatorWithoutHostAffinity {
     resourceRequestCaptor.getAllValues()
         .forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
     assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
-    // Expiry currently should not be invoked
-    verify(spyAllocator, never()).handleExpiredRequest(anyString(), anyString(),
+    // Expiry currently is only handled for host affinity enabled cases
+    verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
         any(SamzaResourceRequest.class));
     // Only updated when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 0);
     spyAllocator.stop();
   }
-
-  @Test
-  public void testExpiredRequestAllocationOnAnyHost() throws Exception {
-    MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
-    spyAllocator = Mockito.spy(
-        new ContainerAllocator(spyManager, config, state, false, Optional.empty()));
-
-    // Request Resources
-    spyAllocator.requestResources(new HashMap<String, String>() {
-      {
-        put("0", "host-0");
-        put("1", "host-1");
-      }
-    });
-
-    spyThread = new Thread(spyAllocator);
-    // Start the container allocator thread periodic assignment
-    spyThread.start();
-
-    // Let the request expire, expiration timeout is 3 ms
-    Thread.sleep(100);
-
-    // Verify that all the request that were created as ANY_HOST host
-    // and all created requests expired
-    assertEquals(state.preferredHostRequests.get(), 0);
-    // Atleast 2 requests should expire & 2 ANY_HOST requests should be generated
-    assertTrue(state.anyHostRequests.get() >= 4);
-    assertTrue(state.expiredAnyHostRequests.get() >= 2);
-
-    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
-        any(SamzaResourceRequest.class));
-    verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
-        any(SamzaResourceRequest.class));
-
-    // Verify that preferred host request were cancelled and since no surplus resources were available
-    // requestResource was invoked with ANY_HOST requests
-    ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
-    // At least 2 preferred host requests were cancelled
-    verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
-    // Verify all the request cancelled were ANY_HOST
-    assertTrue(cancelledRequestCaptor.getAllValues()
-        .stream()
-        .map(resourceRequest -> resourceRequest.getPreferredHost())
-        .collect(Collectors.toSet())
-        .size() == 1);
-    containerAllocator.stop();
-
-  }
-
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 1ee68aa..f100393 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -70,7 +70,7 @@ public class TestContainerProcessManager {
       put("cluster-manager.container.retry.count", "1");
       put("cluster-manager.container.retry.window.ms", "1999999999");
       put("cluster-manager.allocator.sleep.ms", "1");
-      put("cluster-manager.container.request.timeout.ms", "100");
+      put("cluster-manager.container.request.timeout.ms", "2");
       put("cluster-manager.container.memory.mb", "512");
       put("yarn.package.path", "/foo");
       put("task.inputs", "test-system.test-stream");
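
For readers skimming the patch, the gating that this revert restores in ContainerAllocator can be sketched roughly as follows. This is a minimal, standalone illustration and not the Samza code: the class `ExpiryGateSketch`, the `Action` enum, the `decide` method, and the strict `>` comparison for expiry are all hypothetical, chosen only to mirror the `if (expired) { if (hostAffinityEnabled) { ... } }` shape visible in the hunk above.

```java
import java.time.Duration;

// Hypothetical sketch; names here do not exist in Samza.
final class ExpiryGateSketch {

  // What the allocator loop would do for one pending request.
  enum Action { WAIT, HANDLE_EXPIRED, IGNORE_EXPIRY }

  // Assumption: a request counts as expired once more than `timeout`
  // has elapsed since it was created. The real check may differ.
  static Action decide(boolean hostAffinityEnabled, long requestTimeMs, long nowMs, Duration timeout) {
    boolean expired = nowMs - requestTimeMs > timeout.toMillis();
    if (!expired) {
      return Action.WAIT; // corresponds to the "has not expired yet" log branch
    }
    // After the revert, expired requests are only acted on with host affinity enabled.
    return hostAffinityEnabled ? Action.HANDLE_EXPIRED : Action.IGNORE_EXPIRY;
  }

  public static void main(String[] args) {
    Duration timeout = Duration.ofMillis(5000); // documented default of the timeout property
    System.out.println(decide(true, 0L, 6000L, timeout));  // HANDLE_EXPIRED
    System.out.println(decide(false, 0L, 6000L, timeout)); // IGNORE_EXPIRY
    System.out.println(decide(true, 0L, 1000L, timeout));  // WAIT
  }
}
```

With host affinity disabled, the sketch returns `IGNORE_EXPIRY`, which matches the test comment in this patch stating that expiry is only handled for host-affinity-enabled cases.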
