This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d022167  SAMZA-2378: Container Placements support for Standby 
containers enabled jobs (#1281)
d022167 is described below

commit d02216726e03695096b680f0f5bff9c0d6ca2e6c
Author: Sanil Jain <[email protected]>
AuthorDate: Fri Feb 28 14:37:45 2020 -0800

    SAMZA-2378: Container Placements support for Standby containers enabled 
jobs (#1281)
    
    Container Placements support for Standby containers enabled jobs
---
 .../samza/clustermanager/ContainerManager.java     |  97 ++++++++---
 .../clustermanager/ContainerProcessManager.java    |   2 +-
 .../clustermanager/SamzaApplicationState.java      |   5 +
 .../clustermanager/StandbyContainerManager.java    |  42 +++--
 .../metrics/ContainerProcessManagerMetrics.scala   |   2 +
 .../TestContainerPlacementActions.java             | 178 ++++++++++++++++++++-
 .../samza/clustermanager/TestStandbyAllocator.java |  12 +-
 7 files changed, 296 insertions(+), 42 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index c924447..3e3a060 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -48,8 +48,7 @@ import org.slf4j.LoggerFactory;
  * ContainerManager encapsulates logic and state related to container 
placement actions like move, restarts for active container
  * if issued externally.
  *
- * TODO SAMZA-2378: Container Placements for Standby containers enabled jobs
- *      SAMZA-2379: Container Placements for job running in degraded state
+ * TODO SAMZA-2379: Container Placements for job running in degraded state
  */
 public class ContainerManager {
 
@@ -136,7 +135,22 @@ public class ContainerManager {
         LOG.info("Waiting for running container to shutdown due to existing 
ContainerPlacement action {}", actionMetaData);
         return false;
       } else if (actionStatus == 
ContainerPlacementMetadata.ContainerStatus.STOPPED) {
-        allocator.runStreamProcessor(request, preferredHost);
+        // If the job has standby containers enabled, always check standby 
constraints before issuing a start on container
+        // Note: Always check constraints against allocated resource, since 
preferred host can be ANY_HOST as well
+        if (standbyContainerManager.isPresent() && 
!standbyContainerManager.get().checkStandbyConstraints(request.getProcessorId(),
 allocatedResource.getHost())) {
+          LOG.info(
+              "Starting container {} on host {} does not meet standby 
constraints, falling back to source host placement metadata: {}",
+              request.getProcessorId(), preferredHost, actionMetaData);
+          // Release unstartable container
+          standbyContainerManager.get().releaseUnstartableContainer(request, 
allocatedResource, preferredHost, resourceRequestState);
+          // Fallback to source host since the new allocated resource does not 
meet standby constraints
+          allocator.requestResource(processorId, 
actionMetaData.getSourceHost());
+          markContainerPlacementActionFailed(actionMetaData,
+              String.format("allocated resource %s does not meet standby 
constraints now, falling back to source host", allocatedResource));
+        } else {
+          LOG.info("Status updated for ContainerPlacement action: ", 
actionMetaData);
+          allocator.runStreamProcessor(request, preferredHost);
+        }
         return true;
       }
     }
@@ -225,13 +239,13 @@ public class ContainerManager {
    *
    * @param processorId logical processor id of container 0,1,2
    */
-  void handleContainerLaunchSuccess(String processorId) {
+  void handleContainerLaunchSuccess(String processorId, String containerHost) {
     if (hasActiveContainerPlacementAction(processorId)) {
       ContainerPlacementMetadata metadata = 
getPlacementActionMetadata(processorId).get();
       // Mark the active container running again and dispatch a response
       
metadata.setContainerStatus(ContainerPlacementMetadata.ContainerStatus.RUNNING);
       updateContainerPlacementActionStatus(metadata, 
ContainerPlacementMessage.StatusCode.SUCCEEDED,
-          "Successfully completed the container placement action");
+          "Successfully completed the container placement action started 
container on host " + containerHost);
     }
   }
 
@@ -296,7 +310,6 @@ public class ContainerManager {
    *
    * When host affinity is disabled a move / restart is only allowed on 
ANY_HOST
    * When host affinity is enabled move / restart is allowed on specific or 
ANY_HOST
-   * TODO: SAMZA-2378: Container Placements for Standby containers enabled jobs
    *
    * Container placement requests are tied to deploymentId which is currently 
{@link org.apache.samza.config.ApplicationConfig#APP_RUN_ID}
    * On job restarts container placement requests queued for the previous 
deployment are deleted using this
@@ -313,14 +326,15 @@ public class ContainerManager {
     if (!deQueueAction(requestMessage)) {
       return;
     }
-    Pair<ContainerPlacementMessage.StatusCode, String> actionStatus = 
validatePlacementAction(requestMessage);
     LOG.info("ContainerPlacement action is de-queued metadata: {}", 
requestMessage);
+    Pair<ContainerPlacementMessage.StatusCode, String> actionStatus = 
validatePlacementAction(requestMessage);
     // Action is de-queued upon so we record it in the cache
     placementRequestsCache.put(requestMessage.getUuid());
     // Remove the request message from metastore since this message is already 
acted upon
     
containerPlacementMetadataStore.deleteContainerPlacementRequestMessage(requestMessage.getUuid());
     // Request is bad just update the response on message & return
     if (actionStatus.getKey() == 
ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+      LOG.info("Status updated for ContainerPlacement action request: {} 
response: {}", requestMessage, actionStatus.getValue());
       writeContainerPlacementResponseMessage(requestMessage, 
actionStatus.getKey(), actionStatus.getValue());
       return;
     }
@@ -335,14 +349,17 @@ public class ContainerManager {
       destinationHost = ANY_HOST;
     }
 
-    SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(processorId, destinationHost);
+    // Register metadata
     ContainerPlacementMetadata actionMetaData = new 
ContainerPlacementMetadata(requestMessage, currentResource.getHost());
+    actions.put(processorId, actionMetaData);
+
+    SamzaResourceRequest resourceRequest = 
containerAllocator.getResourceRequest(processorId, destinationHost);
     // Record the resource request for monitoring
     actionMetaData.recordResourceRequest(resourceRequest);
     actions.put(processorId, actionMetaData);
     updateContainerPlacementActionStatus(actionMetaData, 
ContainerPlacementMessage.StatusCode.IN_PROGRESS, "Preferred Resources 
requested");
     containerAllocator.issueResourceRequest(resourceRequest);
-    LOG.info("Issued resource request for preferred resources for 
ContainerPlacement action: {}", actionMetaData);
+
   }
 
   /**
@@ -370,6 +387,7 @@ public class ContainerManager {
   }
 
   private void markContainerPlacementActionFailed(ContainerPlacementMetadata 
metaData, String failureMessage) {
+    samzaApplicationState.failedContainerPlacementActions.incrementAndGet();
     updateContainerPlacementActionStatus(metaData, 
ContainerPlacementMessage.StatusCode.FAILED, failureMessage);
   }
 
@@ -412,16 +430,17 @@ public class ContainerManager {
   }
 
   /**
-   * If there is an existing inflight request or container is pending a start, 
the container placement action shall wait
-   * until this in-flight action is complete
+   * These are specific scenarios in which a placement action should wait for 
existing action to complete before it is executed
+   * 1. If there is a placement request in progress on the container itself
+   * 2. If there is a placement request in progress on its active container (for a standby) or on any of its standby 
containers (for an active container)
+   * 3. If the container itself is pending a start
    *
    * @param requestMessage container placement request message
    * @return true if action should be taken right now, false if it needs to 
wait to be taken in future
    */
   private boolean deQueueAction(ContainerPlacementRequestMessage 
requestMessage) {
     // Do not dequeue action wait for the in-flight action to complete
-    if (hasActiveContainerPlacementAction(requestMessage.getProcessorId())) {
-      LOG.info("ContainerPlacement request: {} is en-queued because container 
has an in-progress placement action", requestMessage);
+    if 
(checkIfActiveOrStandbyContainerHasActivePlacementAction(requestMessage)) {
       return false;
     }
     // Do not dequeue the action wait for the container to come to a running 
state
@@ -435,32 +454,68 @@ public class ContainerManager {
 
   /**
    * A valid container placement action needs a valid processor id. Duplicate 
actions are handled by de-duping on uuid.
+   * If standby containers are enabled destination host requested must meet 
standby constraints
    *
    * @param requestMessage container placement request message
    * @return Pair<ContainerPlacementMessage.StatusCode, String> which is 
status code & response suggesting if the request is valid
    */
   private Pair<ContainerPlacementMessage.StatusCode, String> 
validatePlacementAction(ContainerPlacementRequestMessage requestMessage) {
-    String errorMessagePrefix = String.format("ContainerPlacement request: %s 
is rejected due to", requestMessage);
+    String errorMessagePrefix = 
ContainerPlacementMessage.StatusCode.BAD_REQUEST + " reason: %s";
     Boolean invalidAction = false;
     String errorMessage = null;
-    if (standbyContainerManager.isPresent()) {
-      errorMessage = String.format("%s not supported for hot standby enabled", 
errorMessagePrefix);
+    if 
(!samzaApplicationState.runningProcessors.containsKey(requestMessage.getProcessorId())
 &&
+        
!samzaApplicationState.pendingProcessors.containsKey(requestMessage.getProcessorId())
+    ) {
+      errorMessage = String.format(errorMessagePrefix, "invalid processor id 
neither in running or pending processors");
       invalidAction = true;
     } else if (placementRequestsCache.containsKey(requestMessage.getUuid())) {
-      errorMessage = String.format("%s duplicate UUID of the request, please 
retry", errorMessagePrefix);
+      errorMessage = String.format(errorMessagePrefix, "duplicate UUID of the 
request, please retry");
       invalidAction = true;
-    } else if (Integer.parseInt(requestMessage.getProcessorId()) >= 
samzaApplicationState.processorCount.get()
-    ) {
-      errorMessage = String.format("%s invalid processor id", 
errorMessagePrefix);
+    } else if (standbyContainerManager.isPresent() && 
!standbyContainerManager.get()
+        .checkStandbyConstraints(requestMessage.getProcessorId(), 
requestMessage.getDestinationHost())) {
+      errorMessage = String.format(errorMessagePrefix, "destination host does 
not meet standby constraints");
       invalidAction = true;
     }
 
     if (invalidAction) {
-      LOG.info(errorMessage);
       return new 
ImmutablePair<>(ContainerPlacementMessage.StatusCode.BAD_REQUEST, errorMessage);
     }
 
     return new ImmutablePair<>(ContainerPlacementMessage.StatusCode.ACCEPTED, 
"Request is accepted");
   }
 
+  /**
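+   * Checks if there is any in-progress container placement action on the container 
itself or, when standby containers are enabled, on its active container (for a standby) or on any of its standby containers
+   * (for an active container)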
+   */
+  private boolean 
checkIfActiveOrStandbyContainerHasActivePlacementAction(ContainerPlacementRequestMessage
 request) {
+    String processorId = request.getProcessorId();
+    // Container itself has active container placement actions
+    if (hasActiveContainerPlacementAction(processorId)) {
+      LOG.info("ContainerPlacement request: {} is en-queued because container 
has an in-progress placement action", request);
+      return true;
+    }
+
+    if (standbyContainerManager.isPresent()) {
+      // If requested placement action is on a standby container and its 
active container has a placement request,
+      // this request shall not be de-queued until in-flight action on active 
container is complete
+      if (StandbyTaskUtil.isStandbyContainer(processorId) && 
hasActiveContainerPlacementAction(
+          StandbyTaskUtil.getActiveContainerId(processorId))) {
+        LOG.info("ContainerPlacement request: {} is en-queued because its 
active container has an in-progress placement action", request);
+        return true;
+      }
+      // If requested placement action is on an active container and any of its 
standby containers has a placement request,
+      // this request shall not be de-queued until the in-flight action on that 
standby container is complete
+      if (!StandbyTaskUtil.isStandbyContainer(processorId)) {
+        for (String standby : 
standbyContainerManager.get().getStandbyList(processorId)) {
+          if (hasActiveContainerPlacementAction(standby)) {
+            LOG.info("ContainerPlacement request: {} is en-queued because one 
of its standby replica has an in-progress placement action", request);
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 7f99159..1535b2d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -405,7 +405,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
-      containerManager.handleContainerLaunchSuccess(processorId);
+      containerManager.handleContainerLaunchSuccess(processorId, 
containerHost);
     } else {
       LOG.warn("Did not find a pending Processor ID for Container ID: {} on 
host: {}. " +
           "Ignoring invalid/redundant notification.", containerId, 
containerHost);
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index ef09da1..c3c935d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -160,6 +160,11 @@ public class SamzaApplicationState {
   */
   public final AtomicInteger failoversToAnyHost = new AtomicInteger(0);
 
+  /**
+   * Number of occurrences of failed container placement actions
+   */
+  public final AtomicInteger failedContainerPlacementActions = new 
AtomicInteger(0);
+
   public SamzaApplicationState(JobModelManager jobModelManager) {
     this.jobModelManager = jobModelManager;
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 7ea429d..a9d298d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -332,14 +332,12 @@ public class StandbyContainerManager {
   /**
    * Check if matching this SamzaResourceRequest to the given resource, meets 
all standby-container container constraints.
    *
-   * @param request The resource request to match.
-   * @param samzaResource The samzaResource to potentially match the resource 
to.
+   * @param containerIdToStart logical id of the container to start
+   * @param host potential host to start the container on
    * @return
    */
-  private boolean checkStandbyConstraints(SamzaResourceRequest request, 
SamzaResource samzaResource) {
-    String containerIDToStart = request.getProcessorId();
-    String host = samzaResource.getHost();
-    List<String> containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIDToStart);
+  boolean checkStandbyConstraints(String containerIdToStart, String host) {
+    List<String> containerIDsForStandbyConstraints = 
this.standbyContainerConstraints.get(containerIdToStart);
 
     // Check if any of these conflicting containers are running/launching on 
host
     for (String containerID : containerIDsForStandbyConstraints) {
@@ -348,7 +346,7 @@ public class StandbyContainerManager {
       // return false if a conflicting container is pending for launch on the 
host
       if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
-            containerIDToStart, samzaResource.getHost(), containerID);
+            containerIdToStart, host, containerID);
         return false;
       }
 
@@ -356,7 +354,7 @@ public class StandbyContainerManager {
       resource = samzaApplicationState.runningProcessors.get(containerID);
       if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
-            containerIDToStart, samzaResource.getHost(), containerID);
+            containerIdToStart, host, containerID);
         return false;
       }
     }
@@ -375,7 +373,7 @@ public class StandbyContainerManager {
       ResourceRequestState resourceRequestState) {
     String containerID = request.getProcessorId();
 
-    if (checkStandbyConstraints(request, samzaResource)) {
+    if (checkStandbyConstraints(containerID, samzaResource.getHost())) {
       // This resource can be used to launch this container
       log.info("Running container {} on {} meets standby constraints, 
preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
@@ -385,8 +383,7 @@ public class StandbyContainerManager {
       log.info(
           "Running standby container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
           containerID, samzaResource.getHost());
-      resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
-      resourceRequestState.cancelResourceRequest(request);
+      releaseUnstartableContainer(request, samzaResource, preferredHost, 
resourceRequestState);
       containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     } else {
@@ -394,9 +391,7 @@ public class StandbyContainerManager {
       log.warn(
           "Running active container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource",
           containerID, samzaResource.getHost());
-      resourceRequestState.releaseUnstartableContainer(samzaResource, 
preferredHost);
-      resourceRequestState.cancelResourceRequest(request);
-
+      releaseUnstartableContainer(request, samzaResource, preferredHost, 
resourceRequestState);
       Optional<FailoverMetadata> failoverMetadata = 
getFailoverMetadata(request);
       String lastKnownResourceID =
           failoverMetadata.isPresent() ? 
failoverMetadata.get().activeContainerResourceID : "unknown-" + containerID;
@@ -425,6 +420,25 @@ public class StandbyContainerManager {
     }
   }
 
+  /**
+   * Fetches the list of standby containers for an active container
+   * @param activeContainerId logical id of the container ex: 0,1,2
+   * @return list of standby containers ex: for active container 0: {0-0, 0-1}
+   */
+  List<String> getStandbyList(String activeContainerId) {
+    return this.standbyContainerConstraints.get(activeContainerId);
+  }
+
+  /**
+   * Releases an un-startable resource immediately and cancels the resource request 
corresponding to it
+   */
+  void releaseUnstartableContainer(SamzaResourceRequest request, SamzaResource 
resource, String preferredHost,
+      ResourceRequestState resourceRequestState) {
+    resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
+    resourceRequestState.cancelResourceRequest(request);
+  }
+
+
   // Handle an expired resource request that was made for placing a standby 
container
   private void handleExpiredRequestForStandbyContainer(String containerID, 
SamzaResourceRequest request,
       Optional<SamzaResource> alternativeResource, ContainerAllocator 
containerAllocator,
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
index 7bde882..91fec28 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala
@@ -60,6 +60,8 @@ class ContainerProcessManagerMetrics(val config: Config,
   val mFailoversToAnyHost = newGauge("failovers-to-any-host", () => 
state.failoversToAnyHost.get())
   val mFailoversToStandby = newGauge("failovers-to-standby", () => 
state.failoversToStandby.get())
 
+  val mFailedContainerPlacementActions = 
newGauge("failed-container-placements-actions", () => 
state.failedContainerPlacementActions.get())
+
   val mContainerMemoryMb = newGauge("container-memory-mb", () => 
clusterManagerConfig.getContainerMemoryMb)
   val mContainerCpuCores = newGauge("container-cpu-cores", () => 
clusterManagerConfig.getNumCores)
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index b433cba..9497f8e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.apache.commons.lang3.RandomStringUtils;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator;
@@ -45,6 +46,7 @@ import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -83,6 +85,7 @@ public class TestContainerPlacementActions {
       put("job.name", "test-job");
       put("job.coordinator.system", "test-kafka");
       put("app.run.id", "appAttempt-001");
+      put("job.standbytasks.replication.factor", "2");
     }
   };
 
@@ -126,6 +129,19 @@ public class TestContainerPlacementActions {
         mockLocalityManager, this.server);
   }
 
+  private JobModelManager 
getJobModelManagerWithHostAffinityWithStandby(Map<String, String> 
containerIdToHost) {
+    Map<String, Map<String, String>> localityMap = new HashMap<>();
+    containerIdToHost.forEach((containerId, host) -> {
+        localityMap.put(containerId,
+            ImmutableMap.of(SetContainerHostMapping.HOST_KEY, 
containerIdToHost.get(containerId)));
+      });
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
+    // Generate JobModel for standby containers
+    JobModel standbyJobModel = TestStandbyAllocator.getJobModelWithStandby(2, 
2, 2, Optional.of(mockLocalityManager));
+    return new JobModelManager(standbyJobModel, server, null);
+  }
+
   @Before
   public void setup() throws Exception {
     server = new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class));
@@ -148,8 +164,20 @@ public class TestContainerPlacementActions {
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
   }
 
+  public void setupStandby() throws Exception {
+    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0",
 "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
+    callback = mock(ClusterResourceManager.Callback.class);
+    MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+    // Enable standby
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, true));
+    allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
+    cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
+  }
+
   @Test(timeout = 10000)
-  public void testContainerSuccessfulMoveAction() throws Exception {
+  public void testContainerSuccessfulMoveActionWithoutStandby() throws 
Exception {
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
         new Thread(() -> {
@@ -676,6 +704,154 @@ public class TestContainerPlacementActions {
     assertBadRequests("2", "host8", containerManager, 
allocatorWithHostAffinity);
   }
 
+
+  @Test(timeout = 30000)
+  public void testContainerSuccessfulMoveActionWithStandbyEnabled() throws 
Exception {
+    // Setup standby for job
+    setupStandby();
+
+    // Spawn a Request Allocator Thread
+    Thread requestAllocatorThread = new Thread(
+        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new 
ApplicationConfig(config)),
+        "ContainerPlacement Request Allocator Thread");
+    requestAllocatorThread.start();
+
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        new Thread(() -> {
+            Object[] args = invocation.getArguments();
+            cpm.onResourcesAvailable((List<SamzaResource>) args[0]);
+          }, "AMRMClientAsync").start();
+        return null;
+      }
+    }).when(callback).onResourcesAvailable(anyList());
+
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        new Thread(() -> {
+            Object[] args = invocation.getArguments();
+            cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]);
+          }, "AMRMClientAsync").start();
+        return null;
+      }
+    }).when(callback).onStreamProcessorLaunchSuccess(any());
+
+    doAnswer(new Answer<Void>() {
+      public Void answer(InvocationOnMock invocation) {
+        new Thread(() -> {
+            Object[] args = invocation.getArguments();
+            cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]);
+          }, "AMRMClientAsync").start();
+          return null;
+      }
+    }).when(callback).onResourcesCompleted(anyList());
+
+    cpm.start();
+
+    if (!allocatorWithHostAffinity.awaitContainersStart(4, 4, 
TimeUnit.SECONDS)) {
+      fail("timed out waiting for the containers to start");
+    }
+
+    while (state.runningProcessors.size() != 4) {
+      Thread.sleep(100);
+    }
+
+    // First running state of the app
+    Consumer<SamzaApplicationState> stateCheck = (SamzaApplicationState state) 
-> {
+      assertEquals(4, state.runningProcessors.size());
+      assertEquals("host-1", state.runningProcessors.get("0").getHost());
+      assertEquals("host-2", state.runningProcessors.get("1").getHost());
+      assertEquals("host-2", state.runningProcessors.get("0-0").getHost());
+      assertEquals("host-1", state.runningProcessors.get("1-0").getHost());
+      assertEquals(4, state.preferredHostRequests.get());
+      assertEquals(0, state.failedStandbyAllocations.get());
+      assertEquals(0, state.anyHostRequests.get());
+    };
+    // Invoke a state check
+    stateCheck.accept(state);
+
+    // Initiate a bad container placement action to move a standby to its 
active host and vice versa
+    // which should fail because this violates standby constraints
+    UUID badRequest1 = 
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
 "0-0", "host-1",
+        null, System.currentTimeMillis());
+
+    UUID badRequest2 = 
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
 "0", "host-2",
+        null, System.currentTimeMillis() + 100);
+
+    // Wait for the ControlActions to complete
+    while (true) {
+      if 
(containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2).isPresent()
 &&
+          
containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2).get().getStatusCode()
+              == ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
+        break;
+      }
+      Thread.sleep(Duration.ofSeconds(5).toMillis());
+    }
+
+    // App running state should remain the same
+    stateCheck.accept(state);
+
+    Optional<ContainerPlacementResponseMessage> responseMessageMove1 =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest1);
+    Optional<ContainerPlacementResponseMessage> responseMessageMove2 =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2);
+
+    // Assert that both the requests were bad
+    assertTrue(responseMessageMove1.isPresent());
+    assertEquals(responseMessageMove1.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.BAD_REQUEST);
+    assertTrue(responseMessageMove2.isPresent());
+    assertEquals(responseMessageMove2.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.BAD_REQUEST);
+
+
+    // Initiate a standby failover which is supposed to be done in two steps
+    // Step 1. Move the standby container to any other host: move 0-0 to say 
host-3
+    // Step 2. Move the active container to the standby's former host: move 0 to 
host-2
+
+    // Action will get executed first
+    UUID standbyMoveRequest =
+        
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
 "0-0", "host-3",
+            null, System.currentTimeMillis());
+    // Action will get executed once the standbyMoveRequest action is 
complete
+    UUID activeMoveRequest = 
containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001",
 "0", "host-2", null,
+            System.currentTimeMillis() + 100);
+
+    // Wait for the ControlActions to complete
+    while (true) {
+      if 
(containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest).isPresent()
 &&
+          
containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest).get().getStatusCode()
+              == ContainerPlacementMessage.StatusCode.SUCCEEDED) {
+        break;
+      }
+      Thread.sleep(Duration.ofSeconds(5).toMillis());
+    }
+
+    assertEquals(4, state.runningProcessors.size());
+    assertEquals("host-2", state.runningProcessors.get("0").getHost());
+    assertEquals("host-2", state.runningProcessors.get("1").getHost());
+    assertEquals("host-3", state.runningProcessors.get("0-0").getHost());
+    assertEquals("host-1", state.runningProcessors.get("1-0").getHost());
+    assertEquals(6, state.preferredHostRequests.get());
+    assertEquals(0, state.failedStandbyAllocations.get());
+    assertEquals(0, state.anyHostRequests.get());
+
+
+    Optional<ContainerPlacementResponseMessage> responseStandbyMove =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(standbyMoveRequest);
+
+    Optional<ContainerPlacementResponseMessage> responseActiveMove =
+        
containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest);
+
+    assertTrue(responseStandbyMove.isPresent());
+    assertEquals(responseStandbyMove.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+    assertTrue(responseActiveMove.isPresent());
+    assertEquals(responseActiveMove.get().getStatusCode(), 
ContainerPlacementMessage.StatusCode.SUCCEEDED);
+
+    // Request should be deleted as soon as it is accepted / acted upon
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent());
+    
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent());
+  }
+
   private void assertResponseMessage(ContainerPlacementResponseMessage 
responseMessage,
       ContainerPlacementRequestMessage requestMessage) {
     assertEquals(responseMessage.getProcessorId(), 
requestMessage.getProcessorId());
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
index c075f14..459f39d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -22,8 +22,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import org.apache.samza.Partition;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -38,7 +40,7 @@ public class TestStandbyAllocator {
 
   @Test
   public void testWithNoStandby() {
-    JobModel jobModel = getJobModelWithStandby(1, 1, 1);
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1, Optional.empty());
     List<String> containerConstraints = 
StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
     Assert.assertEquals("Constrained container count should be 0", 0, 
containerConstraints.size());
   }
@@ -57,7 +59,7 @@ public class TestStandbyAllocator {
 
 
   public void testWithStandby(int nContainers, int nTasks, int 
replicationFactor) {
-    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, 
replicationFactor);
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, 
replicationFactor, Optional.empty());
 
     for (String containerID : jobModel.getContainers().keySet()) {
       List<String> containerConstraints = 
StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
@@ -79,7 +81,7 @@ public class TestStandbyAllocator {
   }
 
   // Helper method to create a jobmodel with given number of containers, tasks 
and replication factor
-  private JobModel getJobModelWithStandby(int nContainers, int nTasks, int 
replicationFactor) {
+  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, 
int replicationFactor, Optional<LocalityManager> localityManager) {
     Map<String, ContainerModel> containerModels = new HashMap<>();
     int taskID = 0;
 
@@ -102,7 +104,7 @@ public class TestStandbyAllocator {
     }
 
     containerModels.putAll(standbyContainerModels);
-    return new JobModel(new MapConfig(), containerModels);
+    return new JobModel(new MapConfig(), containerModels, 
localityManager.orElse(null));
   }
 
   // Helper method that creates a taskmodel with one input ssp
@@ -113,7 +115,7 @@ public class TestStandbyAllocator {
   }
 
   // Helper method to create standby-taskModels from active-taskModels
-  private Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel> 
tasks, int replicaNum) {
+  private static Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, 
TaskModel> tasks, int replicaNum) {
     Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
     tasks.forEach((taskName, taskModel) -> {
         TaskName standbyTaskName = 
StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);

Reply via email to