rmatharu commented on a change in pull request #952: Improved standby-aware 
container allocation for active-containers on job redeploys
URL: https://github.com/apache/samza/pull/952#discussion_r268030578
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 ##########
 @@ -161,54 +161,72 @@ private void handleStandbyContainerStop(String 
standbyContainerID, String resour
     }
   }
 
-  /** Method to handle failover for an active container.
-   *  We try to find a standby for the active container, and issue a stop on 
it.
-   *  If we do not find a standby container, we simply issue an anyhost 
request to place it.
+  /** Method to handle standby-aware allocation for an active container.
+   *  We try to find a standby host for the active container, and issue a stop 
on any standby-containers running on it,
+   *  request resource to place the active on the standby's host, and one to 
place the standby elsewhere.
    *
-   * @param containerID the samzaContainerID of the active-container
+   * @param activeContainerID the samzaContainerID of the active-container
    * @param resourceID  the samza-resource-ID of the container when it failed 
(used to index failover-state)
    */
-  private void initiateActiveContainerFailover(String containerID, String 
resourceID,
+  private void initiateStandbyAwareAllocation(String activeContainerID, String 
resourceID,
       AbstractContainerAllocator containerAllocator) {
 
-    Optional<Entry<String, SamzaResource>> standbyContainer = 
this.selectStandby(containerID, resourceID);
+    String standbyHost = this.selectStandbyHost(activeContainerID, resourceID);
 
-    // If we find a standbyContainer, we initiate a failover
-    if (standbyContainer.isPresent()) {
+    // Check if there is a running standby-container on that host that needs 
to be stopped
+    List<String> standbyContainers = 
this.standbyContainerConstraints.get(activeContainerID);
+    Map<String, SamzaResource> runningStandbyContainersOnHost = 
this.samzaApplicationState.runningContainers.entrySet().stream().filter(x -> 
standbyContainers.contains(x.getKey()))
+        .filter(x -> 
x.getValue().getHost().equals(standbyHost)).collect(Collectors.toMap(Map.Entry::getKey,
 Map.Entry::getValue));
 
-      String standbyContainerId = standbyContainer.get().getKey();
-      SamzaResource standbyResource = standbyContainer.get().getValue();
-      String standbyResourceID = standbyResource.getResourceID();
-      String standbyHost = standbyResource.getHost();
+    // if the standbyHost returned is anyhost, we proceed with the request 
directly
+    if (standbyHost.equals(ResourceRequestState.ANY_HOST)) {
+      log.info("No standby container found for active container {}, making a 
resource-request for placing {} on {}", activeContainerID, activeContainerID, 
ResourceRequestState.ANY_HOST);
+      samzaApplicationState.failoversToAnyHost.incrementAndGet();
+      containerAllocator.requestResource(activeContainerID, 
ResourceRequestState.ANY_HOST);
+
+    } else if (runningStandbyContainersOnHost.isEmpty()) {
+      // if there are no running standby-containers on the standbyHost,  we 
proceed to directly to make a resource request
 
-      // update the state
-      FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(containerID, resourceID);
-      failoverMetadata.updateStandbyContainer(standbyResourceID, standbyHost);
+      log.info("No running standby container to stop on host {}, making a 
resource-request for placing {} on {}", standbyHost, activeContainerID, 
standbyHost);
+      FailoverMetadata failoverMetadata = 
this.registerActiveContainerFailure(activeContainerID, resourceID);
 
-      log.info("Initiating failover and stopping standby container, found 
standbyContainer {} = resource {}, "
-          + "for active container {}", standbyContainerId, standbyResourceID, 
containerID);
+      // record the resource request, before issuing it to avoid race with 
allocation-thread
+      SamzaResourceRequest resourceRequestForActive = 
containerAllocator.getResourceRequest(activeContainerID, standbyHost);
+      failoverMetadata.recordResourceRequest(resourceRequestForActive);
+      containerAllocator.issueResourceRequest(resourceRequestForActive);
 
 Review comment:
   I need to record resource requests in order to handle expired requests, so I
   first create the request (getResourceRequest), record it, and then use
   issueResourceRequest to issue it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to