This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 5748fa6  SAMZA-2605: Make Standby Container Requests Rack Aware  
(#1446)
5748fa6 is described below

commit 5748fa6adec729840beef03495aa6b74f739f017
Author: Pawas Chhokra <pchho...@linkedin.com>
AuthorDate: Thu Dec 24 11:20:54 2020 -0800

    SAMZA-2605: Make Standby Container Requests Rack Aware  (#1446)
    
    Feature: The aim of this feature is to make all standby container requests 
rack aware, so that each active container and its corresponding standby 
containers are always placed on different racks. This reduces application 
downtime during rack failures.
    
    One of the requirements of this feature is that the value of 
job.standbytasks.replication.factor must be at most 2 for the rack awareness 
functionality to be honored.
    
    Changes: This PR uses the FaultDomainManager interface for Yarn to request 
rack-aware nodes when making standby container requests.
    
    Usage Instructions: For a job with host affinity and standby containers, 
set the config cluster-manager.fault-domain-aware.standby.enabled to true to 
enable this feature.
---
 .../samza/clustermanager/ContainerAllocator.java   |  53 +++++++++
 .../samza/clustermanager/ContainerManager.java     |   6 +-
 .../clustermanager/ContainerProcessManager.java    |  27 ++++-
 .../clustermanager/StandbyContainerManager.java    | 130 ++++++++++++++++++---
 .../TestClusterBasedJobCoordinator.java            |   1 +
 .../TestContainerAllocatorWithHostAffinity.java    |  11 +-
 .../TestContainerAllocatorWithoutHostAffinity.java |   9 +-
 .../TestContainerPlacementActions.java             |  15 ++-
 .../TestContainerProcessManager.java               |  33 +++---
 .../samza/job/yarn/YarnClusterResourceManager.java |   7 +-
 10 files changed, 239 insertions(+), 53 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index fa5f783..88be21f 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -22,6 +22,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -348,6 +349,16 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * Requests a resource from the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param faultDomains set of fault domains on which to schedule this 
resource
+   */
+  public final void requestResource(String processorId, String preferredHost, 
Set<FaultDomain> faultDomains) {
+    requestResourceWithDelay(processorId, preferredHost, Duration.ZERO, 
faultDomains);
+  }
+
+  /**
    * Requests a resource from the cluster manager with a request timestamp of 
the current time plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
    * @param preferredHost name of the host that you prefer to run the 
processor on
@@ -359,6 +370,18 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * Requests a resource from the cluster manager with a request timestamp of 
the current time plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this 
resource
+   */
+  public final void requestResourceWithDelay(String processorId, String 
preferredHost, Duration delay, Set<FaultDomain> faultDomains) {
+    SamzaResourceRequest request = getResourceRequestWithDelay(processorId, 
preferredHost, delay, faultDomains);
+    issueResourceRequest(request);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager
    * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
    * @param preferredHost name of the host that you prefer to run the 
processor on
@@ -369,6 +392,17 @@ public class ContainerAllocator implements Runnable {
   }
 
   /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param faultDomains set of fault domains on which to schedule this 
resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequest(String processorId, 
String preferredHost, Set<FaultDomain> faultDomains) {
+    return getResourceRequestWithDelay(processorId, preferredHost, 
Duration.ZERO, faultDomains);
+  }
+
+  /**
    * Creates a {@link SamzaResourceRequest} to send to the cluster manager 
with a request timestamp of the current time
    * plus the specified delay.
    * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
@@ -380,6 +414,19 @@ public class ContainerAllocator implements Runnable {
     return new SamzaResourceRequest(this.containerNumCpuCores, 
this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay));
   }
 
+  /**
+   * Creates a {@link SamzaResourceRequest} to send to the cluster manager 
with a request timestamp of the current time
+   * plus the specified delay.
+   * @param processorId Samza processor ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param delay the {@link Duration} to add to the request timestamp
+   * @param faultDomains set of fault domains on which to schedule this 
resource
+   * @return the created request
+   */
+  public final SamzaResourceRequest getResourceRequestWithDelay(String 
processorId, String preferredHost, Duration delay, Set<FaultDomain> 
faultDomains) {
+    return new SamzaResourceRequest(this.containerNumCpuCores, 
this.containerMemoryMb, preferredHost, processorId, Instant.now().plus(delay), 
faultDomains);
+  }
+
   public final void issueResourceRequest(SamzaResourceRequest request) {
     resourceRequestState.addResourceRequest(request);
     state.containerRequests.incrementAndGet();
@@ -388,6 +435,9 @@ public class ContainerAllocator implements Runnable {
     } else {
       state.preferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.faultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 
   /**
@@ -480,5 +530,8 @@ public class ContainerAllocator implements Runnable {
     } else {
       state.expiredPreferredHostRequests.incrementAndGet();
     }
+    if (!request.getFaultDomains().isEmpty()) {
+      state.expiredFaultDomainAwareContainerRequests.incrementAndGet();
+    }
   }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 24130fd..5fcf328 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
@@ -88,8 +89,9 @@ public class ContainerManager {
 
   public ContainerManager(ContainerPlacementMetadataStore 
containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager 
localityManager) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager 
localityManager, FaultDomainManager faultDomainManager, Config config) {
     Preconditions.checkNotNull(localityManager, "Locality manager cannot be 
null");
+    Preconditions.checkNotNull(faultDomainManager, "Fault domain manager 
cannot be null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
@@ -100,7 +102,7 @@ public class ContainerManager {
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, 
clusterResourceManager, localityManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, 
clusterResourceManager, localityManager, config, faultDomainManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 995cf7d..143e0b3 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -149,6 +149,9 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     ResourceManagerFactory factory = 
getContainerProcessManagerFactory(clusterManagerConfig);
     this.clusterResourceManager = 
checkNotNull(factory.getClusterResourceManager(this, state));
 
+    FaultDomainManagerFactory faultDomainManagerFactory = 
getFaultDomainManagerFactory(clusterManagerConfig);
+    FaultDomainManager faultDomainManager = 
checkNotNull(faultDomainManagerFactory.getFaultDomainManager(config, registry));
+
     // Initialize metrics
     this.containerProcessManagerMetrics = new 
ContainerProcessManagerMetrics(config, state, registry);
     this.jvmMetrics = new JvmMetrics(registry);
@@ -172,8 +175,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> 
reporter.register(METRICS_SOURCE_NAME, registry));
 
-    this.containerManager = new ContainerManager(metadataStore, state, 
clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled(), localityManager);
+    this.containerManager = new ContainerManager(metadataStore, state, 
clusterResourceManager,
+            hostAffinityEnabled, jobConfig.getStandbyTasksEnabled(), 
localityManager, faultDomainManager, config);
 
     this.containerAllocator = new 
ContainerAllocator(this.clusterResourceManager, config, state, 
hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
@@ -649,6 +652,26 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   }
 
   /**
+   * Returns an instantiated {@link FaultDomainManagerFactory} from a {@link 
ClusterManagerConfig}. The
+   * {@link FaultDomainManagerFactory} is used to return an implementation of 
a {@link FaultDomainManager}
+   *
+   * @param clusterManagerConfig the cluster manager config to parse.
+   *
+   */
+  private FaultDomainManagerFactory getFaultDomainManagerFactory(final 
ClusterManagerConfig clusterManagerConfig) {
+    final String faultDomainManagerFactoryClass = 
clusterManagerConfig.getFaultDomainManagerClass();
+    final FaultDomainManagerFactory factory;
+
+    try {
+      factory = ReflectionUtil.getObj(faultDomainManagerFactoryClass, 
FaultDomainManagerFactory.class);
+    } catch (Exception e) {
+      LOG.error("Error creating the fault domain manager.", e);
+      throw new SamzaException(e);
+    }
+    return factory;
+  }
+
+  /**
    * Obtains the ID of the Samza processor pending launch on the provided 
resource (container).
    *
    * ContainerProcessManager [INFO] Container ID: 
container_e66_1569376389369_0221_01_000049 matched pending Processor ID: 0 on 
host: ltx1-app0772.stg.linkedin.com
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index b849ea5..a07a924 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -28,6 +29,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
+import org.apache.samza.config.Config;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
@@ -56,8 +59,13 @@ public class StandbyContainerManager {
   // Resource-manager, used to stop containers
   private ClusterResourceManager clusterResourceManager;
 
-  public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager, LocalityManager 
localityManager) {
+  // FaultDomainManager, used to get fault domain information of different 
hosts from the cluster manager.
+  private final FaultDomainManager faultDomainManager;
+
+  private final boolean isFaultDomainAwareStandbyEnabled;
+
+  public StandbyContainerManager(SamzaApplicationState samzaApplicationState, 
ClusterResourceManager clusterResourceManager,
+                                 LocalityManager localityManager, Config 
config, FaultDomainManager faultDomainManager) {
     this.failovers = new ConcurrentHashMap<>();
     this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
@@ -70,6 +78,9 @@ public class StandbyContainerManager {
         .forEach(containerId -> standbyContainerConstraints.put(containerId,
             StandbyTaskUtil.getStandbyContainerConstraints(containerId, 
jobModel)));
     this.clusterResourceManager = clusterResourceManager;
+    this.faultDomainManager = faultDomainManager;
+    ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+    this.isFaultDomainAwareStandbyEnabled = 
clusterManagerConfig.getFaultDomainAwareStandbyEnabled();
 
     log.info("Populated standbyContainerConstraints map {}", 
standbyContainerConstraints);
   }
@@ -125,7 +136,9 @@ public class StandbyContainerManager {
 
     if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       log.info("Handling launch fail for standby-container {}, requesting 
resource on any host {}", containerID);
-      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, 
ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     } else {
       initiateStandbyAwareAllocation(containerID, resourceID, 
containerAllocator);
     }
@@ -157,6 +170,22 @@ public class StandbyContainerManager {
   }
 
   /**
+   * This method removes the fault domain of the host passed as an argument, 
from the set of fault domains, and then returns it.
+   * The set of fault domains returned is based on the set difference between 
all the available fault domains in the
+   * cluster and the fault domain associated with the host that is passed as 
input.
+   * @param hostToAvoid hostname whose fault domains are excluded
+   * @return The set of fault domains which excludes the fault domain that the 
given host is on
+   */
+  public Set<FaultDomain> getAllowedFaultDomainsGivenHostToAvoid(String 
hostToAvoid) {
+    Set<FaultDomain> allFaultDomains = faultDomainManager.getAllFaultDomains();
+    Set<FaultDomain> faultDomainToAvoid = Optional.ofNullable(hostToAvoid)
+            .map(faultDomainManager::getFaultDomainsForHost)
+            .orElse(Collections.emptySet());
+    allFaultDomains.removeAll(faultDomainToAvoid);
+    return allFaultDomains;
+  }
+
+  /**
    *  If a standby container has stopped, then there are two possible cases
    *    Case 1. during a failover, the standby container was stopped for an 
active's start, then we
    *       1. request a resource on the standby's host to place the 
activeContainer, and
@@ -181,24 +210,29 @@ public class StandbyContainerManager {
 
       // request standbycontainer's host for active-container
       SamzaResourceRequest resourceRequestForActive =
-          containerAllocator.getResourceRequestWithDelay(activeContainerID, 
standbyContainerHostname, preferredHostRetryDelay);
+        containerAllocator.getResourceRequestWithDelay(activeContainerID, 
standbyContainerHostname, preferredHostRetryDelay);
       // record the resource request, before issuing it to avoid race with 
allocation-thread
       failoverMetadata.get().recordResourceRequest(resourceRequestForActive);
       containerAllocator.issueResourceRequest(resourceRequestForActive);
 
       // request any-host for standby container
-      containerAllocator.requestResource(standbyContainerID, 
ResourceRequestState.ANY_HOST);
+      requestResource(containerAllocator, standbyContainerID, 
ResourceRequestState.ANY_HOST, Duration.ZERO, standbyContainerHostname);
     } else {
       log.info("Issuing request for standby container {} on host {}, since 
this is not for a failover",
           standbyContainerID, preferredHost);
-      containerAllocator.requestResourceWithDelay(standbyContainerID, 
preferredHost, preferredHostRetryDelay);
+      String activeContainerHost = getActiveContainerHost(standbyContainerID)
+              .orElse(null);
+      requestResource(containerAllocator, standbyContainerID, preferredHost, 
preferredHostRetryDelay, activeContainerHost);
     }
   }
 
   /** Method to handle standby-aware allocation for an active container.
    *  We try to find a standby host for the active container, and issue a stop 
on any standby-containers running on it,
    *  request resource to place the active on the standby's host, and one to 
place the standby elsewhere.
-   *
+   *  Note on requesting resources:
+   *  When rack awareness is turned on, we always pass the 
<code>hostToAvoid</code> parameter as null for the {@link #requestResource} method 
used here
+   *  because the hostname of the previous active processor that died does not 
exist in the running or pending container list anymore.
+   *  However, different racks will always be guaranteed through {@link 
#checkStandbyConstraintsAndRunStreamProcessor}.
    * @param activeContainerID the samzaContainerID of the active-container
    * @param resourceID  the samza-resource-ID of the container when it failed 
(used to index failover-state)
    */
@@ -235,7 +269,7 @@ public class StandbyContainerManager {
 
         // record the resource request, before issuing it to avoid race with 
allocation-thread
         SamzaResourceRequest resourceRequestForActive =
-            containerAllocator.getResourceRequest(activeContainerID, 
standbyHost);
+                containerAllocator.getResourceRequest(activeContainerID, 
standbyHost);
         failoverMetadata.recordResourceRequest(resourceRequestForActive);
         containerAllocator.issueResourceRequest(resourceRequestForActive);
         samzaApplicationState.failoversToStandby.incrementAndGet();
@@ -281,7 +315,7 @@ public class StandbyContainerManager {
     Optional<FailoverMetadata> failoverMetadata = 
getFailoverMetadata(activeContainerResourceID);
 
     // Iterate over the list of running standby containers, to find a standby 
resource that we have not already
-    // used for a failover for this active resoruce
+    // used for a failover for this active resource
     for (String standbyContainerID : 
this.standbyContainerConstraints.get(activeContainerID)) {
 
       if 
(samzaApplicationState.runningProcessors.containsKey(standbyContainerID)) {
@@ -307,7 +341,7 @@ public class StandbyContainerManager {
               .map(ProcessorLocality::host)
               .orElse(null);
 
-      if (StringUtils.isNotBlank(standbyHost)) {
+      if (StringUtils.isBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", 
standbyContainerID);
       } else if (failoverMetadata.isPresent() && 
failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
@@ -361,8 +395,44 @@ public class StandbyContainerManager {
   }
 
   /**
-   * Check if matching this SamzaResourceRequest to the given resource, meets 
all standby-container container constraints.
+   * This method checks from the config if standby allocation is fault domain 
aware or not, and requests resources accordingly.
+   *
+   * @param containerAllocator ContainerAllocator object that requests for 
resources from the resource manager
+   * @param containerID Samza container ID that will be run when a resource is 
allocated for this request
+   * @param preferredHost name of the host that you prefer to run the 
processor on
+   * @param preferredHostRetryDelay the {@link Duration} to add to the request 
timestamp
+   * @param hostToAvoid The hostname to avoid requesting this resource on if 
fault domain aware standby allocation is enabled
+   */
+  void requestResource(ContainerAllocator containerAllocator, String 
containerID, String preferredHost, Duration preferredHostRetryDelay, String 
hostToAvoid) {
+    if (StandbyTaskUtil.isStandbyContainer(containerID) && 
isFaultDomainAwareStandbyEnabled) {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, 
preferredHostRetryDelay, getAllowedFaultDomainsGivenHostToAvoid(hostToAvoid));
+    } else {
+      containerAllocator.requestResourceWithDelay(containerID, preferredHost, 
preferredHostRetryDelay, new HashSet<>());
+    }
+  }
+
+  /**
+   * This method returns the active container host given a standby or active 
container ID.
    *
+   * @param containerID Standby or active container ID
+   * @return The active container host
+   */
+  Optional<String> getActiveContainerHost(String containerID) {
+    String activeContainerId = containerID;
+    if (StandbyTaskUtil.isStandbyContainer(containerID)) {
+      activeContainerId = StandbyTaskUtil.getActiveContainerId(containerID);
+    }
+    SamzaResource resource = 
samzaApplicationState.pendingProcessors.get(activeContainerId);
+    if (resource == null) {
+      resource = 
samzaApplicationState.runningProcessors.get(activeContainerId);
+    }
+    return Optional.ofNullable(resource)
+            .map(SamzaResource::getHost);
+  }
+
+  /**
+   * Check if matching this SamzaResourceRequest to the given resource, meets 
all standby-container container constraints.
+   * This includes the check that a standby and its active should not be on 
the same fault domain or the same host.
    * @param containerIdToStart logical id of the container to start
    * @param host potential host to start the container on
    * @return
@@ -375,17 +445,33 @@ public class StandbyContainerManager {
       SamzaResource resource = 
samzaApplicationState.pendingProcessors.get(containerID);
 
       // return false if a conflicting container is pending for launch on the 
host
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, 
resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          
samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container 
{} is already scheduled on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
 
       // return false if a conflicting container is running on the host
       resource = samzaApplicationState.runningProcessors.get(containerID);
-      if (resource != null && resource.getHost().equals(host)) {
+      if (resource != null && isFaultDomainAwareStandbyEnabled
+              && faultDomainManager.hasSameFaultDomains(host, 
resource.getHost())) {
+        log.info("Container {} cannot be started on host {} because container 
{} is already running on this fault domain",
+                containerIdToStart, host, containerID);
+        if (StandbyTaskUtil.isStandbyContainer(containerIdToStart)) {
+          
samzaApplicationState.failedFaultDomainAwareContainerAllocations.incrementAndGet();
+        }
+        return false;
+      } else if (resource != null && resource.getHost().equals(host)) {
         log.info("Container {} cannot be started on host {} because container 
{} is already running on this host",
-            containerIdToStart, host, containerID);
+                containerIdToStart, host, containerID);
         return false;
       }
     }
@@ -409,16 +495,21 @@ public class StandbyContainerManager {
       log.info("Running container {} on {} meets standby constraints, 
preferredHost = {}", containerID,
           samzaResource.getHost(), preferredHost);
       containerAllocator.runStreamProcessor(request, preferredHost);
+      if (isFaultDomainAwareStandbyEnabled && 
StandbyTaskUtil.isStandbyContainer(containerID)) {
+        
samzaApplicationState.faultDomainAwareContainersStarted.incrementAndGet();
+      }
     } else if (StandbyTaskUtil.isStandbyContainer(containerID)) {
       // This resource cannot be used to launch this standby container, so we 
make a new anyhost request
       log.info(
           "Running standby container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource, and making a new 
ANY_HOST request",
           containerID, samzaResource.getHost());
       releaseUnstartableContainer(request, samzaResource, preferredHost, 
resourceRequestState);
-      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, 
ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
       samzaApplicationState.failedStandbyAllocations.incrementAndGet();
     } else {
-      // This resource cannot be used to launch this active container 
container, so we initiate a failover
+      // This resource cannot be used to launch this active container, so we 
initiate a failover
       log.warn(
           "Running active container {} on host {} does not meet standby 
constraints, cancelling resource request, releasing resource",
           containerID, samzaResource.getHost());
@@ -469,7 +560,6 @@ public class StandbyContainerManager {
     resourceRequestState.cancelResourceRequest(request);
   }
 
-
   // Handle an expired resource request that was made for placing a standby 
container
   private void handleExpiredRequestForStandbyContainer(String containerID, 
SamzaResourceRequest request,
       Optional<SamzaResource> alternativeResource, ContainerAllocator 
containerAllocator,
@@ -486,7 +576,9 @@ public class StandbyContainerManager {
       // If there is no alternative-resource for the standby container we make 
a new anyhost request
       log.info("Handling expired request, requesting anyHost resource for 
standby container {}", containerID);
       resourceRequestState.cancelResourceRequest(request);
-      containerAllocator.requestResource(containerID, 
ResourceRequestState.ANY_HOST);
+      String activeContainerHost = getActiveContainerHost(containerID)
+              .orElse(null);
+      requestResource(containerAllocator, containerID, 
ResourceRequestState.ANY_HOST, Duration.ZERO, activeContainerHost);
     }
   }
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 50a1ee1..e0b0739 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
@@ -88,6 +88,7 @@ public class TestClusterBasedJobCoordinator {
     configMap.put("task.inputs", "kafka.topic1");
     configMap.put("systems.kafka.samza.factory", 
"org.apache.samza.system.MockSystemFactory");
     configMap.put("samza.cluster-manager.factory", 
"org.apache.samza.clustermanager.MockClusterResourceManagerFactory");
+    configMap.put("cluster-manager.fault-domain-manager.factory", 
"org.apache.samza.clustermanager.MockFaultDomainManagerFactory");
     configMap.put("job.coordinator.monitor-partition-change.frequency.ms", 
"1");
 
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", 
"topic1", new Partition(0)), new ArrayList<>());
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2b4a4b0..2c9ba81 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -64,6 +64,7 @@ public class TestContainerAllocatorWithHostAffinity {
   private final SamzaApplicationState state = new 
SamzaApplicationState(jobModelManager);
 
   private final MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = 
mock(FaultDomainManager.class);
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
   private ContainerManager containerManager;
 
@@ -89,7 +90,7 @@ public class TestContainerAllocatorWithHostAffinity {
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager, true, false, mockLocalityManager);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager, true, false, mockLocalityManager, 
faultDomainManager, config);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, 
containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -369,7 +370,7 @@ public class TestContainerAllocatorWithHostAffinity {
     ClusterResourceManager.Callback mockCPM = 
mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new 
MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false, mock(LocalityManager.class));
+        new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false, mock(LocalityManager.class), 
faultDomainManager, config);
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, 
List.class).get(0);
@@ -416,7 +417,7 @@ public class TestContainerAllocatorWithHostAffinity {
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new 
MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyManager, true, false, mock(LocalityManager.class), faultDomainManager, 
config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, 
spyContainerManager));
     // Request Preferred Resources
@@ -460,7 +461,7 @@ public class TestContainerAllocatorWithHostAffinity {
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new 
MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyClusterResourceManager, true, false, mock(LocalityManager.class), 
faultDomainManager, config));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, 
spyContainerManager));
@@ -513,7 +514,7 @@ public class TestContainerAllocatorWithHostAffinity {
     ClusterResourceManager.Callback mockCPM = 
mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new 
MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false, mock(LocalityManager.class), 
faultDomainManager, config));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, 
"host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index ac5d6f3..1f063d7 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -62,6 +62,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
   private final SamzaApplicationState state = new 
SamzaApplicationState(jobModelManager);
   private final MockClusterResourceManager manager = new 
MockClusterResourceManager(callback, state);
+  private final FaultDomainManager faultDomainManager = 
mock(FaultDomainManager.class);
 
   private CoordinatorStreamStore coordinatorStreamStore;
   private ContainerPlacementMetadataStore containerPlacementMetadataStore;
@@ -83,7 +84,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, 
false, false, mockLocalityManager));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, 
false, false, mockLocalityManager, faultDomainManager, config));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = 
containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -179,9 +180,11 @@ public class TestContainerAllocatorWithoutHostAffinity {
     });
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
+    ContainerManager containerManager = new 
ContainerManager(containerPlacementMetadataStore, state, manager, false,
+        false, mockLocalityManager, faultDomainManager, config);
     containerAllocator =
         
MockContainerAllocatorWithoutHostAffinity.createContainerAllocatorWithConfigOverride(manager,
 config, state,
-            new ContainerManager(containerPlacementMetadataStore, state, 
manager, false, false, mockLocalityManager),
+            containerManager,
             override);
     MockContainerAllocatorWithoutHostAffinity mockAllocator =
         (MockContainerAllocatorWithoutHostAffinity) containerAllocator;
@@ -331,7 +334,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
     ClusterResourceManager.Callback mockCPM = 
mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new 
MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockManager, false, false, mock(LocalityManager.class)));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockManager, false, false, mock(LocalityManager.class), faultDomainManager, 
config));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, 
spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index c781f4d..e5ead9e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -141,13 +141,14 @@ public class TestContainerPlacementActions {
     state = new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
     localityManager = mock(LocalityManager.class);
     when(localityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of(
             "0", new ProcessorLocality("0", "host-1"),
             "1", new ProcessorLocality("1", "host-2"))));
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager, faultDomainManager, 
config));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager, false);
@@ -171,9 +172,10 @@ public class TestContainerPlacementActions {
     state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, true, mockLocalityManager));
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, true, mockLocalityManager, faultDomainManager, 
config));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
         clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, mockLocalityManager, false);
@@ -552,8 +554,9 @@ public class TestContainerPlacementActions {
     state = new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager, faultDomainManager, 
config));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
         clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager, false);
@@ -666,8 +669,9 @@ public class TestContainerPlacementActions {
         new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, false, false, localityManager);
+        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, false, false, localityManager, faultDomainManager, 
config);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, 
new MapConfig(conf), state,
             containerManager);
@@ -801,8 +805,9 @@ public class TestContainerPlacementActions {
         new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager, faultDomainManager, 
config));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index bcbe53f..ad45c5e 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -141,11 +141,12 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, mockLocalityManager);
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, mockLocalityManager, faultDomainManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new 
MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -592,6 +593,7 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
 
     if (withHostAffinity) {
@@ -604,7 +606,7 @@ public class TestContainerProcessManager {
 
     ContainerManager containerManager =
         buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false, 
mockLocalityManager);
+            clusterManagerConfig.getHostAffinityEnabled(), false, 
mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -614,7 +616,7 @@ public class TestContainerProcessManager {
 
     ContainerProcessManager cpm =
         buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator),
-            mockLocalityManager, false);
+            mockLocalityManager, false, faultDomainManager);
 
     // start triggers a request
     cpm.start();
@@ -755,11 +757,12 @@ public class TestContainerProcessManager {
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("1", "host1"))));
     ContainerManager containerManager = 
buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
-        
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, 
mockLocalityManager);
+            
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, 
mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -792,11 +795,12 @@ public class TestContainerProcessManager {
     SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
     when(mockLocalityManager.readLocality())
         .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
     ContainerManager containerManager = 
buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
-        
Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), 
false, mockLocalityManager);
+            
Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), 
false, mockLocalityManager, faultDomainManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new 
MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -806,7 +810,7 @@ public class TestContainerProcessManager {
 
     ContainerProcessManager cpm =
         spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, 
clusterResourceManager,
-            Optional.of(allocator), mockLocalityManager, false));
+            Optional.of(allocator), mockLocalityManager, false, 
faultDomainManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -1031,16 +1035,16 @@ public class TestContainerProcessManager {
       SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
       boolean hostAffinityEnabled, boolean standByEnabled) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
     return buildContainerManager(containerPlacementMetadataStore, 
samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, mockLocalityManager);
+        hostAffinityEnabled, standByEnabled, mockLocalityManager, 
faultDomainManager);
   }
 
   private ContainerManager 
buildContainerManager(ContainerPlacementMetadataStore 
containerPlacementMetadataStore,
-      SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager 
localityManager) {
-    return new ContainerManager(containerPlacementMetadataStore, 
samzaApplicationState, clusterResourceManager,
-        hostAffinityEnabled, standByEnabled, localityManager);
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager, boolean hostAffinityEnabled,
+      boolean standByEnabled, LocalityManager localityManager, 
FaultDomainManager faultDomainManager) {
+    return new ContainerManager(containerPlacementMetadataStore, 
samzaApplicationState, clusterResourceManager, hostAffinityEnabled, 
standByEnabled, localityManager, faultDomainManager, config);
   }
   private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator) {
@@ -1050,16 +1054,17 @@ public class TestContainerProcessManager {
   private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator, boolean restartContainer) {
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    FaultDomainManager faultDomainManager = mock(FaultDomainManager.class);
     when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
     return buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, allocator,
-        mockLocalityManager, restartContainer);
+        mockLocalityManager, restartContainer, faultDomainManager);
   }
 
   private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator, LocalityManager localityManager,
-      boolean restartContainers) {
+      boolean restartContainers, FaultDomainManager faultDomainManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(), clusterResourceManager,
         allocator, buildContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false, 
localityManager), localityManager, restartContainers);
+        clusterManagerConfig.getHostAffinityEnabled(), false, localityManager, 
faultDomainManager), localityManager, restartContainers);
   }
 }
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index fa784e0..ccdd00f 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -241,6 +241,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     String processorId = resourceRequest.getProcessorId();
     String requestId = resourceRequest.getRequestId();
     String preferredHost = resourceRequest.getPreferredHost();
+    String[] racks = resourceRequest.getFaultDomains().stream().map(FaultDomain::getId).toArray(String[]::new);
     int memoryMb = resourceRequest.getMemoryMB();
     int cpuCores = resourceRequest.getNumCores();
     Resource capability = Resource.newInstance(memoryMb, cpuCores);
@@ -261,15 +262,15 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
       Priority priority = Priority.newInstance(ANY_HOST_PRIORITY);
       boolean relaxLocality = true;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, null, null, capability, priority, relaxLocality, nodeLabelsExpression);
+          processorId, null, Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
       issuedRequest = new AMRMClient.ContainerRequest(capability, null, null, priority, relaxLocality, nodeLabelsExpression);
     } else {
       String[] nodes = {preferredHost};
       Priority priority = Priority.newInstance(PREFERRED_HOST_PRIORITY);
       boolean relaxLocality = false;
       log.info("Requesting resources for Processor ID: {} on nodes: {} on racks: {} with capability: {}, priority: {}, relaxLocality: {}, nodeLabelsExpression: {}",
-          processorId, Arrays.toString(nodes), null, capability, priority, relaxLocality, nodeLabelsExpression);
-      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, null, priority, relaxLocality, nodeLabelsExpression);
+          processorId, Arrays.toString(nodes), Arrays.toString(racks), capability, priority, relaxLocality, nodeLabelsExpression);
+      issuedRequest = new AMRMClient.ContainerRequest(capability, nodes, racks, priority, relaxLocality, nodeLabelsExpression);
     }
     // ensure that updating the state and making the request are done atomically.
     synchronized (lock) {

Reply via email to