This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6615711  SAMZA-2248: Fix AM bookkeeping on receiving dead container notifications (#1078)
6615711 is described below

commit 6615711028721d81c1765392d9e06a43396a3ede
Author: xinyuiscool <[email protected]>
AuthorDate: Fri Jun 28 14:31:36 2019 -0700

    SAMZA-2248: Fix AM bookkeeping on receiving dead container notifications (#1078)
---
 .../clustermanager/AbstractContainerAllocator.java |  8 ++++++++
 .../clustermanager/ContainerProcessManager.java    |  6 ++++++
 .../samza/clustermanager/ResourceRequestState.java | 20 ++++++++++++++++++
 .../clustermanager/TestContainerRequestState.java  | 24 ++++++++++++++++++++++
 4 files changed, 58 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 90d5e5d..c5caa21 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -260,6 +260,14 @@ public abstract class AbstractContainerAllocator implements Runnable {
   }
 
   /**
+   * Releases a single resource based on containerId by delegating to {@code resourceRequestState}.
+   * @param containerId container ID of the resource to release; passed through unchanged
+   */
+  public final void releaseResource(String containerId) {
+    resourceRequestState.releaseResource(containerId);
+  }
+
+  /**
    * Stops the Allocator. Setting this flag to false exits the allocator loop.
    */
   public void stop() {
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b2cf6b9..f4e9b13 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -321,6 +321,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     if (processorId == null) {
       log.info("No running Processor ID found for Container ID: {} with 
Status: {}. Ignoring redundant notification.", containerId, 
resourceStatus.toString());
       state.redundantNotifications.incrementAndGet();
+
+      if (resourceStatus.getExitCode() != SamzaResourceStatus.SUCCESS) {
+        // the requested container failed before assigning the request to it.
+        // Remove from the buffer if it is there
+        containerAllocator.releaseResource(containerId);
+      }
       return;
     }
     state.runningProcessors.remove(processorId);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 0cbcd5f..8da3337 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.clustermanager;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -230,6 +231,25 @@ public class ResourceRequestState {
   }
 
   /**
+   * Releases a single resource based on containerId by removing every matching entry from allocatedResources.
+   * @param containerId Yarn container ID; if StringUtils.isEmpty(containerId), a warning is logged and nothing is removed
+   */
+  public void releaseResource(String containerId) {
+    if (StringUtils.isEmpty(containerId)) {
+      log.warn("ContainerId is not specified");
+      return;
+    }
+
+    synchronized (lock) { // all removals below happen while holding lock
+      allocatedResources.values().forEach(resources -> {
+          if (resources != null) { // skip map entries that hold no collection
+            resources.removeIf(r -> containerId.equals(r.getContainerId())); // containerId is non-empty here, so equals is null-safe
+          }
+        });
+    }
+  }
+
+  /**
    * Releases a container that was allocated and assigned but could not be started.
    * e.g. because of a ConnectException while trying to communicate with the NM.
    * This method assumes the specified container and associated request have already
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index c1193b9..caae4a7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -193,4 +193,28 @@ public class TestContainerRequestState {
 
   }
 
+  @Test
+  public void testReleaseResource() {
+    // Host-affinity is enabled
+    ResourceRequestState state = new ResourceRequestState(true, manager);
+
+    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", 
"0");
+    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", 
"0");
+    state.addResourceRequest(request);
+    state.addResourceRequest(request1);
+
+    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0"); // presumably host "abc", container ID "id0"
+    SamzaResource container1 = new SamzaResource(1, 1024, ANY_HOST, "id1"); // presumably ANY_HOST, container ID "id1"
+    state.addResource(container);
+    state.addResource(container1);
+
+    state.releaseResource("id0"); // first release: only the "id0" resource should disappear
+    assertEquals(0, state.getResourcesOnAHost("abc").size());
+    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size()); // "id1" must be left in place
+
+    state.releaseResource("id1"); // second release: removes the remaining resource
+    assertEquals(0, state.getResourcesOnAHost("abc").size());
+    assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
+
+  }
 }

Reply via email to