This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6615711 SAMZA-2248: Fix AM bookkeeping on receiving dead container notifications (#1078)
6615711 is described below
commit 6615711028721d81c1765392d9e06a43396a3ede
Author: xinyuiscool <[email protected]>
AuthorDate: Fri Jun 28 14:31:36 2019 -0700
SAMZA-2248: Fix AM bookkeeping on receiving dead container notifications (#1078)
---
.../clustermanager/AbstractContainerAllocator.java | 8 ++++++++
.../clustermanager/ContainerProcessManager.java | 6 ++++++
.../samza/clustermanager/ResourceRequestState.java | 20 ++++++++++++++++++
.../clustermanager/TestContainerRequestState.java | 24 ++++++++++++++++++++++
4 files changed, 58 insertions(+)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
index 90d5e5d..c5caa21 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
@@ -260,6 +260,14 @@ public abstract class AbstractContainerAllocator implements Runnable {
}
/**
+ * Releases a single resource based on containerId.
+ * @param containerId container ID
+ */
+ public final void releaseResource(String containerId) {
+ resourceRequestState.releaseResource(containerId);
+ }
+
+ /**
* Stops the Allocator. Setting this flag to false exits the allocator loop.
*/
public void stop() {
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index b2cf6b9..f4e9b13 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -321,6 +321,12 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
if (processorId == null) {
log.info("No running Processor ID found for Container ID: {} with Status: {}. Ignoring redundant notification.", containerId, resourceStatus.toString());
state.redundantNotifications.incrementAndGet();
+
+ if (resourceStatus.getExitCode() != SamzaResourceStatus.SUCCESS) {
+ // the requested container failed before assigning the request to it.
+ // Remove from the buffer if it is there
+ containerAllocator.releaseResource(containerId);
+ }
return;
}
state.runningProcessors.remove(processorId);
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
index 0cbcd5f..8da3337 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ResourceRequestState.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -230,6 +231,25 @@ public class ResourceRequestState {
}
/**
+ * Releases a single resource based on containerId.
+ * @param containerId Yarn container ID
+ */
+ public void releaseResource(String containerId) {
+ if (StringUtils.isEmpty(containerId)) {
+ log.warn("ContainerId is not specified");
+ return;
+ }
+
+ synchronized (lock) {
+ allocatedResources.values().forEach(resources -> {
+ if (resources != null) {
+ resources.removeIf(r -> containerId.equals(r.getContainerId()));
+ }
+ });
+ }
+ }
+
+ /**
* Releases a container that was allocated and assigned but could not be started.
* e.g. because of a ConnectException while trying to communicate with the NM.
* This method assumes the specified container and associated request have already
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
index c1193b9..caae4a7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerRequestState.java
@@ -193,4 +193,28 @@ public class TestContainerRequestState {
}
+ @Test
+ public void testReleaseResource() {
+ // Host-affinity is enabled
+ ResourceRequestState state = new ResourceRequestState(true, manager);
+
+ SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
+ SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", "0");
+ state.addResourceRequest(request);
+ state.addResourceRequest(request1);
+
+ SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
+ SamzaResource container1 = new SamzaResource(1, 1024, ANY_HOST, "id1");
+ state.addResource(container);
+ state.addResource(container1);
+
+ state.releaseResource("id0");
+ assertEquals(0, state.getResourcesOnAHost("abc").size());
+ assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
+
+ state.releaseResource("id1");
+ assertEquals(0, state.getResourcesOnAHost("abc").size());
+ assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
+
+ }
}