This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f9f3c  SAMZA-2439: Remove LocalityManager and container location 
information from JobModel (#1421)
f7f9f3c is described below

commit f7f9f3c7905b047f262383bcc64ebb01ab73f421
Author: mynameborat <bharath.kumarasubraman...@gmail.com>
AuthorDate: Wed Aug 26 19:39:32 2020 -0700

    SAMZA-2439: Remove LocalityManager and container location information from 
JobModel (#1421)
    
    Issues
    Currently, locality information is part of the job model. The job model is 
typically immutable and fixed within the lifecycle of an application attempt. 
The locality information, on the other hand, is dynamic and changes in the 
event of container movements. This difference makes it complicated to program, 
model or define semantics around these models when building features. 
Furthermore, removing this dependency:
    
    - Enables us to move JobModel to public APIs and expose it in JobContext
    - Enables us to cache and serve serialized JobModel from the AM servlet to 
reduce AM overhead (memory, open connections, num threads) during container 
startup, esp. for jobs with a large number of containers (See: #1241)
    - Removes tech debt: models should be immutable, and should not update 
themselves.
    - Removes tech debt: makes current container location a first class concept 
for container scheduling / placement, and for tools like dashboard, 
samza-rest, auto-scaling, diagnostics etc.
    
    Changes
    - Separated locality information out of the job model into LocalityModel
    - Introduced an endpoint in AM to serve locality information
    - Added Json MixIns for locality models (LocalityModel & ProcessorLocality)
    - Moved JobModel to samza-api and exposed through JobContext
    
    API Changes:
    - Introduced new models for locality.
    - Previous job model endpoint will no longer serve locality information. 
i.e. tools using these will need to update to use the new endpoint.
    - Expose JobModel via JobContext
---
 .../java/org/apache/samza/context/JobContext.java  |   6 +
 .../java/org/apache/samza/job/model/JobModel.java  |  76 ------------
 .../org/apache/samza/job/model/LocalityModel.java  |  83 +++++++++++++
 .../apache/samza/job/model/ProcessorLocality.java  |  86 ++++++++++++++
 .../clustermanager/ClusterBasedJobCoordinator.java |   8 +-
 .../samza/clustermanager/ContainerManager.java     |  16 ++-
 .../clustermanager/ContainerProcessManager.java    |  36 ++++--
 .../clustermanager/StandbyContainerManager.java    |  18 ++-
 .../apache/samza/container/LocalityManager.java    |  49 ++++----
 .../org/apache/samza/context/JobContextImpl.java   |  15 ++-
 .../samza/coordinator/server/LocalityServlet.java  |  69 +++++++++++
 .../apache/samza/execution/LocalJobPlanner.java    |   1 -
 .../apache/samza/execution/RemoteJobPlanner.java   |   1 -
 .../apache/samza/processor/StreamProcessor.java    |   2 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   2 +-
 .../serializers/model/JsonLocalityModelMixIn.java  |  37 +++---
 .../model/JsonProcessorLocalityMixIn.java          |  48 ++++++++
 .../samza/serializers/model/SamzaObjectMapper.java |   7 ++
 .../org/apache/samza/storage/StorageRecovery.java  |   4 +-
 .../apache/samza/coordinator/JobModelManager.scala |  32 ++---
 .../apache/samza/job/local/ThreadJobFactory.scala  |   2 +-
 .../TestContainerAllocatorWithHostAffinity.java    |  27 ++---
 .../TestContainerAllocatorWithoutHostAffinity.java |   8 +-
 .../TestContainerPlacementActions.java             |  73 +++++-------
 .../TestContainerProcessManager.java               | 127 ++++++++++++--------
 .../samza/clustermanager/TestStandbyAllocator.java |  10 +-
 .../samza/container/TestLocalityManager.java       |  20 +++-
 .../samza/coordinator/JobModelManagerTestUtil.java |  19 +--
 .../samza/coordinator/TestJobModelManager.java     | 112 ++++--------------
 .../operators/impl/TestOperatorImplGraph.java      |   2 +-
 .../samza/rest/proxy/task/SamzaTaskProxy.java      |   9 +-
 .../test/performance/TestKeyValuePerformance.scala |   2 +-
 .../samza/validation/YarnJobValidationTool.java    |  42 +++----
 .../resources/scalate/WEB-INF/views/index.scaml    |   3 +-
 .../apache/samza/webapp/TestLocalityServlet.java   | 131 +++++++++++++++++++++
 35 files changed, 762 insertions(+), 421 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java 
b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
index 8e41980..7166446 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
@@ -19,6 +19,7 @@
 package org.apache.samza.context;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.job.model.JobModel;
 
 
 /**
@@ -46,4 +47,9 @@ public interface JobContext {
    * @return the id for this job
    */
   String getJobId();
+
+  /**
+   * @return the {@link JobModel} for the job
+   */
+  JobModel getJobModel();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java 
b/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
similarity index 55%
rename from samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
rename to samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
index be26f10..d1f5e72 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-api/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,11 +20,8 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 
 /**
  * <p>
@@ -39,34 +36,14 @@ import 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  * </p>
  */
 public class JobModel {
-  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<String, ContainerModel> containers;
 
-  private final LocalityManager localityManager;
-  private final Map<String, String> localityMappings;
-
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<String, ContainerModel> containers) {
-    this(config, containers, null);
-  }
-
-  public JobModel(Config config, Map<String, ContainerModel> containers, 
LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
-    this.localityManager = localityManager;
-
-    // initialize container localityMappings
-    this.localityMappings = new HashMap<>();
-    if (localityManager == null) {
-      for (String containerId : containers.keySet()) {
-        localityMappings.put(containerId, null);
-      }
-    } else {
-      populateContainerLocalityMappings();
-    }
-
 
     // Compute the number of change log stream partitions as the maximum 
partition-id
     // of all total number of tasks of the job; Increment by 1 because 
partition ids
@@ -84,59 +61,6 @@ public class JobModel {
     return config;
   }
 
-  /**
-   * Returns the container to host mapping for a given container ID and 
mapping key
-   *
-   * @param containerId the ID of the container
-   * @param key mapping key which is one of the keys declared in {@link 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping}
-   * @return the value if it exists for a given container and key, otherwise 
an empty string
-   */
-  public String getContainerToHostValue(String containerId, String key) {
-    if (localityManager == null) {
-      return EMPTY_STRING;
-    }
-    final Map<String, String> mappings = 
localityManager.readContainerLocality().get(containerId);
-    if (mappings == null) {
-      return EMPTY_STRING;
-    }
-    if (!mappings.containsKey(key)) {
-      return EMPTY_STRING;
-    }
-    return mappings.get(key);
-  }
-
-  public Map<String, String> getAllContainerToHostValues(String key) {
-    if (localityManager == null) {
-      return Collections.EMPTY_MAP;
-    }
-    Map<String, String> allValues = new HashMap<>();
-    for (Map.Entry<String, Map<String, String>> entry : 
localityManager.readContainerLocality().entrySet()) {
-      String value = entry.getValue().get(key);
-      if (value != null) {
-        allValues.put(entry.getKey(), value);
-      }
-    }
-    return allValues;
-  }
-
-  private void populateContainerLocalityMappings() {
-    Map<String, Map<String, String>> allMappings = 
localityManager.readContainerLocality();
-    for (String containerId: containers.keySet()) {
-      if (allMappings.containsKey(containerId)) {
-        localityMappings.put(containerId, 
allMappings.get(containerId).get(SetContainerHostMapping.HOST_KEY));
-      } else {
-        localityMappings.put(containerId, null);
-      }
-    }
-  }
-
-  public Map<String, String> getAllContainerLocality() {
-    if (localityManager != null) {
-      populateContainerLocalityMappings();
-    }
-    return localityMappings;
-  }
-
   public Map<String, ContainerModel> getContainers() {
     return containers;
   }
diff --git 
a/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java 
b/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
new file mode 100644
index 0000000..7775434
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/LocalityModel.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Objects;
+import java.util.Map;
+
+/**
+ * A model to represent the locality information of an application. The 
locality information refers to the
+ * whereabouts of the physical execution of a samza container. This locality 
information is used
+ * to place the container, if possible, on the same host on which it was last 
seen. By doing this, stateful applications
+ * can minimize the bootstrap time of their state by leveraging the local copy.
+ *
+ * It would suffice to have only the {@link ProcessorLocality} model and use it 
within locality manager. However, this abstraction
+ * enables us to extend locality beyond container, e.g., it is useful to track 
task locality to enable heterogeneous containers
+ * or fine grained execution model.
+ *
+ * In YARN deployment model, processor is used interchangeably with container 
and <i>processorId</i> refers to
+ * logical container id.
+ */
+public class LocalityModel {
+  /*
+   * A collection of processor locality keyed by processorId.
+   */
+  private final Map<String, ProcessorLocality> processorLocalities;
+
+  /**
+   * Construct locality model for the job from the input map of processor 
localities.
+   * @param processorLocalities host locality information for the job keyed by 
processor id
+   */
+  public LocalityModel(Map<String, ProcessorLocality> processorLocalities) {
+    this.processorLocalities = ImmutableMap.copyOf(processorLocalities);
+  }
+
+  /**
+   * Returns a {@link Map} of {@link ProcessorLocality} keyed by processor id.
+   */
+  public Map<String, ProcessorLocality> getProcessorLocalities() {
+    return processorLocalities;
+  }
+
+  /**
+   * Returns the {@link ProcessorLocality} for the given processorId.
+   */
+  public ProcessorLocality getProcessorLocality(String processorId) {
+    return processorLocalities.get(processorId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof LocalityModel)) {
+      return false;
+    }
+    LocalityModel that = (LocalityModel) o;
+    return Objects.deepEquals(processorLocalities, that.processorLocalities);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorLocalities);
+  }
+}
diff --git 
a/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java 
b/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
new file mode 100644
index 0000000..3478568
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/job/model/ProcessorLocality.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.job.model;
+
+import java.util.Objects;
+
+/**
+ * A data model to represent the processor locality information. The locality 
information refers to the whereabouts
+ * of the physical execution of container.
+ * Fields such as <i>jmxUrl</i> and <i>jmxTunnelingUrl</i> exist for backward 
compatibility reasons as they were
+ * historically stored under the same name space as locality and surfaced 
within the framework through the locality
+ * manager.
+ */
+public class ProcessorLocality {
+  /* Processor identifier. In YARN deployment model, this corresponds to the 
logical container id */
+  private final String id;
+  /* Host on which the processor is currently placed */
+  private final String host;
+  private final String jmxUrl; /* JMX URL of the processor; empty when built via the two-argument constructor */
+  /* JMX tunneling URL for debugging */
+  private final String jmxTunnelingUrl;
+
+  public ProcessorLocality(String id, String host) {
+    this(id, host, "", ""); // no JMX endpoints supplied: both JMX URLs default to the empty string
+  }
+
+  public ProcessorLocality(String id, String host, String jmxUrl, String 
jmxTunnelingUrl) {
+    this.id = id;
+    this.host = host;
+    this.jmxUrl = jmxUrl;
+    this.jmxTunnelingUrl = jmxTunnelingUrl;
+  }
+
+  public String id() {
+    return id;
+  }
+
+  public String host() {
+    return host;
+  }
+
+  public String jmxUrl() {
+    return jmxUrl;
+  }
+
+  public String jmxTunnelingUrl() {
+    return jmxTunnelingUrl;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ProcessorLocality that = (ProcessorLocality) o;
+    return Objects.equals(id, that.id)
+        && Objects.equals(host, that.host)
+        && Objects.equals(jmxUrl, that.jmxUrl)
+        && Objects.equals(jmxTunnelingUrl, that.jmxTunnelingUrl);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(id, host, jmxUrl, jmxTunnelingUrl);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 8482a3b..68e2f77 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -44,6 +44,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
 import org.apache.samza.coordinator.JobModelManager;
@@ -54,6 +55,7 @@ import org.apache.samza.coordinator.StreamRegexMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -174,6 +176,7 @@ public class ClusterBasedJobCoordinator {
   private final MetadataStore metadataStore;
 
   private final SystemAdmins systemAdmins;
+  private final LocalityManager localityManager;
 
   /**
    * Internal variable for the instance of {@link JmxServer}
@@ -215,6 +218,8 @@ public class ClusterBasedJobCoordinator {
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
     this.isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
     this.jobCoordinatorSleepInterval = 
clusterManagerConfig.getJobCoordinatorSleepInterval();
+    this.localityManager =
+        new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetContainerHostMapping.TYPE));
 
     // build metastore for container placement messages
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(metadataStore);
@@ -343,6 +348,7 @@ public class ClusterBasedJobCoordinator {
       systemAdmins.stop();
       shutDowncontainerPlacementRequestAllocatorAndUtils();
       containerProcessManager.stop();
+      localityManager.close();
       metadataStore.close();
     } catch (Throwable e) {
       LOG.error("Exception while stopping cluster based job coordinator", e);
@@ -457,7 +463,7 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   ContainerProcessManager createContainerProcessManager() {
-    return new ContainerProcessManager(config, state, metrics, 
containerPlacementMetadataStore);
+    return new ContainerProcessManager(config, state, metrics, 
containerPlacementMetadataStore, localityManager);
   }
 
   /**
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 2730c0c..24130fd 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.util.Optional;
 import java.util.UUID;
@@ -27,10 +28,11 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementMessage;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
 import org.apache.samza.container.placement.ContainerPlacementResponseMessage;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.util.BoundedLinkedHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -82,19 +84,23 @@ public class ContainerManager {
 
   private final Optional<StandbyContainerManager> standbyContainerManager;
 
+  private final LocalityManager localityManager;
+
   public ContainerManager(ContainerPlacementMetadataStore 
containerPlacementMetadataStore,
       SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
-      boolean hostAffinityEnabled, boolean standByEnabled) {
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager 
localityManager) {
+    Preconditions.checkNotNull(localityManager, "Locality manager cannot be 
null");
     this.samzaApplicationState = samzaApplicationState;
     this.clusterResourceManager = clusterResourceManager;
     this.actions = new ConcurrentHashMap<>();
     this.placementRequestsCache = new 
BoundedLinkedHashSet<UUID>(UUID_CACHE_SIZE);
     this.hostAffinityEnabled = hostAffinityEnabled;
     this.containerPlacementMetadataStore = containerPlacementMetadataStore;
+    this.localityManager = localityManager;
     // Enable standby container manager if required
     if (standByEnabled) {
       this.standbyContainerManager =
-          Optional.of(new StandbyContainerManager(samzaApplicationState, 
clusterResourceManager));
+          Optional.of(new StandbyContainerManager(samzaApplicationState, 
clusterResourceManager, localityManager));
     } else {
       this.standbyContainerManager = Optional.empty();
     }
@@ -529,7 +535,9 @@ public class ContainerManager {
           processorId, currentResource.getContainerId(), 
currentResource.getHost(), requestMessage);
       sourceHost = currentResource.getHost();
     } else {
-      sourceHost = 
samzaApplicationState.jobModelManager.jobModel().getContainerToHostValue(processorId,
 SetContainerHostMapping.HOST_KEY);
+      sourceHost = 
Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
+          .map(ProcessorLocality::host)
+          .orElse(null);
       LOG.info("Processor ID: {} is not running and was last seen on host: {} 
for ContainerPlacement action: {}",
           processorId, sourceHost, requestMessage);
     }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f6e3b1f..1bc1669 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -19,6 +19,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -26,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
@@ -33,9 +35,11 @@ import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.placement.ContainerPlacementRequestMessage;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.JvmMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -103,6 +107,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
   private final Option<DiagnosticsManager> diagnosticsManager;
 
+  private final LocalityManager localityManager;
+
   /**
    * A standard interface to request resources.
    */
@@ -130,7 +136,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
   private Map<String, MetricsReporter> metricsReporters;
 
   public ContainerProcessManager(Config config, SamzaApplicationState state, 
MetricsRegistryMap registry,
-      ContainerPlacementMetadataStore metadataStore) {
+      ContainerPlacementMetadataStore metadataStore, LocalityManager 
localityManager) {
+    Preconditions.checkNotNull(localityManager, "Locality manager cannot be 
null");
     this.state = state;
     this.clusterManagerConfig = new ClusterManagerConfig(config);
     this.jobConfig = new JobConfig(config);
@@ -159,11 +166,12 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       diagnosticsManager = Option.empty();
     }
 
+    this.localityManager = localityManager;
     // Wire all metrics to all reporters
     this.metricsReporters.values().forEach(reporter -> 
reporter.register(METRICS_SOURCE_NAME, registry));
 
     this.containerManager = new ContainerManager(metadataStore, state, 
clusterResourceManager, hostAffinityEnabled,
-        jobConfig.getStandbyTasksEnabled());
+        jobConfig.getStandbyTasksEnabled(), localityManager);
 
     this.containerAllocator = new 
ContainerAllocator(this.clusterResourceManager, config, state, 
hostAffinityEnabled, this.containerManager);
     this.allocatorThread = new Thread(this.containerAllocator, "Container 
Allocator Thread");
@@ -176,7 +184,8 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       MetricsRegistryMap registry,
       ClusterResourceManager resourceManager,
       Optional<ContainerAllocator> allocator,
-      ContainerManager containerManager) {
+      ContainerManager containerManager,
+      LocalityManager localityManager) {
     this.state = state;
     this.clusterManagerConfig = clusterManagerConfig;
     this.jobConfig = new JobConfig(clusterManagerConfig);
@@ -186,6 +195,7 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     this.clusterResourceManager = resourceManager;
     this.containerManager = containerManager;
     this.diagnosticsManager = Option.empty();
+    this.localityManager = localityManager;
     this.containerAllocator = allocator.orElseGet(
       () -> new ContainerAllocator(this.clusterResourceManager, 
clusterManagerConfig, state,
           hostAffinityEnabled, this.containerManager));
@@ -233,8 +243,16 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
     
state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
 
     // Request initial set of containers
-    Map<String, String> processorToHostMapping = 
state.jobModelManager.jobModel().getAllContainerLocality();
-    containerAllocator.requestResources(processorToHostMapping);
+    LocalityModel localityModel = localityManager.readLocality();
+    Map<String, String> processorToHost = new HashMap<>();
+    
state.jobModelManager.jobModel().getContainers().keySet().forEach((containerId) 
-> {
+      String host = 
Optional.ofNullable(localityModel.getProcessorLocality(containerId))
+          .map(ProcessorLocality::host)
+          .filter(StringUtils::isNotBlank)
+          .orElse(null);
+      processorToHost.put(containerId, host);
+    });
+    containerAllocator.requestResources(processorToHost);
 
     // Start container allocator thread
     LOG.info("Starting the container allocator thread");
@@ -476,8 +494,10 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     state.neededProcessors.incrementAndGet();
     // Find out previously running container location
-    String lastSeenOn = 
state.jobModelManager.jobModel().getContainerToHostValue(processorId, 
SetContainerHostMapping.HOST_KEY);
-    if (!hostAffinityEnabled || lastSeenOn == null) {
+    String lastSeenOn = 
Optional.ofNullable(localityManager.readLocality().getProcessorLocality(processorId))
+        .map(ProcessorLocality::host)
+        .orElse(null);
+    if (!hostAffinityEnabled || StringUtils.isBlank(lastSeenOn)) {
       lastSeenOn = ResourceRequestState.ANY_HOST;
     }
     LOG.info("Container ID: {} for Processor ID: {} was last seen on host 
{}.", containerId, processorId, lastSeenOn);
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
index 30d0de9..b849ea5 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/StandbyContainerManager.java
@@ -26,8 +26,10 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +44,8 @@ public class StandbyContainerManager {
 
   private final SamzaApplicationState samzaApplicationState;
 
+  private final LocalityManager localityManager;
+
   // Map of samza containerIDs to their corresponding active and standby 
containers, e.g., 0 -> {0-0, 0-1}, 0-0 -> {0, 0-1}
   // This is used for checking no two standbys or active-standby-pair are 
started on the same host
   private final Map<String, List<String>> standbyContainerConstraints;
@@ -53,8 +57,9 @@ public class StandbyContainerManager {
   private ClusterResourceManager clusterResourceManager;
 
   public StandbyContainerManager(SamzaApplicationState samzaApplicationState,
-      ClusterResourceManager clusterResourceManager) {
+      ClusterResourceManager clusterResourceManager, LocalityManager 
localityManager) {
     this.failovers = new ConcurrentHashMap<>();
+    this.localityManager = localityManager;
     this.standbyContainerConstraints = new HashMap<>();
     this.samzaApplicationState = samzaApplicationState;
     JobModel jobModel = samzaApplicationState.jobModelManager.jobModel();
@@ -297,12 +302,13 @@ public class StandbyContainerManager {
     // We iterate over the list of last-known standbyHosts to check if anyone 
of them has not already been tried
     for (String standbyContainerID : 
this.standbyContainerConstraints.get(activeContainerID)) {
 
-      String standbyHost = 
this.samzaApplicationState.jobModelManager.jobModel().
-          getContainerToHostValue(standbyContainerID, 
SetContainerHostMapping.HOST_KEY);
+      String standbyHost =
+          
Optional.ofNullable(localityManager.readLocality().getProcessorLocality(standbyContainerID))
+              .map(ProcessorLocality::host)
+              .orElse(null);
 
-      if (standbyHost == null || standbyHost.isEmpty()) {
+      if (StringUtils.isBlank(standbyHost)) {
         log.info("No last known standbyHost for container {}", 
standbyContainerID);
-
       } else if (failoverMetadata.isPresent() && 
failoverMetadata.get().isStandbyHostUsed(standbyHost)) {
 
         log.info("Not using standby host {} for active container {} because it 
had already been selected", standbyHost,
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 34baad0..6f6951f 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -19,18 +19,22 @@
 
 package org.apache.samza.container;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.serializers.Serde;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Used for persisting and reading the container-to-host assignment 
information into the metadata store.
+ * Used for persisting and reading the locality information into the metadata 
store. Currently, we store the
+ * processor-to-host assignment.
  * */
 public class LocalityManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalityManager.class);
@@ -53,47 +57,46 @@ public class LocalityManager {
   }
 
   /**
-   * Method to allow read container locality information from the {@link 
MetadataStore}.
-   * This method is used in {@link 
org.apache.samza.coordinator.JobModelManager}.
+   * Fetch the processor locality information from the {@link MetadataStore}. 
In YARN deployment model, the
+   * processor refers to the samza container.
    *
-   * @return the map of containerId: (hostname)
+   * @return the {@code LocalityModel} for the job
    */
-  public Map<String, Map<String, String>> readContainerLocality() {
-    Map<String, Map<String, String>> allMappings = new HashMap<>();
-    metadataStore.all().forEach((containerId, valueBytes) -> {
+  public LocalityModel readLocality() {
+    Map<String, ProcessorLocality> containerLocalityMap = new HashMap<>();
+
+    metadataStore.all().forEach((processorId, valueBytes) -> {
       if (valueBytes != null) {
         String locationId = valueSerde.fromBytes(valueBytes);
-        Map<String, String> values = new HashMap<>();
-        values.put(SetContainerHostMapping.HOST_KEY, locationId);
-        allMappings.put(containerId, values);
+        containerLocalityMap.put(processorId, new 
ProcessorLocality(processorId, locationId));
       }
     });
     if (LOG.isDebugEnabled()) {
-      for (Map.Entry<String, Map<String, String>> entry : 
allMappings.entrySet()) {
-        LOG.debug(String.format("Locality for container %s: %s", 
entry.getKey(), entry.getValue()));
+      for (Map.Entry<String, ProcessorLocality> entry : 
containerLocalityMap.entrySet()) {
+        LOG.debug(String.format("Locality for container %s: %s", 
entry.getKey(), entry.getValue().host()));
       }
     }
 
-    return Collections.unmodifiableMap(allMappings);
+    return new LocalityModel(containerLocalityMap);
   }
 
   /**
    * Method to write locality information to the {@link MetadataStore}. This 
method is used in {@link SamzaContainer}.
    *
-   * @param containerId  the {@link SamzaContainer} ID
+   * @param processorId a.k.a logical container ID
    * @param hostName  the hostname
    */
-  public void writeContainerToHostMapping(String containerId, String hostName) 
{
-    Map<String, Map<String, String>> containerToHostMapping = 
readContainerLocality();
-    Map<String, String> existingMappings = 
containerToHostMapping.get(containerId);
-    String existingHostMapping = existingMappings != null ? 
existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
-    if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
-      LOG.info("Container {} moved from {} to {}", containerId, 
existingHostMapping, hostName);
+  public void writeContainerToHostMapping(String processorId, String hostName) 
{
+    String existingHostMapping = 
Optional.ofNullable(readLocality().getProcessorLocality(processorId))
+        .map(ProcessorLocality::host)
+        .orElse(null);
+    if (StringUtils.isNotBlank(existingHostMapping) && 
!existingHostMapping.equals(hostName)) {
+      LOG.info("Container {} moved from {} to {}", processorId, 
existingHostMapping, hostName);
     } else {
-      LOG.info("Container {} started at {}", containerId, hostName);
+      LOG.info("Container {} started at {}", processorId, hostName);
     }
 
-    metadataStore.put(containerId, valueSerde.toBytes(hostName));
+    metadataStore.put(processorId, valueSerde.toBytes(hostName));
     metadataStore.flush();
   }
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java 
b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
index ee6b492..4199d39 100644
--- a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
@@ -20,17 +20,20 @@ package org.apache.samza.context;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.job.model.JobModel;
 
 
 public class JobContextImpl implements JobContext {
   private final Config config;
+  private final JobModel jobModel;
   private final String jobName;
   private final String jobId;
 
-  private JobContextImpl(Config config, String jobName, String jobId) {
+  private JobContextImpl(Config config, String jobName, String jobId, JobModel 
jobModel) {
     this.config = config;
     this.jobName = jobName;
     this.jobId = jobId;
+    this.jobModel = jobModel;
   }
 
   /**
@@ -38,15 +41,16 @@ public class JobContextImpl implements JobContext {
    * This extracts some information like job name and job id.
    *
    * @param config used to extract job information
+   * @param jobModel job model
    * @return {@link JobContextImpl} corresponding to the {@code config}
    * @throws IllegalArgumentException if job name is not defined in the {@code 
config}
    */
-  public static JobContextImpl fromConfigWithDefaults(Config config) {
+  public static JobContextImpl fromConfigWithDefaults(Config config, JobModel 
jobModel) {
     JobConfig jobConfig = new JobConfig(config);
     String jobName = jobConfig.getName()
         .orElseThrow(() -> new IllegalArgumentException(String.format("Config 
%s is missing", JobConfig.JOB_NAME)));
     String jobId = jobConfig.getJobId();
-    return new JobContextImpl(config, jobName, jobId);
+    return new JobContextImpl(config, jobName, jobId, jobModel);
   }
 
   @Override
@@ -63,4 +67,9 @@ public class JobContextImpl implements JobContext {
   public String getJobId() {
     return this.jobId;
   }
+
+  @Override
+  public JobModel getJobModel() {
+    return this.jobModel;
+  }
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
new file mode 100644
index 0000000..24e12ff
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/server/LocalityServlet.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.server;
+
+import java.io.IOException;
+import java.util.Optional;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+
+
+/**
+ * A servlet for locality information of a job. The servlet is hosted 
alongside of the {@link JobServlet} which hosts
+ * job model and configuration. Historically, locality information was part of 
job model but we extracted the locality
+ * as job model is static within the lifecycle of an application attempt while 
locality changes in the event of container
+ * movements. The locality information is served under
+ * {@link org.apache.samza.coordinator.JobModelManager#server()}/locality. The 
server and the port information are
+ * dynamic and are determined at the start of the AM. The YARN dashboard or 
job-coordinator logs contain the server
+ * and the port information.
+ *
+ * This separation enables us to achieve performance benefits by caching job 
model when it is served by the AM as it
+ * can incur significant penalty in the job start time for jobs with large 
number of containers.
+ */
+public class LocalityServlet extends HttpServlet {
+  private static final String PROCESSOR_ID_PARAM = "processorId";
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private final LocalityManager localityManager;
+
+  public LocalityServlet(LocalityManager localityManager) {
+    this.localityManager = localityManager;
+  }
+
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response) 
throws IOException {
+    response.setContentType("application/json");
+    response.setStatus(HttpServletResponse.SC_OK);
+    LocalityModel localityModel = localityManager.readLocality();
+
+    if (request.getParameterMap().size() == 1) {
+      String processorId = request.getParameter(PROCESSOR_ID_PARAM);
+      ProcessorLocality processorLocality = 
Optional.ofNullable(localityModel.getProcessorLocality(processorId))
+          .orElse(new ProcessorLocality(processorId, ""));
+      mapper.writeValue(response.getWriter(), processorLocality);
+    } else {
+      mapper.writeValue(response.getWriter(), localityModel);
+    }
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index f55f02f..79b52e6 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -97,7 +97,6 @@ public class LocalJobPlanner extends JobPlanner {
     }
 
     // 2. create the necessary streams
-    // TODO: System generated intermediate streams should have robust naming 
scheme. See SAMZA-1391
     // TODO: this works for single-job applications. For multi-job 
applications, ExecutionPlan should return an AppConfig
     // to be used for the whole application
     JobConfig jobConfig = jobConfigs.get(0);
diff --git 
a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java 
b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
index c51fd85..9dd85b5 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/RemoteJobPlanner.java
@@ -48,7 +48,6 @@ public class RemoteJobPlanner extends JobPlanner {
   @Override
   public List<JobConfig> prepareJobs() {
     // for high-level DAG, generate the plan and job configs
-    // TODO: run.id needs to be set for standalone: SAMZA-1531
     // run.id is based on current system time with the most significant bits 
in UUID (8 digits) to avoid collision
     String runId = String.valueOf(System.currentTimeMillis()) + "-" + 
UUID.randomUUID().toString().substring(0, 8);
     LOG.info("The run id for this run is {}", runId);
diff --git 
a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index ed0c875..47c1754 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -405,7 +405,7 @@ public class StreamProcessor {
     }
 
     return SamzaContainer.apply(processorId, jobModel, 
ScalaJavaUtil.toScalaMap(this.customMetricsReporter),
-        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config),
+        this.taskFactory, JobContextImpl.fromConfigWithDefaults(this.config, 
jobModel),
         
Option.apply(this.applicationDefinedContainerContextFactoryOptional.orElse(null)),
         
Option.apply(this.applicationDefinedTaskContextFactoryOptional.orElse(null)),
         Option.apply(this.externalContextOptional.orElse(null)), null, 
startpointManager,
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index 4470ae7..a5148fb 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -128,7 +128,7 @@ public class ContainerLaunchUtil {
           containerId, jobModel,
           ScalaJavaUtil.toScalaMap(metricsReporters),
           taskFactory,
-          JobContextImpl.fromConfigWithDefaults(config),
+          JobContextImpl.fromConfigWithDefaults(config, jobModel),
           
Option.apply(appDesc.getApplicationContainerContextFactory().orElse(null)),
           
Option.apply(appDesc.getApplicationTaskContextFactory().orElse(null)),
           Option.apply(externalContextOptional.orElse(null)),
diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
similarity index 53%
copy from samza-api/src/main/java/org/apache/samza/context/JobContext.java
copy to 
samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
index 8e41980..79b1367 100644
--- a/samza-api/src/main/java/org/apache/samza/context/JobContext.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonLocalityModelMixIn.java
@@ -16,34 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.samza.context;
+package org.apache.samza.serializers.model;
 
-import org.apache.samza.config.Config;
+import java.util.Map;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
 
 
 /**
- * The framework-provided context for the job.
+ * A mix-in Jackson class to convert {@link 
org.apache.samza.job.model.LocalityModel} to/from JSON
  */
-public interface JobContext {
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonLocalityModelMixIn {
+  @JsonCreator
+  public JsonLocalityModelMixIn(@JsonProperty("processor-localities") 
Map<String, ProcessorLocality> processorLocalities) {
 
-  /**
-   * Gets the final configuration for this job.
-   *
-   * @return the configuration for this job
-   */
-  Config getConfig();
+  }
 
-  /**
-   * Gets the name of the job.
-   *
-   * @return the name of this job
-   */
-  String getJobName();
-
-  /**
-   * Gets the id for this job.
-   *
-   * @return the id for this job
-   */
-  String getJobId();
+  @JsonProperty("processor-localities")
+  abstract Map<String, ProcessorLocality> processorLocalities();
 }
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
new file mode 100644
index 0000000..cce573e
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonProcessorLocalityMixIn.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers.model;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * A mix-in Jackson class to convert {@link 
org.apache.samza.job.model.ProcessorLocality} to/from JSON
+ * <b>NOTE:</b> In YARN deployment model, the id refers to the logical 
container id.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public abstract class JsonProcessorLocalityMixIn {
+  @JsonCreator
+  public JsonProcessorLocalityMixIn(@JsonProperty("id") String id, 
@JsonProperty("host") String host,
+      @JsonProperty("jmx-url") String jmxUrl, 
@JsonProperty("jmx-tunneling-url") String jmxTunnelingUrl) {
+  }
+
+  @JsonProperty("id")
+  abstract String id();
+
+  @JsonProperty("host")
+  abstract String host();
+
+  @JsonProperty("jmx-url")
+  abstract String jmxUrl();
+
+  @JsonProperty("jmx-tunneling-url")
+  abstract String jmxTunnelingUrl();
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 694987f..db147f0 100644
--- 
a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ 
b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -28,7 +28,9 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.system.SystemStream;
@@ -130,6 +132,11 @@ public class SamzaObjectMapper {
       }
     });
 
+    mapper.getSerializationConfig().addMixInAnnotations(LocalityModel.class, 
JsonLocalityModelMixIn.class);
+    mapper.getDeserializationConfig().addMixInAnnotations(LocalityModel.class, 
JsonLocalityModelMixIn.class);
+    
mapper.getSerializationConfig().addMixInAnnotations(ProcessorLocality.class, 
JsonProcessorLocalityMixIn.class);
+    
mapper.getDeserializationConfig().addMixInAnnotations(ProcessorLocality.class, 
JsonProcessorLocalityMixIn.class);
+
     // Convert camel case to hyphenated field names, and register the module.
     mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
     mapper.registerModule(module);
diff --git 
a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java 
b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index f352bd0..9d1896e 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -77,6 +77,7 @@ public class StorageRecovery {
   private final Map<String, ContainerStorageManager> containerStorageManagers 
= new HashMap<>();
 
   private int maxPartitionNumber = 0;
+  private JobModel jobModel;
   private Map<String, ContainerModel> containers = new HashMap<>();
 
   /**
@@ -145,6 +146,7 @@ public class StorageRecovery {
           JobModelManager.apply(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(),
               coordinatorStreamStore, metricsRegistryMap);
       JobModel jobModel = jobModelManager.jobModel();
+      this.jobModel = jobModel;
       containers = jobModel.getContainers();
     } finally {
       coordinatorStreamStore.close();
@@ -249,7 +251,7 @@ public class StorageRecovery {
               jobConfig,
               new HashMap<>(),
               new SamzaContainerMetrics(containerModel.getId(), new 
MetricsRegistryMap(), ""),
-              JobContextImpl.fromConfigWithDefaults(jobConfig),
+              JobContextImpl.fromConfigWithDefaults(jobConfig, jobModel),
               containerContext,
               new HashMap<>(),
               storeBaseDir,
diff --git 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 37162c4..c7e7c7c 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -35,8 +35,7 @@ import 
org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping
 import org.apache.samza.container.LocalityManager
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
-import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.coordinator.server.{HttpServer, JobServlet, 
LocalityServlet}
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping
 import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
@@ -95,14 +94,15 @@ object JobModelManager extends Logging {
       val grouperMetadata: GrouperMetadata = getGrouperMetadata(config, 
localityManager, taskAssignmentManager, taskPartitionAssignmentManager)
 
       val jobModel: JobModel = readJobModel(config, changelogPartitionMapping, 
streamMetadataCache, grouperMetadata)
-      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers, 
localityManager))
+      jobModelRef.set(new JobModel(jobModel.getConfig, jobModel.getContainers))
 
       updateTaskAssignments(jobModel, taskAssignmentManager, 
taskPartitionAssignmentManager, grouperMetadata)
 
       val server = new HttpServer
       server.addServlet("/", new JobServlet(jobModelRef))
+      server.addServlet("/locality", new LocalityServlet(localityManager))
 
-      currentJobModelManager = new JobModelManager(jobModelRef.get(), server, 
localityManager)
+      currentJobModelManager = new JobModelManager(jobModelRef.get(), server)
       currentJobModelManager
     } finally {
       systemAdmins.stop()
@@ -167,15 +167,18 @@ object JobModelManager extends Logging {
     */
   def getProcessorLocality(config: Config, localityManager: LocalityManager) = 
{
     val containerToLocationId: util.Map[String, LocationId] = new 
util.HashMap[String, LocationId]()
-    val existingContainerLocality = localityManager.readContainerLocality()
+    val existingContainerLocality = 
localityManager.readLocality().getProcessorLocalities
 
     for (containerId <- 0 until new JobConfig(config).getContainerCount) {
-      val localityMapping = existingContainerLocality.get(containerId.toString)
+      val preferredHost = 
Option.apply(existingContainerLocality.get(containerId.toString))
+        .map(containerLocality => containerLocality.host())
+        .filter(host => host.nonEmpty)
+        .orNull
       // To handle the case when the container count is increased between two 
different runs of a samza-yarn job,
       // set the locality of newly added containers to any_host.
       var locationId: LocationId = new LocationId("ANY_HOST")
-      if (localityMapping != null && 
localityMapping.containsKey(SetContainerHostMapping.HOST_KEY)) {
-        locationId = new 
LocationId(localityMapping.get(SetContainerHostMapping.HOST_KEY))
+      if (preferredHost != null) {
+        locationId = new LocationId(preferredHost)
       }
       containerToLocationId.put(containerId.toString, locationId)
     }
@@ -366,6 +369,7 @@ object JobModelManager extends Logging {
 
     // processor list is required by some of the groupers. So, let's pass them 
as part of the config.
     // Copy the config and add the processor list to the config copy.
+    // TODO: It is non-ideal to have config as a medium to transmit the 
locality information; especially, if the locality information evolves. Evaluate 
options on using context objects to pass dependent components.
     val configMap = new util.HashMap[String, String](config)
     configMap.put(JobConfig.PROCESSOR_LIST, String.join(",", 
grouperMetadata.getProcessorLocality.keySet()))
     val grouper = getSystemStreamPartitionGrouper(new MapConfig(configMap))
@@ -444,12 +448,7 @@ class JobModelManager(
   /**
    * HTTP server used to serve a Samza job's container model to 
SamzaContainers when they start up.
    */
-  val server: HttpServer = null,
-
-  /**
-   * LocalityManager employed to read and write container and task locality 
information to metadata store.
-   */
-  val localityManager: LocalityManager = null) extends Logging {
+  val server: HttpServer = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -466,11 +465,6 @@ class JobModelManager(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
-      if (localityManager != null) {
-        info("Stopping localityManager")
-        localityManager.close()
-        info("Stopped localityManager")
-      }
     }
   }
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 79bd181..9b5a073 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -149,7 +149,7 @@ class ThreadJobFactory extends StreamJobFactory with 
Logging {
         jobModel,
         Map[String, MetricsReporter](),
         taskFactory,
-        JobContextImpl.fromConfigWithDefaults(config),
+        JobContextImpl.fromConfigWithDefaults(config, jobModel),
         Option(appDesc.getApplicationContainerContextFactory.orElse(null)),
         Option(appDesc.getApplicationTaskContextFactory.orElse(null)),
         buildExternalContext(config)
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index d5819eb..2b4a4b0 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -36,7 +36,8 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -58,7 +59,7 @@ import static org.mockito.Mockito.*;
 public class TestContainerAllocatorWithHostAffinity {
 
   private final Config config = getConfig();
-  private final JobModelManager jobModelManager = 
initializeJobModelManager(config, 1);
+  private final JobModelManager jobModelManager = 
initializeJobModelManager(getConfig(), 1);
   private final MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
   private final SamzaApplicationState state = new 
SamzaApplicationState(jobModelManager);
 
@@ -67,14 +68,7 @@ public class TestContainerAllocatorWithHostAffinity {
   private ContainerManager containerManager;
 
   private JobModelManager initializeJobModelManager(Config config, int 
containerCount) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    localityMap.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc");
-      } });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return 
JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), 
containerCount, mockLocalityManager,
+    return JobModelManagerTestUtil.getJobModelManager(config, containerCount,
         new MockHttpServer("/", 7777, null, new 
ServletHolder(DefaultServlet.class)));
   }
 
@@ -87,12 +81,15 @@ public class TestContainerAllocatorWithHostAffinity {
 
   @Before
   public void setup() throws Exception {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("0", "abc"))));
     CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config);
     CoordinatorStreamStore coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
-    containerManager = new ContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager, true, false);
+    containerManager = new ContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager, true, false, mockLocalityManager);
     containerAllocator =
         new ContainerAllocator(clusterResourceManager, config, state, true, 
containerManager);
     requestState = new MockContainerRequestState(clusterResourceManager, true);
@@ -372,7 +369,7 @@ public class TestContainerAllocatorWithHostAffinity {
     ClusterResourceManager.Callback mockCPM = 
mock(MockClusterResourceManagerCallback.class);
     ClusterResourceManager mockClusterResourceManager = new 
MockClusterResourceManager(mockCPM, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false);
+        new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false, mock(LocalityManager.class));
     // Mock the callback from ClusterManager to add resources to the allocator
     doAnswer((InvocationOnMock invocation) -> {
       SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, 
List.class).get(0);
@@ -419,7 +416,7 @@ public class TestContainerAllocatorWithHostAffinity {
   public void testExpiredRequestAllocationOnAnyHost() throws Exception {
     MockClusterResourceManager spyManager = spy(new 
MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyManager, true, false, mock(LocalityManager.class)));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyManager, config, state, true, 
spyContainerManager));
     // Request Preferred Resources
@@ -463,7 +460,7 @@ public class TestContainerAllocatorWithHostAffinity {
     // Add Extra Resources
     MockClusterResourceManager spyClusterResourceManager = spy(new 
MockClusterResourceManager(callback, state));
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyClusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
spyClusterResourceManager, true, false, mock(LocalityManager.class)));
 
     spyAllocator = Mockito.spy(
         new ContainerAllocator(spyClusterResourceManager, config, state, true, 
spyContainerManager));
@@ -516,7 +513,7 @@ public class TestContainerAllocatorWithHostAffinity {
     ClusterResourceManager.Callback mockCPM = 
mock(MockClusterResourceManagerCallback.class);
     MockClusterResourceManager mockClusterResourceManager = new 
MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false, mock(LocalityManager.class)));
 
     SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, 
"host-0", "id0",
         System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index f9104bd..b808296 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -26,10 +26,12 @@ import java.util.Map;
 import 
org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -72,13 +74,15 @@ public class TestContainerAllocatorWithoutHostAffinity {
 
   @Before
   public void setup() throws Exception {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
     CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(config);
     CoordinatorStreamStore coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
     coordinatorStreamStore.init();
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(coordinatorStreamStore);
     containerPlacementMetadataStore.start();
     containerAllocator = new ContainerAllocator(manager, config, state, false,
-        new ContainerManager(containerPlacementMetadataStore, state, manager, 
false, false));
+        new ContainerManager(containerPlacementMetadataStore, state, manager, 
false, false, mockLocalityManager));
     requestState = new MockContainerRequestState(manager, false);
     Field requestStateField = 
containerAllocator.getClass().getDeclaredField("resourceRequestState");
     requestStateField.setAccessible(true);
@@ -276,7 +280,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
     ClusterResourceManager.Callback mockCPM = 
mock(ClusterResourceManager.Callback.class);
     ClusterResourceManager mockManager = new 
MockClusterResourceManager(mockCPM, state);
     ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockManager, false, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockManager, false, false, mock(LocalityManager.class)));
     spyAllocator = Mockito.spy(
         new ContainerAllocator(mockManager, config, state, false, 
spyContainerManager));
     // Mock the callback from ClusterManager to add resources to the allocator
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 0ec635d..53bd5b0 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -45,8 +45,8 @@ import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -103,6 +103,7 @@ public class TestContainerPlacementActions {
   private ContainerManager containerManager;
   private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity;
   private ContainerProcessManager cpm;
+  private LocalityManager localityManager;
   private ClusterResourceManager.Callback callback;
 
   private Config getConfig() {
@@ -122,30 +123,8 @@ public class TestContainerPlacementActions {
     return new MapConfig(map);
   }
 
-  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, 
String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId,
-          ImmutableMap.of(SetContainerHostMapping.HOST_KEY, 
containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return 
JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), 
containerIdToHost.size(),
-        mockLocalityManager, this.server);
-  }
-
-  private JobModelManager 
getJobModelManagerWithHostAffinityWithStandby(Map<String, String> 
containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId,
-          ImmutableMap.of(SetContainerHostMapping.HOST_KEY, 
containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-    // Generate JobModel for standby containers
-    JobModel standbyJobModel = TestStandbyAllocator.getJobModelWithStandby(2, 
2, 2, Optional.of(mockLocalityManager));
-    return new JobModelManager(standbyJobModel, server, null);
+  private JobModelManager getJobModelManagerWithStandby() {
+    return new JobModelManager(TestStandbyAllocator.getJobModelWithStandby(2, 
2, 2), server);
   }
 
   @Before
@@ -159,14 +138,19 @@ public class TestContainerPlacementActions {
     containerPlacementMetadataStore.start();
     // Utils Related to Cluster manager:
     config = new MapConfig(configVals, 
getConfigWithHostAffinityAndRetries(true, 1, true));
-    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+    state = new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false));
+    localityManager = mock(LocalityManager.class);
+    when(localityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(
+            "0", new ProcessorLocality("0", "host-1"),
+            "1", new ProcessorLocality("1", "host-2"))));
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
-            clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
+            clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager);
   }
 
   @After
@@ -177,15 +161,22 @@ public class TestContainerPlacementActions {
   }
 
   public void setupStandby() throws Exception {
-    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0",
 "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of(
+            "0", new ProcessorLocality("0", "host-1"),
+            "1", new ProcessorLocality("1", "host-2"),
+            "0-0", new ProcessorLocality("0", "host-2"),
+            "1-0", new ProcessorLocality("0", "host-1"))));
+    state = new SamzaApplicationState(getJobModelManagerWithStandby());
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
     // Enable standby
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, true));
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, true, mockLocalityManager));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, mockLocalityManager);
   }
 
   @Test(timeout = 10000)
@@ -558,14 +549,14 @@ public class TestContainerPlacementActions {
   public void testContainerPlacementsForJobRunningInDegradedState() throws 
Exception {
     // Set failure after retries to false to enable job running in degraded 
state
     config = new MapConfig(configVals, 
getConfigWithHostAffinityAndRetries(true, 1, false));
-    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+    state = new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     callback = mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
-    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false));
+    containerManager = spy(new 
ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
     allocatorWithHostAffinity = new 
MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, 
containerManager);
     cpm = new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(),
-        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
+        clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager, localityManager);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
@@ -672,18 +663,18 @@ public class TestContainerPlacementActions {
     Map<String, String> conf = new HashMap<>();
     conf.putAll(getConfigWithHostAffinityAndRetries(false, 1, true));
     SamzaApplicationState state =
-        new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+        new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, false, false);
+        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, false, false, localityManager);
     MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity =
         new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, 
new MapConfig(conf), state,
             containerManager);
 
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), 
getConfigWithHostAffinityAndRetries(false, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithoutHostAffinity), containerManager);
+        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithoutHostAffinity), containerManager, localityManager);
 
     // Mimic Cluster Manager returning any request
     doAnswer(new Answer<Void>() {
@@ -807,16 +798,16 @@ public class TestContainerPlacementActions {
   @Test(expected = NullPointerException.class)
   public void testBadControlRequestRejected() throws Exception {
     SamzaApplicationState state =
-        new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host-1", "1", "host-2")));
+        new 
SamzaApplicationState(JobModelManagerTestUtil.getJobModelManager(getConfig(), 
2, this.server));
     ClusterResourceManager.Callback callback = 
mock(ClusterResourceManager.Callback.class);
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false));
+        spy(new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, localityManager));
     MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity =
         new MockContainerAllocatorWithHostAffinity(clusterResourceManager, 
config, state, containerManager);
     ContainerProcessManager cpm = new ContainerProcessManager(
         new ClusterManagerConfig(new MapConfig(getConfig(), 
getConfigWithHostAffinityAndRetries(true, 1, true))), state,
-        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithHostAffinity), containerManager);
+        new MetricsRegistryMap(), clusterResourceManager, 
Optional.of(allocatorWithHostAffinity), containerManager, localityManager);
 
     doAnswer(new Answer<Void>() {
       public Void answer(InvocationOnMock invocation) {
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index a5dbe77..5e550cf 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -38,7 +38,8 @@ import org.apache.samza.coordinator.JobModelManagerTestUtil;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
@@ -108,20 +109,7 @@ public class TestContainerProcessManager {
 
   private HttpServer server = null;
 
-
-  private JobModelManager getJobModelManagerWithHostAffinity(Map<String, 
String> containerIdToHost) {
-    Map<String, Map<String, String>> localityMap = new HashMap<>();
-    containerIdToHost.forEach((containerId, host) -> {
-      localityMap.put(containerId, 
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, 
containerIdToHost.get(containerId)));
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap);
-
-    return 
JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(),
-        containerIdToHost.size(), mockLocalityManager, this.server);
-  }
-
-  private JobModelManager getJobModelManagerWithoutHostAffinity(int 
containerCount) {
+  private JobModelManager getJobModelManager(int containerCount) {
     return JobModelManagerTestUtil.getJobModelManager(getConfig(), 
containerCount, this.server);
   }
 
@@ -149,11 +137,14 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("0", "host1"))));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false);
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager, true, false, mockLocalityManager);
     ContainerProcessManager cpm =
         buildContainerProcessManager(new ClusterManagerConfig(new 
MapConfig(conf)), state, clusterResourceManager, Optional.empty());
 
@@ -169,7 +160,7 @@ public class TestContainerProcessManager {
     conf.put("cluster-manager.container.memory.mb", "500");
     conf.put("cluster-manager.container.cpu.cores", "5");
 
-    state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host1")));
+    state = new SamzaApplicationState(getJobModelManager(1));
     callback = new MockClusterResourceManagerCallback();
     clusterResourceManager = new MockClusterResourceManager(callback, state);
     cpm = new ContainerProcessManager(
@@ -178,7 +169,8 @@ public class TestContainerProcessManager {
         new MetricsRegistryMap(),
         clusterResourceManager,
         Optional.empty(),
-        containerManager
+        containerManager,
+        mockLocalityManager
     );
 
     allocator =
@@ -192,12 +184,12 @@ public class TestContainerProcessManager {
   @Test
   public void testOnInit() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     ClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     ContainerProcessManager cpm =
@@ -237,7 +229,7 @@ public class TestContainerProcessManager {
   @Test
   public void testOnShutdown() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
@@ -260,12 +252,12 @@ public class TestContainerProcessManager {
   @Test
   public void testCpmShouldStopWhenContainersFinish() throws Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
@@ -308,12 +300,12 @@ public class TestContainerProcessManager {
   @Test
   public void testNewContainerRequestedOnFailureWithUnknownCode() throws 
Exception {
     Config conf = getConfig();
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
@@ -408,11 +400,11 @@ public class TestContainerProcessManager {
     int maxRetries = 3;
     String processorId = "0";
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, 
maxRetries, failAfterRetries));
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
@@ -494,12 +486,22 @@ public class TestContainerProcessManager {
     int maxRetries = 3;
     String processorId = "0";
     ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(getConfigWithHostAffinityAndRetries(withHostAffinity, 
maxRetries, failAfterRetries));
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+
+    if (withHostAffinity) {
+      when(mockLocalityManager.readLocality())
+          .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("0", "host1"))));
+    } else {
+      when(mockLocalityManager.readLocality())
+          .thenReturn(new LocalityModel(new HashMap<>()));
+    }
+
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
-            clusterManagerConfig.getHostAffinityEnabled(), false);
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+            clusterManagerConfig.getHostAffinityEnabled(), false, 
mockLocalityManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -508,7 +510,7 @@ public class TestContainerProcessManager {
         containerManager);
 
     ContainerProcessManager cpm =
-        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator));
+        buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, Optional.of(allocator), mockLocalityManager);
 
     // start triggers a request
     cpm.start();
@@ -603,12 +605,12 @@ public class TestContainerProcessManager {
   public void testInvalidNotificationsAreIgnored() throws Exception {
     Config conf = getConfig();
 
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(conf));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
@@ -644,13 +646,16 @@ public class TestContainerProcessManager {
 
   @Test
   public void testRerequestOnAnyHostIfContainerStartFails() throws Exception {
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("1", 
"host1")));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     Map<String, String> configMap = new HashMap<>();
     configMap.putAll(getConfig());
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new 
ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("1", "host1"))));
+    ContainerManager containerManager = 
buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false, 
mockLocalityManager);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
         clusterResourceManager,
@@ -660,7 +665,7 @@ public class TestContainerProcessManager {
 
     ContainerProcessManager manager =
         new ContainerProcessManager(new ClusterManagerConfig(config), state, 
new MetricsRegistryMap(), clusterResourceManager,
-            Optional.of(allocator), containerManager);
+            Optional.of(allocator), containerManager, mockLocalityManager);
 
     manager.start();
     SamzaResource resource = new SamzaResource(1, 1024, "host1", "resource-1");
@@ -680,12 +685,14 @@ public class TestContainerProcessManager {
     config.put("cluster-manager.container.request.timeout.ms", "10000");
     Config cfg = new MapConfig(config);
     // 1. Request two containers on hosts - host1 and host2
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", 
"host1",
-        "1", "host2")));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(2));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
-    ContainerManager containerManager = new 
ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
-        
Boolean.valueOf(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), false);
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality())
+        .thenReturn(new LocalityModel(ImmutableMap.of("0", new 
ProcessorLocality("0", "host1"), "1", new ProcessorLocality("1", "host2"))));
+    ContainerManager containerManager = 
buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        
Boolean.parseBoolean(config.get(ClusterManagerConfig.HOST_AFFINITY_ENABLED)), 
false, mockLocalityManager);
 
     MockContainerAllocatorWithHostAffinity allocator = new 
MockContainerAllocatorWithHostAffinity(
         clusterResourceManager,
@@ -694,7 +701,7 @@ public class TestContainerProcessManager {
         containerManager);
 
     ContainerProcessManager cpm =
-        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, 
clusterResourceManager, Optional.of(allocator)));
+        spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, 
clusterResourceManager, Optional.of(allocator), mockLocalityManager));
 
     cpm.start();
     assertFalse(cpm.shouldShutdown());
@@ -744,12 +751,12 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(new MapConfig(conf)));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
@@ -819,12 +826,12 @@ public class TestContainerProcessManager {
 
     Map<String, String> config = new HashMap<>();
     config.putAll(getConfig());
-    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
+    SamzaApplicationState state = new 
SamzaApplicationState(getJobModelManager(1));
     MockClusterResourceManagerCallback callback = new 
MockClusterResourceManagerCallback();
     MockClusterResourceManager clusterResourceManager = new 
MockClusterResourceManager(callback, state);
     ClusterManagerConfig clusterManagerConfig = spy(new 
ClusterManagerConfig(new MapConfig(config)));
     ContainerManager containerManager =
-        new ContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
+        buildContainerManager(containerPlacementMetadataStore, state, 
clusterResourceManager,
             clusterManagerConfig.getHostAffinityEnabled(), false);
 
     MockContainerAllocatorWithoutHostAffinity allocator = new 
MockContainerAllocatorWithoutHostAffinity(
@@ -915,10 +922,32 @@ public class TestContainerProcessManager {
     server.stop();
   }
 
+  private ContainerManager 
buildContainerManager(ContainerPlacementMetadataStore 
containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
+      boolean hostAffinityEnabled, boolean standByEnabled) {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
+    return buildContainerManager(containerPlacementMetadataStore, 
samzaApplicationState, clusterResourceManager,
+        hostAffinityEnabled, standByEnabled, mockLocalityManager);
+  }
+
+  private ContainerManager 
buildContainerManager(ContainerPlacementMetadataStore 
containerPlacementMetadataStore,
+      SamzaApplicationState samzaApplicationState, ClusterResourceManager 
clusterResourceManager,
+      boolean hostAffinityEnabled, boolean standByEnabled, LocalityManager 
localityManager) {
+    return new ContainerManager(containerPlacementMetadataStore, 
samzaApplicationState, clusterResourceManager,
+        hostAffinityEnabled, standByEnabled, localityManager);
+  }
   private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
       ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator) {
+    LocalityManager mockLocalityManager = mock(LocalityManager.class);
+    when(mockLocalityManager.readLocality()).thenReturn(new LocalityModel(new 
HashMap<>()));
+    return buildContainerProcessManager(clusterManagerConfig, state, 
clusterResourceManager, allocator, mockLocalityManager);
+  }
+
+  private ContainerProcessManager 
buildContainerProcessManager(ClusterManagerConfig clusterManagerConfig, 
SamzaApplicationState state,
+      ClusterResourceManager clusterResourceManager, 
Optional<ContainerAllocator> allocator, LocalityManager localityManager) {
     return new ContainerProcessManager(clusterManagerConfig, state, new 
MetricsRegistryMap(), clusterResourceManager,
-        allocator, new ContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager,
-        clusterManagerConfig.getHostAffinityEnabled(), false));
+        allocator, buildContainerManager(containerPlacementMetadataStore, 
state, clusterResourceManager,
+        clusterManagerConfig.getHostAffinityEnabled(), false, 
localityManager), localityManager);
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
index c5f3ec1..cb3a7a7 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestStandbyAllocator.java
@@ -22,10 +22,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import org.apache.samza.Partition;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -40,7 +38,7 @@ public class TestStandbyAllocator {
 
   @Test
   public void testWithNoStandby() {
-    JobModel jobModel = getJobModelWithStandby(1, 1, 1, Optional.empty());
+    JobModel jobModel = getJobModelWithStandby(1, 1, 1);
     List<String> containerConstraints = 
StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
     Assert.assertEquals("Constrained container count should be 0", 0, 
containerConstraints.size());
   }
@@ -59,7 +57,7 @@ public class TestStandbyAllocator {
 
 
   public void testWithStandby(int nContainers, int nTasks, int 
replicationFactor) {
-    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, 
replicationFactor, Optional.empty());
+    JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, 
replicationFactor);
 
     for (String containerID : jobModel.getContainers().keySet()) {
       List<String> containerConstraints = 
StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
@@ -81,7 +79,7 @@ public class TestStandbyAllocator {
   }
 
   // Helper method to create a jobmodel with given number of containers, tasks 
and replication factor
-  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, 
int replicationFactor, Optional<LocalityManager> localityManager) {
+  public static JobModel getJobModelWithStandby(int nContainers, int nTasks, 
int replicationFactor) {
     Map<String, ContainerModel> containerModels = new HashMap<>();
     int taskID = 0;
 
@@ -104,7 +102,7 @@ public class TestStandbyAllocator {
     }
 
     containerModels.putAll(standbyContainerModels);
-    return new JobModel(new MapConfig(), containerModels, 
localityManager.orElse(null));
+    return new JobModel(new MapConfig(), containerModels);
   }
 
   // Helper method that creates a taskmodel with one input ssp
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java 
b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index d18ad67..f2819c7 100644
--- 
a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -34,7 +34,10 @@ import 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 
 public class TestLocalityManager {
 
@@ -59,7 +62,7 @@ public class TestLocalityManager {
     LocalityManager localityManager = new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetContainerHostMapping.TYPE));
 
     localityManager.writeContainerToHostMapping("0", "localhost");
-    Map<String, Map<String, String>> localMap = 
localityManager.readContainerLocality();
+    Map<String, Map<String, String>> localMap = 
readContainerLocality(localityManager);
     Map<String, Map<String, String>> expectedMap =
       new HashMap<String, Map<String, String>>() {
         {
@@ -87,9 +90,9 @@ public class TestLocalityManager {
 
     localityManager.writeContainerToHostMapping("1", "localhost");
 
-    assertEquals(localityManager.readContainerLocality().size(), 1);
+    assertEquals(readContainerLocality(localityManager).size(), 1);
 
-    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), 
localityManager.readContainerLocality());
+    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), 
readContainerLocality(localityManager));
 
     localityManager.close();
 
@@ -98,4 +101,13 @@ public class TestLocalityManager {
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
+
+  static Map<String, Map<String, String>> 
readContainerLocality(LocalityManager localityManager) {
+    Map<String, Map<String, String>> containerLocalityMap = new HashMap<>();
+    
localityManager.readLocality().getProcessorLocalities().forEach((containerId, 
containerLocality) -> {
+      containerLocalityMap.put(containerId, ImmutableMap.of("host", 
containerLocality.host()));
+    });
+
+    return containerLocalityMap;
+  }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index 6e29d81..3affa71 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
@@ -22,13 +22,9 @@ package org.apache.samza.coordinator;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
-import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.runtime.LocationId;
-import org.apache.samza.system.StreamMetadataCache;
 
 /**
  * Utils to create instances of {@link JobModelManager} in unit tests
@@ -36,23 +32,12 @@ import org.apache.samza.system.StreamMetadataCache;
 public class JobModelManagerTestUtil {
 
   public static JobModelManager getJobModelManager(Config config, int 
containerCount, HttpServer server) {
-    return getJobModelManagerWithLocalityManager(config, containerCount, null, 
server);
-  }
-
-  public static JobModelManager getJobModelManagerWithLocalityManager(Config 
config, int containerCount, LocalityManager localityManager, HttpServer server) 
{
     Map<String, ContainerModel> containers = new java.util.HashMap<>();
     for (int i = 0; i < containerCount; i++) {
       ContainerModel container = new ContainerModel(String.valueOf(i), new 
HashMap<>());
       containers.put(String.valueOf(i), container);
     }
-    JobModel jobModel = new JobModel(config, containers, localityManager);
-    return new JobModelManager(jobModel, server, null);
-  }
-
-  public static JobModelManager getJobModelManagerUsingReadModel(Config 
config, StreamMetadataCache streamMetadataCache,
-      HttpServer server, LocalityManager localityManager, Map<String, 
LocationId> processorLocality) {
-    JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), 
streamMetadataCache,
-        new GrouperMetadataImpl(processorLocality, new HashMap<>(), new 
HashMap<>(), new HashMap<>()));
-    return new JobModelManager(new JobModel(jobModel.getConfig(), 
jobModel.getContainers(), localityManager), server, localityManager);
+    JobModel jobModel = new JobModel(config, containers);
+    return new JobModelManager(jobModel, server);
   }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
index 9908da5..83de0cf 100644
--- 
a/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestJobModelManager.java
@@ -39,9 +39,10 @@ import 
org.apache.samza.container.grouper.task.GrouperMetadataImpl;
 import org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.container.grouper.task.TaskPartitionAssignmentManager;
 import org.apache.samza.coordinator.server.HttpServer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.runtime.LocationId;
@@ -57,10 +58,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentMatcher;
@@ -104,72 +107,6 @@ public class TestJobModelManager {
   }
 
   @Test
-  public void testLocalityMapWithHostAffinity() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        put("cluster-manager.container.count", "1");
-        put("cluster-manager.container.memory.mb", "512");
-        put("cluster-manager.container.retry.count", "1");
-        put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.allocator.sleep.ms", "10");
-        put("yarn.package.path", "/foo");
-        put("task.inputs", "test-system.test-stream");
-        put("systems.test-system.samza.factory", 
"org.apache.samza.system.MockSystemFactory");
-        put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
-        put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
-        put("job.host-affinity.enabled", "true");
-      }
-    });
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-
-    localityMappings.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
-      } });
-    
when(mockLocalityManager.readContainerLocality()).thenReturn(this.localityMappings);
-
-    Map<String, LocationId> containerLocality = ImmutableMap.of("0", new 
LocationId("abc-affinity"));
-    this.jobModelManager =
-        JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, 
mockStreamMetadataCache, server,
-            mockLocalityManager, containerLocality);
-
-    assertEquals(jobModelManager.jobModel().getAllContainerLocality(), 
ImmutableMap.of("0", "abc-affinity"));
-  }
-
-  @Test
-  public void testLocalityMapWithoutHostAffinity() {
-    Config config = new MapConfig(new HashMap<String, String>() {
-      {
-        put("cluster-manager.container.count", "1");
-        put("cluster-manager.container.memory.mb", "512");
-        put("cluster-manager.container.retry.count", "1");
-        put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.allocator.sleep.ms", "10");
-        put("yarn.package.path", "/foo");
-        put("task.inputs", "test-system.test-stream");
-        put("systems.test-system.samza.factory", 
"org.apache.samza.system.MockSystemFactory");
-        put("systems.test-system.samza.key.serde", 
"org.apache.samza.serializers.JsonSerde");
-        put("systems.test-system.samza.msg.serde", 
"org.apache.samza.serializers.JsonSerde");
-        put("job.host-affinity.enabled", "false");
-      }
-    });
-
-    LocalityManager mockLocalityManager = mock(LocalityManager.class);
-
-    localityMappings.put("0", new HashMap<String, String>() { {
-        put(SetContainerHostMapping.HOST_KEY, "abc-affinity");
-      } });
-    when(mockLocalityManager.readContainerLocality()).thenReturn(new 
HashMap<>());
-
-    Map<String, LocationId> containerLocality = ImmutableMap.of("0", new 
LocationId("abc-affinity"));
-
-    this.jobModelManager =
-        JobModelManagerTestUtil.getJobModelManagerUsingReadModel(config, 
mockStreamMetadataCache, server,
-            mockLocalityManager, containerLocality);
-
-    assertEquals(jobModelManager.jobModel().getAllContainerLocality(), 
Collections.singletonMap("0", null));
-  }
-
-  @Test
   public void testGetGrouperMetadata() {
     // Mocking setup.
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
@@ -179,17 +116,13 @@ public class TestJobModelManager {
     SystemStreamPartition testSystemStreamPartition1 = new 
SystemStreamPartition(new SystemStream("test-system-0", "test-stream-0"), new 
Partition(1));
     SystemStreamPartition testSystemStreamPartition2 = new 
SystemStreamPartition(new SystemStream("test-system-1", "test-stream-1"), new 
Partition(2));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    localityMappings.put("0", 
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "abc-affinity"));
+    when(mockLocalityManager.readLocality()).thenReturn(new 
LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", 
"abc-affinity"))));
 
     Map<SystemStreamPartition, List<String>> taskToSSPAssignments = 
ImmutableMap.of(testSystemStreamPartition1, ImmutableList.of("task-0", 
"task-1"),
                                                                                
     testSystemStreamPartition2, ImmutableList.of("task-2", "task-3"));
 
     Map<String, String> taskAssignment = ImmutableMap.of("task-0", "0");
 
-    // Mock the container locality assignment.
-    
when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
-
     // Mock the task to partition assignment.
     
when(mockTaskPartitionAssignmentManager.readTaskPartitionAssignments()).thenReturn(taskToSSPAssignments);
 
@@ -199,8 +132,8 @@ public class TestJobModelManager {
 
     GrouperMetadataImpl grouperMetadata = 
JobModelManager.getGrouperMetadata(new MapConfig(), mockLocalityManager, 
mockTaskAssignmentManager, mockTaskPartitionAssignmentManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
-    Mockito.verify(mockTaskAssignmentManager).readTaskAssignment();
+    verify(mockLocalityManager).readLocality();
+    verify(mockTaskAssignmentManager).readTaskAssignment();
 
     Assert.assertEquals(ImmutableMap.of("0", new LocationId("abc-affinity")), 
grouperMetadata.getProcessorLocality());
     Assert.assertEquals(ImmutableMap.of(new TaskName("task-0"), new 
LocationId("abc-affinity")), grouperMetadata.getTaskLocality());
@@ -216,15 +149,14 @@ public class TestJobModelManager {
   public void testGetProcessorLocalityAllEntriesExisting() {
     Config config = new 
MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    localityMappings.put("0", 
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "0-affinity"));
-    localityMappings.put("1", 
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "1-affinity"));
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    
when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
+    when(mockLocalityManager.readLocality()).thenReturn(new 
LocalityModel(ImmutableMap.of(
+        "0", new ProcessorLocality("0", "0-affinity"),
+        "1", new ProcessorLocality("1", "1-affinity"))));
 
     Map<String, LocationId> processorLocality = 
JobModelManager.getProcessorLocality(config, mockLocalityManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
+    verify(mockLocalityManager).readLocality();
     ImmutableMap<String, LocationId> expected =
         ImmutableMap.of("0", new LocationId("0-affinity"), "1", new 
LocationId("1-affinity"));
     Assert.assertEquals(expected, processorLocality);
@@ -234,15 +166,13 @@ public class TestJobModelManager {
   public void testGetProcessorLocalityNewContainer() {
     Config config = new 
MapConfig(ImmutableMap.of(JobConfig.JOB_CONTAINER_COUNT, "2"));
 
-    Map<String, Map<String, String>> localityMappings = new HashMap<>();
-    // 2 containers, but only return 1 existing mapping
-    localityMappings.put("0", 
ImmutableMap.of(SetContainerHostMapping.HOST_KEY, "abc-affinity"));
     LocalityManager mockLocalityManager = mock(LocalityManager.class);
-    
when(mockLocalityManager.readContainerLocality()).thenReturn(localityMappings);
+    // 2 containers, but only return 1 existing mapping
+    when(mockLocalityManager.readLocality()).thenReturn(new 
LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", 
"abc-affinity"))));
 
     Map<String, LocationId> processorLocality = 
JobModelManager.getProcessorLocality(config, mockLocalityManager);
 
-    Mockito.verify(mockLocalityManager).readContainerLocality();
+    verify(mockLocalityManager).readLocality();
     ImmutableMap<String, LocationId> expected = ImmutableMap.of(
         // found entry in existing locality
         "0", new LocationId("abc-affinity"),
@@ -291,16 +221,16 @@ public class TestJobModelManager {
     systemStreamPartitions.add(new SystemStreamPartition(new 
SystemStream("test-system-3", "test-stream-3"), new Partition(2)));
 
     // Verifications
-    Mockito.verify(mockJobModel, atLeast(1)).getContainers();
-    
Mockito.verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
-    
Mockito.verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
+    verify(mockJobModel, atLeast(1)).getContainers();
+    
verify(mockTaskAssignmentManager).deleteTaskContainerMappings(Mockito.any());
+    
verify(mockTaskAssignmentManager).writeTaskContainerMappings(ImmutableMap.of("test-container-id",
         ImmutableMap.of("task-1", TaskMode.Active, "task-2", TaskMode.Active, 
"task-3", TaskMode.Active, "task-4", TaskMode.Active)));
 
     // Verify that the old, stale partition mappings had been purged in the 
coordinator stream.
-    
Mockito.verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
+    verify(mockTaskPartitionAssignmentManager).delete(systemStreamPartitions);
 
     // Verify that the new task to partition assignment is stored in the 
coordinator stream.
-    
Mockito.verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
+    
verify(mockTaskPartitionAssignmentManager).writeTaskPartitionAssignments(ImmutableMap.of(
         testSystemStreamPartition1, ImmutableList.of("task-1"),
         testSystemStreamPartition2, ImmutableList.of("task-2"),
         testSystemStreamPartition3, ImmutableList.of("task-3"),
diff --git 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index ea57479..104b0ba 100644
--- 
a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ 
b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -442,7 +442,7 @@ public class TestOperatorImplGraph {
     cms.put(cm0.getId(), cm0);
     cms.put(cm1.getId(), cm1);
 
-    JobModel jobModel = new JobModel(config, cms, null);
+    JobModel jobModel = new JobModel(config, cms);
     Multimap<SystemStream, String> streamToTasks = 
OperatorImplGraph.getStreamToConsumerTasks(jobModel);
     assertEquals(streamToTasks.get(ssp0.getSystemStream()).size(), 2);
     assertEquals(streamToTasks.get(ssp2.getSystemStream()).size(), 1);
diff --git 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index b86da1f..c13da8d 100644
--- 
a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
+++ 
b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -38,9 +39,9 @@ import 
org.apache.samza.container.grouper.task.TaskAssignmentManager;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
+import org.apache.samza.job.model.ProcessorLocality;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
@@ -137,7 +138,7 @@ public class SamzaTaskProxy implements TaskProxy {
   protected List<Task> 
readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
     CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(consumer.getConfig(), new MetricsRegistryMap());
     LocalityManager localityManager = new 
LocalityManager(coordinatorStreamStore);
-    Map<String, Map<String, String>> containerIdToHostMapping = 
localityManager.readContainerLocality();
+    Map<String, ProcessorLocality> containerLocalities = 
localityManager.readLocality().getProcessorLocalities();
     TaskAssignmentManager taskAssignmentManager = new 
TaskAssignmentManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskContainerMapping.TYPE), new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetTaskModeMapping.TYPE));
     Map<String, String> taskNameToContainerIdMapping = 
taskAssignmentManager.readTaskAssignment();
     StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
@@ -145,7 +146,9 @@ public class SamzaTaskProxy implements TaskProxy {
     return taskNameToContainerIdMapping.entrySet()
         .stream()
         .map(entry -> {
-          String hostName = 
containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY);
+          String hostName = 
Optional.ofNullable(containerLocalities.get(entry.getValue()))
+              .map(ProcessorLocality::host)
+              .orElse(null);
           return new Task(hostName, entry.getKey(), entry.getValue(), new 
ArrayList<>(), storeNames);
         }).collect(Collectors.toList());
   }
diff --git 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 46f345d..3f7056e 100644
--- 
a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ 
b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -142,7 +142,7 @@ object TestKeyValuePerformance extends Logging {
           new TaskInstanceCollector(producerMultiplexer),
           new MetricsRegistryMap,
           null,
-          JobContextImpl.fromConfigWithDefaults(storageConfig),
+          JobContextImpl.fromConfigWithDefaults(storageConfig, null),
           new ContainerContextImpl(new ContainerModel("0", tasks.asJava), new 
MetricsRegistryMap), StoreMode.ReadWrite
         )
 
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
index 2b31977..74fef67 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
@@ -19,10 +19,10 @@
 
 package org.apache.samza.validation;
 
-import java.util.Map;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -35,17 +35,18 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
-import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
 import org.apache.samza.job.yarn.ClientHelper;
 import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
-import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.util.CommandLine;
-import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.ReflectionUtil;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.slf4j.Logger;
@@ -158,23 +159,24 @@ public class YarnJobValidationTool {
     CoordinatorStreamStore coordinatorStreamStore = new 
CoordinatorStreamStore(config, metricsRegistry);
     coordinatorStreamStore.init();
     try {
-      Config configFromCoordinatorStream = 
CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore);
-      ChangelogStreamManager changelogStreamManager = new 
ChangelogStreamManager(coordinatorStreamStore);
-      JobModelManager jobModelManager =
-          JobModelManager.apply(configFromCoordinatorStream, 
changelogStreamManager.readPartitionMapping(),
-              coordinatorStreamStore, metricsRegistry);
+      LocalityManager localityManager =
+          new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE));
       validator.init(config);
-      Map<String, String> jmxUrls = 
jobModelManager.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY);
-      for (Map.Entry<String, String> entry : jmxUrls.entrySet()) {
-        String containerId = entry.getKey();
-        String jmxUrl = entry.getValue();
-        log.info("validate container " + containerId + " metrics with JMX: " + 
jmxUrl);
-        JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
-        jmxMetrics.connect();
-        validator.validate(jmxMetrics);
-        jmxMetrics.close();
-        log.info("validate container " + containerId + " successfully");
+      LocalityModel localityModel = localityManager.readLocality();
+
+      for (ProcessorLocality processorLocality : 
localityModel.getProcessorLocalities().values()) {
+        String containerId = processorLocality.id();
+        String jmxUrl = processorLocality.jmxTunnelingUrl();
+        if (StringUtils.isNotBlank(jmxUrl)) {
+          log.info("validate container " + containerId + " metrics with JMX: " 
+ jmxUrl);
+          JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl);
+          jmxMetrics.connect();
+          validator.validate(jmxMetrics);
+          jmxMetrics.close();
+          log.info("validate container " + containerId + " successfully");
+        }
       }
+
       validator.complete();
     } finally {
       coordinatorStreamStore.close();
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml 
b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index 7fe6305..d01b20f 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -137,8 +137,7 @@
               %td
                 Up time: #{container.upTimeStr()}
               %td
-                Ordinary: 
#{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(processorId, 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_URL_KEY)}
-                Tunneling: 
#{samzaAppState.jobModelManager.jobModel.getContainerToHostValue(processorId, 
org.apache.samza.coordinator.stream.messages.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+                %a(target="_blank" 
href="#{state.coordinatorUrl.toString}locality?processorId=#{processorId.toString}")
 JMX
 
       %h2 Failed Containers
       %table.table.table-striped.table-bordered.tablesorter#containers-table
diff --git 
a/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java 
b/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java
new file mode 100644
index 0000000..56d7ae1
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/webapp/TestLocalityServlet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.webapp;
+
+import com.google.common.collect.ImmutableMap;
+import java.net.URL;
+import java.util.Collections;
+import org.apache.samza.container.LocalityManager;
+import org.apache.samza.coordinator.server.HttpServer;
+import org.apache.samza.coordinator.server.LocalityServlet;
+import org.apache.samza.job.model.ProcessorLocality;
+import org.apache.samza.job.model.LocalityModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.eclipse.jetty.servlet.DefaultServlet;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests for {@link LocalityServlet}. The servlet is exercised over HTTP, which indirectly validates the
+ * serde mix-ins used to (de)serialize {@link LocalityModel} and {@link ProcessorLocality}.
+ */
+public class TestLocalityServlet {
+  private static final String PROCESSOR_ID1 = "1";
+  private static final String PROCESSOR_ID2 = "2";
+  private static final String HOST1 = "host1";
+  private static final String HOST2 = "host2";
+  private static final String JMX_URL = "jmx";
+  private static final String TUNNELING_URL = "tunneling";
+
+  private static final ProcessorLocality PROCESSOR_1_LOCALITY =
+      new ProcessorLocality(PROCESSOR_ID1, HOST1, JMX_URL, TUNNELING_URL);
+  private static final ProcessorLocality PROCESSOR_2_LOCALITY =
+      new ProcessorLocality(PROCESSOR_ID2, HOST2, JMX_URL, TUNNELING_URL);
+
+  private final ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
+  private HttpServer webApp;
+  private LocalityManager localityManager;
+
+  /** Starts an HTTP server exposing a {@link LocalityServlet} backed by a mocked {@link LocalityManager}. */
+  @Before
+  public void setup() throws Exception {
+    localityManager = mock(LocalityManager.class);
+    when(localityManager.readLocality()).thenReturn(
+        new LocalityModel(ImmutableMap.of(PROCESSOR_ID1, PROCESSOR_1_LOCALITY, PROCESSOR_ID2, PROCESSOR_2_LOCALITY)));
+    webApp = new HttpServer("/", 0, "", new ServletHolder(new DefaultServlet()));
+    webApp.addServlet("/locality", new LocalityServlet(localityManager));
+    webApp.start();
+  }
+
+  /** Stops the HTTP server started in {@link #setup()}. */
+  @After
+  public void cleanup() throws Exception {
+    webApp.stop();
+  }
+
+  /** Reads {@code path}, resolved against the test server's root URL, and returns the response body. */
+  private String fetch(String path) throws Exception {
+    URL url = new URL(webApp.getUrl().toString() + path);
+    return HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
+  }
+
+  /** Without a query parameter the servlet returns the locality of every processor. */
+  @Test
+  public void testReadContainerLocality() throws Exception {
+    LocalityModel locality = mapper.readValue(fetch("locality"), LocalityModel.class);
+
+    assertEquals("Expected locality for two containers", 2, locality.getProcessorLocalities().size());
+    assertEquals("Mismatch in locality for processor " + PROCESSOR_ID1,
+        PROCESSOR_1_LOCALITY, locality.getProcessorLocality(PROCESSOR_ID1));
+    assertEquals("Mismatch in locality for processor " + PROCESSOR_ID2,
+        PROCESSOR_2_LOCALITY, locality.getProcessorLocality(PROCESSOR_ID2));
+  }
+
+  /** With no locality recorded the servlet returns an empty locality model. */
+  @Test
+  public void testReadContainerLocalityWithNoLocality() throws Exception {
+    final LocalityModel expectedLocality = new LocalityModel(Collections.emptyMap());
+    when(localityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of()));
+
+    LocalityModel locality = mapper.readValue(fetch("locality"), LocalityModel.class);
+
+    assertEquals("Expected empty response but got " + locality, expectedLocality, locality);
+  }
+
+  /** The {@code processorId} query parameter narrows the response to that processor's locality. */
+  @Test
+  public void testReadProcessorLocality() throws Exception {
+    String response = fetch("locality?processorId=" + PROCESSOR_ID1);
+    assertEquals("Mismatch in the locality for processor " + PROCESSOR_ID1,
+        PROCESSOR_1_LOCALITY, mapper.readValue(response, ProcessorLocality.class));
+  }
+
+  /** A processor with no recorded locality yields a placeholder {@link ProcessorLocality} rather than an error. */
+  @Test
+  public void testReadProcessorLocalityWithNoLocality() throws Exception {
+    final ProcessorLocality expectedProcessorLocality = new ProcessorLocality(PROCESSOR_ID2, "");
+    when(localityManager.readLocality()).thenReturn(new LocalityModel(ImmutableMap.of()));
+
+    ProcessorLocality processorLocality =
+        mapper.readValue(fetch("locality?processorId=" + PROCESSOR_ID2), ProcessorLocality.class);
+
+    assertEquals("Expected empty response for processor locality " + PROCESSOR_ID2 + " but got " + processorLocality,
+        expectedProcessorLocality, processorLocality);
+  }
+}

Reply via email to