This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c212cac  SAMZA-2604: Datamodel change to capture physical container id 
for AM HA (#1445)
c212cac is described below

commit c212cac420864d09f2663da2e51c23e4718d6812
Author: lakshmi-manasa-g <mgadup...@linkedin.com>
AuthorDate: Mon Nov 30 23:37:21 2020 -0800

    SAMZA-2604: Datamodel change to capture physical container id for AM HA 
(#1445)
    
    Feature:
    The main feature is high availability for the cluster-based job coordinator 
(aka AM). It ensures that a new AM can establish connections with 
already-running containers, so that they do not all have to be restarted when 
the AM dies. This PR captures the mapping from the Samza logical processor id 
(ex: "0") to the physical execution environment container id (ex: yarn 
container id "container_123_123"). In future PRs, this mapping will be used by the new AM.
    
    Changes:
    Introduce new Coordinator Stream Message and manager to read/write this 
message
    Upon launch, each container will write its logical and physical id to the c-stream
    Upon launch, the Job Coordinator (AM) will read the mapping of all containers 
from the c-stream.
    
    Tests:
    added unit test for new manager
    working on tests for other classes as they have no coverage for relevant 
code.
    
    API changes:
    New c-stream message
    
    Usage instructions: None
    
    Upgrade instructions: Backwards compatible. N/A
---
 .../clustermanager/ClusterBasedJobCoordinator.java |   8 ++
 .../clustermanager/ContainerProcessManager.java    |   3 -
 .../clustermanager/SamzaApplicationState.java      |  11 +++
 .../container/ExecutionContainerIdManager.java     |  79 ++++++++++++++++
 .../stream/CoordinatorStreamValueSerde.java        |  12 ++-
 .../SetExecutionEnvContainerIdMapping.java         |  65 +++++++++++++
 .../apache/samza/runtime/ContainerLaunchUtil.java  |  14 ++-
 .../container/TestExecutionContainerIdManager.java | 105 +++++++++++++++++++++
 8 files changed, 290 insertions(+), 7 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
index 63ee3c7..b98c727 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.InputStreamsDiscoveredException;
@@ -45,6 +46,7 @@ import org.apache.samza.coordinator.StreamRegexMonitor;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.JobModelUtil;
@@ -206,6 +208,12 @@ public class ClusterBasedJobCoordinator {
     this.localityManager =
         new LocalityManager(new 
NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetContainerHostMapping.TYPE));
 
+    if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) {
+      ExecutionContainerIdManager executionContainerIdManager = new 
ExecutionContainerIdManager(
+          new NamespaceAwareCoordinatorStreamStore(metadataStore, 
SetExecutionEnvContainerIdMapping.TYPE));
+
+      
state.processorToExecutionId.putAll(executionContainerIdManager.readExecutionEnvironmentContainerIdMapping());
+    }
     // build metastore for container placement messages
     containerPlacementMetadataStore = new 
ContainerPlacementMetadataStore(metadataStore);
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index 1bc1669..a2ad540 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -241,7 +241,6 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
 
     
state.processorCount.set(state.jobModelManager.jobModel().getContainers().size());
     
state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size());
-
     // Request initial set of containers
     LocalityModel localityModel = localityManager.readLocality();
     Map<String, String> processorToHost = new HashMap<>();
@@ -334,7 +333,6 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
       return;
     }
     state.runningProcessors.remove(processorId);
-
     int exitStatus = resourceStatus.getExitCode();
     switch (exitStatus) {
       case SamzaResourceStatus.SUCCESS:
@@ -413,7 +411,6 @@ public class ContainerProcessManager implements 
ClusterResourceManager.Callback
           processorId, containerId, containerHost);
       state.pendingProcessors.remove(processorId);
       state.runningProcessors.put(processorId, resource);
-
       if (state.neededProcessors.decrementAndGet() == 0) {
         state.jobHealthy.set(true);
       }
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
index 784f0b4..930d366 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java
@@ -106,6 +106,17 @@ public class SamzaApplicationState {
   public final ConcurrentMap<String, SamzaResource> runningProcessors = new 
ConcurrentHashMap<>(0);
 
   /**
+   * Map of Samza processor Id (aka logical id) to execution environment 
container id (aka physical id ex: yarn container id).
+   * This map will be used during the start up phase of new AM in AM-HA.
+   *
+   * This map is populated at startup of ClusterBasedJobCoordinator.
+   * It initially holds the processorId to execution id mapping (if any) 
present in the coordinator stream.
+   * This could correspond to processors currently running or from previous 
attempt or previous deploy.
+   * TODO: SAMZA-2607 : remove this map and all its usages.
+   */
+  public final ConcurrentMap<String, String> processorToExecutionId = new 
ConcurrentHashMap<>(0);
+
+  /**
    *  Map of the failed Samza processor ID to resource status of the last 
attempted of the container.
    *  This map is only used when {@link 
org.apache.samza.config.ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES}
    *  is set to false, this map tracks the containers which have exhausted all 
retires for restart and JobCoordinator is
diff --git 
a/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
 
b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
new file mode 100644
index 0000000..557ef77
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import 
org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.serializers.Serde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Persists the execution environment container id information into the 
metadata store and reads it back from there.
+ * The stored mapping is from processor id (logical Samza processor id) to 
execution environment container id (ex: yarn container id).
+ **/
+public class ExecutionContainerIdManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionContainerIdManager.class);
+
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
+
+  /**
+   * Builds the ExecutionContainerIdManager based upon the provided {@link 
MetadataStore} that is instantiated.
+   * Uses the {@link CoordinatorStreamValueSerde} to serialize values before 
writing to, and deserialize values after reading from, the metadata store.
+   * @param metadataStore an instance of {@link MetadataStore} to read/write 
the processor container id mapping.
+   */
+  public ExecutionContainerIdManager(MetadataStore metadataStore) {
+    this.metadataStore = metadataStore;
+    this.valueSerde = new 
CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+  }
+
+  public void writeExecutionEnvironmentContainerIdMapping(String processorId, 
String executionEnvContainerId) {
+    Preconditions.checkNotNull(processorId, "Container's logical processor id 
can not be null.");
+    Preconditions.checkNotNull(executionEnvContainerId, "Container's physical 
execution environment container id can not be null.");
+    LOG.info("Processor {} has executionEnvContainerId as {}", processorId, 
executionEnvContainerId);
+    metadataStore.put(processorId, 
valueSerde.toBytes(executionEnvContainerId));
+    metadataStore.flush();
+  }
+
+  public Map<String, String> readExecutionEnvironmentContainerIdMapping() {
+    Map<String, String> executionEnvironmentContainerIdMapping = new 
HashMap<>();
+    metadataStore.all().forEach((processorId, valueBytes) -> {
+      if (valueBytes != null) {
+        String executionEnvContainerId = valueSerde.fromBytes(valueBytes);
+        executionEnvironmentContainerIdMapping.put(processorId, 
executionEnvContainerId);
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      for (Map.Entry<String, String> entry : 
executionEnvironmentContainerIdMapping.entrySet()) {
+        LOG.debug("Processor {} has executionEnvContainerId as {}", 
entry.getKey(), entry.getValue());
+      }
+    }
+    return executionEnvironmentContainerIdMapping;
+  }
+
+  public void close() {
+    metadataStore.close();
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index 9b862bd..86983f1 100644
--- 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
@@ -51,7 +52,10 @@ public class CoordinatorStreamValueSerde implements 
Serde<String> {
   public String fromBytes(byte[] bytes) {
     Map<String, Object> values = messageSerde.fromBytes(bytes);
     CoordinatorStreamMessage message = new CoordinatorStreamMessage(new 
Object[]{}, values);
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping executionContainerIdMapping = new 
SetExecutionEnvContainerIdMapping(message);
+      return executionContainerIdMapping.getExecutionEnvironmentContainerId();
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new 
SetContainerHostMapping(message);
       return hostMapping.getHostLocality();
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
@@ -76,7 +80,11 @@ public class CoordinatorStreamValueSerde implements 
Serde<String> {
 
   @Override
   public byte[] toBytes(String value) {
-    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+    if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) {
+      SetExecutionEnvContainerIdMapping
+          executionEnvContainerIdMapping = new 
SetExecutionEnvContainerIdMapping(SOURCE, "", value);
+      return 
messageSerde.toBytes(executionEnvContainerIdMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping hostMapping = new 
SetContainerHostMapping(SOURCE, "", value, "", "");
       return messageSerde.toBytes(hostMapping.getMessageMap());
     } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
diff --git 
a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
new file mode 100644
index 0000000..508b1df
--- /dev/null
+++ 
b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream.messages;
+
+/**
+ * SetExecutionEnvContainerIdMapping is used internally by the Samza framework 
to
+ * persist the processorId-to-executionEnvContainerId mappings.
+ *
+ * Structure of the message looks like:
+ * {
+ *     Key: $ProcessorId
+ *     Type: set-execution-env-container-id-mapping
+ *     Source: "SamzaContainer-$ProcessorId"
+ *     MessageMap:
+ *     {
+ *         execution-env-container-id: execution environment container id
+ *     }
+ * }
+ * */
+public class SetExecutionEnvContainerIdMapping extends 
CoordinatorStreamMessage {
+  public static final String TYPE = "set-execution-env-container-id-mapping";
+  public static final String EXEC_ENV_ID_KEY = "execution-env-container-id";
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to 
execution environment id mapping information.
+   * @param message which holds the processor id to execution environment id 
mapping information.
+   */
+  public SetExecutionEnvContainerIdMapping(CoordinatorStreamMessage message) {
+    super(message.getKeyArray(), message.getMessageMap());
+  }
+
+  /**
+   * SetExecutionEnvContainerIdMapping is used to set the processor id to 
execution environment id mapping information.
+   * @param source the source of the message
+   * @param key the key which is used to persist the message
+   * @param executionEnvContainerId the execution environment container id
+   */
+  public SetExecutionEnvContainerIdMapping(String source, String key, String 
executionEnvContainerId) {
+    super(source);
+    setType(TYPE);
+    setKey(key);
+    putMessageValue(EXEC_ENV_ID_KEY, executionEnvContainerId);
+  }
+
+  public String getExecutionEnvironmentContainerId() {
+    return getMessageValue(EXEC_ENV_ID_KEY);
+  }
+}
diff --git 
a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java 
b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index ec477c9..459ad89 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -31,6 +31,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.ContainerHeartbeatMonitor;
+import org.apache.samza.container.ExecutionContainerIdManager;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
@@ -40,6 +41,7 @@ import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import 
org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
 import org.apache.samza.diagnostics.DiagnosticsManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -98,7 +100,7 @@ public class ContainerLaunchUtil {
       String jobName,
       String jobId,
       String containerId,
-      Optional<String> execEnvContainerId,
+      Optional<String> execEnvContainerIdOptional,
       JobModel jobModel,
       Config config,
       Optional<ExternalContext> externalContextOptional) {
@@ -118,7 +120,7 @@ public class ContainerLaunchUtil {
       Map<String, MetricsReporter> metricsReporters = 
loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = 
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, 
execEnvContainerId, config);
+      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = 
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, 
execEnvContainerIdOptional, config);
       Option<DiagnosticsManager> diagnosticsManager = Option.empty();
       if (diagnosticsManagerReporterPair.isPresent()) {
         diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
@@ -153,6 +155,14 @@ public class ContainerLaunchUtil {
         heartbeatMonitor.start();
       }
 
+      if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) 
{
+        execEnvContainerIdOptional.ifPresent(execEnvContainerId -> {
+          ExecutionContainerIdManager executionContainerIdManager = new 
ExecutionContainerIdManager(
+              new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, 
SetExecutionEnvContainerIdMapping.TYPE));
+          
executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId,
 execEnvContainerId);
+        });
+      }
+
       container.run();
       if (heartbeatMonitor != null) {
         heartbeatMonitor.stop();
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
 
b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
new file mode 100644
index 0000000..2ecd88c
--- /dev/null
+++ 
b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import 
org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
+import 
org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import 
org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestExecutionContainerIdManager {
+
+  private static final Config
+      CONFIG = new MapConfig(ImmutableMap.of("job.name", "test-job", 
"job.coordinator.system", "test-kafka"));
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+  private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil;
+  private MetadataStore store;
+  private ExecutionContainerIdManager executionContainerIdManager;
+
+  @Before
+  public void setup() {
+    coordinatorStreamStoreTestUtil = new 
CoordinatorStreamStoreTestUtil(CONFIG);
+    coordinatorStreamStore = 
coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
+    store = Mockito.spy(new 
NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore,
+        SetExecutionEnvContainerIdMapping.TYPE));
+    executionContainerIdManager = new ExecutionContainerIdManager(store);
+
+  }
+
+  @After
+  public void tearDown() {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+  }
+
+  @Test
+  public void testExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = "0";
+
+    
executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId,
 physicalId);
+    Map<String, String> localMap = 
executionContainerIdManager.readExecutionEnvironmentContainerIdMapping();
+
+    Map<String, String> expectedMap = ImmutableMap.of(processorId, physicalId);
+    assertEquals(expectedMap, localMap);
+
+    executionContainerIdManager.close();
+
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer 
producer = 
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer();
+    MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer 
consumer = 
coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer();
+    assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
+
+    ArgumentCaptor<byte[]> argument1 = ArgumentCaptor.forClass(byte[].class);
+    Mockito.verify(store).put(Mockito.eq(processorId), argument1.capture());
+    CoordinatorStreamValueSerde valueSerde = new 
CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE);
+    assertEquals(physicalId, valueSerde.fromBytes(argument1.getValue()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testInvalidKeyExecutionContainerIdManager() {
+    String physicalId = "container_123_123_123";
+    String processorId = null;
+    
executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId,
 physicalId);
+  }
+  @Test(expected = NullPointerException.class)
+  public void testInvalidValueExecutionContainerIdManager() {
+    String physicalId = null;
+    String processorId = "0";
+    
executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId,
 physicalId);
+  }
+}
+

Reply via email to