rmatharu commented on a change in pull request #903: SEP-19: Allocator changes 
for standby-aware container allocation, and active container failover
URL: https://github.com/apache/samza/pull/903#discussion_r260913111
 
 

 ##########
 File path: 
samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 ##########
 @@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.clustermanager;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.storage.kv.Entry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Encapsulates logic and state concerning standby-containers.
+ */
+public class StandbyContainerManager {
+
+  private static final Logger log = 
LoggerFactory.getLogger(StandbyContainerManager.class);
+
+  private final SamzaApplicationState samzaApplicationState;
+
+  // Map of samza containerIDs to their corresponding active and standby 
containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
+  // This is used for checking no two standbys or active-standby-pair are 
started on the same host
+  private final Map<String, List<String>> standbyContainerConstraints;
+
+  // Map of active containers that are in failover, indexed by the active 
container's resourceID (at the time of failure)
+  private final Map<String, FailoverMetadata> failovers;
+
+  // Resource-manager, used to stop containers
+  private ClusterResourceManager clusterResourceManager;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
+      ClusterResourceManager clusterResourceManager) {
+    this.failovers = new ConcurrentHashMap<>();
+    this.standbyContainerConstraints = new HashMap<>();
+    this.samzaApplicationState = samzaApplicationState;
+    JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
+
+    // populate the standbyContainerConstraints map by iterating over all 
containers
+    jobModel.getContainers()
+        .keySet()
+        .forEach(containerId -> standbyContainerConstraints.put(containerId,
+            StandbyTaskUtil.getStandbyContainerConstraints(containerId, 
jobModel)));
+    this.clusterResourceManager = clusterResourceManager;
+
+    log.info("Populated standbyContainerConstraints map {}", 
standbyContainerConstraints);
+  }
+
+  /**
+   * We handle the stopping of a container depending on the case which is 
decided using the exit-status:
+   *    Case 1. an Active-Container which has stopped for an "unknown" reason, 
then we start it on the given preferredHost
+   *    Case 2. Active container has stopped because of node failure, thene we 
initiate a failover
+   *    Case 3. StandbyContainer has stopped after it was chosen for failover, 
see {@link StandbyContainerManager#handleStandbyContainerStop}
+   *    Case 4. StandbyContainer has stopped but not because of a failover, 
see {@link StandbyContainerManager#handleStandbyContainerStop}
+   *
+   * @param containerID containerID of the stopped container
+   * @param resourceID last resourceID of the stopped container
+   * @param preferredHost the host on which the container was running
+   * @param exitStatus the exit status of the failed container
+   * @param containerAllocator the container allocator
+   */
+  public void handleContainerStop(String containerID, String resourceID, 
String preferredHost, int exitStatus,
+      AbstractContainerAllocator containerAllocator) {
+
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      handleStandbyContainerStop(containerID, resourceID, preferredHost, 
containerAllocator);
+    } else {
+      // initiate failover for the active container based on the exitStatus
+      switch (exitStatus) {
+        case SamzaResourceStatus.DISK_FAIL:
+        case SamzaResourceStatus.ABORTED:
+        case SamzaResourceStatus.PREEMPTED:
+          initiateActiveContainerFailover(containerID, resourceID, 
containerAllocator);
+          break;
+      // in all other cases, request resource for the failed container
+        default:
+          log.info("Requesting resource for active-container {} on host {}", 
containerID, preferredHost);
+          containerAllocator.requestResource(containerID, preferredHost);
+          break;
+      }
+    }
+  }
+
+  /**
+   * Handle the failed launch of a container, based on
+   *    Case 1. If it is an active container, then initiate a failover for it.
+   *    Case 2. If it is standby container, request a new resource on AnyHost.
+   * @param containerID the ID of the container that has failed
+   */
+  public void handleContainerLaunchFail(String containerID, String resourceID,
+      AbstractContainerAllocator containerAllocator) {
+
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      log.info("Handling launch fail for standby-container {}, requesting 
resource on any host {}", containerID);
+      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+    } else {
+      initiateActiveContainerFailover(containerID, resourceID, 
containerAllocator);
+    }
+  }
+
+  /**
+   *  If a standby container has stopped, then there are two possible cases
+   *    Case 1. during a failover, the standby container was stopped for an 
active's start, then we
+   *       1. request a resource on the standby's host to place the 
activeContainer, and
+   *       2. request anyhost to place this standby
+   *
+   *    Case 2. independent of a failover, the standby container stopped, in 
which proceed with its resource-request
+   * @param standbyContainerID SamzaContainerID of the standby container
+   * @param preferredHost Preferred host of the standby container
+   */
+  private void handleStandbyContainerStop(String standbyContainerID, String 
resourceID, String preferredHost,
+      AbstractContainerAllocator containerAllocator) {
+
+    // if this standbyContainerResource was stopped for a failover, we will 
find a metadata entry
+    Optional<StandbyContainerManager.FailoverMetadata> failoverMetadata = 
this.checkIfUsedForFailover(resourceID);
+
+    if (failoverMetadata.isPresent()) {
+      String activeContainerID = failoverMetadata.get().activeContainerID;
+      String standbyContainerHostname = 
failoverMetadata.get().getStandbyContainerHostname(resourceID);
+
+      log.info("Requesting resource for active container {} on host {}, and 
backup container {} on any host",
+          activeContainerID, standbyContainerHostname, standbyContainerID);
+
+      containerAllocator.requestResource(activeContainerID,
+          standbyContainerHostname); // request standbycontainer's host for 
active-container
+      containerAllocator.requestResource(standbyContainerID,
+          ResourceRequestState.ANY_HOST); // request anyhost for standby 
container
+      return;
+    } else {
+      log.info("Issuing request for standby container {} on host {}, since 
this is not for a failover",
+          standbyContainerID, preferredHost);
+      containerAllocator.requestResource(standbyContainerID, preferredHost);
+      return;
+    }
+  }
+
+  /** Method to handle failover for a container.
+   *  If it is an active container,
+   *  We try to find a standby for the active container, and issue a stop on 
it.
+   *  If we do not find a standby container, we simply issue an anyhost 
request to place it.
+   *  If it is standby container, we simply forward the request to the 
containerAllocator
+   *
+   * @param containerID the samzaContainerID of the active-container
+   * @param resourceID  the samza-resource-ID of the container when it failed 
(used to index failover-state)
+   */
+  private void initiateActiveContainerFailover(String containerID, String 
resourceID,
+      AbstractContainerAllocator containerAllocator) {
+    Optional<Entry<String, SamzaResource>> standbyContainer = 
this.selectStandby(containerID, resourceID);
+
+    // If we find a standbyContainer, we initiate a failover
+    if (standbyContainer.isPresent()) {
+
+      // ResourceID of the active container at the time of its last failure
+      String standbyContainerId = standbyContainer.get().getKey();
+      SamzaResource standbyResource = standbyContainer.get().getValue();
+      String standbyResourceID = standbyResource.getResourceID();
+      String standbyHost = standbyResource.getHost();
+
+      // update the failover state
+      this.registerFailover(containerID, resourceID, standbyResourceID, 
standbyHost);
+      log.info("Initiating failover and stopping standby container, found 
standbyContainer {} = resource {}, "
+          + "for active container {}", standbyContainerId, standbyResourceID, 
containerID);
+      samzaApplicationState.failoversToStandby.incrementAndGet();
+
+      this.clusterResourceManager.stopStreamProcessor(standbyResource);
+      return;
+    } else {
+
+      // If we dont find a standbyContainer, we proceed with the ANYHOST 
request
+      log.info("No standby container found for active container {}, making a 
request for {}", containerID,
+          ResourceRequestState.ANY_HOST);
+      samzaApplicationState.failoversToAnyHost.incrementAndGet();
+      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      return;
+    }
+  }
+
+  /**
+   * Method to select a standby container for a given active container that 
has stopped.
+   * TODO: enrich this method to select standby's intelligently based on lag, 
timestamp, load-balencing, etc.
+   * @param activeContainerID Samza containerID of the active container
+   * @param activeContainerResourceID ResourceID of the active container at 
the time of its last failure
+   * @return
+   */
+  private Optional<Entry<String, SamzaResource>> selectStandby(String 
activeContainerID,
+      String activeContainerResourceID) {
+
+    log.info("Standby containers {} for active container {}", 
this.standbyContainerConstraints.get(activeContainerID), activeContainerID);
 
 Review comment:
   But the same map also maps each standbyContainerID to its active and other standby containerIDs.
   It's a map of samza containerIDs to their corresponding active and standby containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}.
   This is used for checking that no two standbys, and no active-standby pair, are started on the same host.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to