This is an automated email from the ASF dual-hosted git repository. bharathkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new c212cac SAMZA-2604: Datamodel change to capture physical container id for AM HA (#1445) c212cac is described below commit c212cac420864d09f2663da2e51c23e4718d6812 Author: lakshmi-manasa-g <mgadup...@linkedin.com> AuthorDate: Mon Nov 30 23:37:21 2020 -0800 SAMZA-2604: Datamodel change to capture physical container id for AM HA (#1445) Feature: The main feature is high availability for the cluster-based job coordinator (aka AM). The feature ensures that a new AM can establish connections with already-running containers, to avoid restarting all running containers when the AM dies. This PR enables capturing the mapping from the Samza logical processor id (ex: "0") to the physical execution environment container id (ex: yarn container id "container_123_123"). In future PRs, this mapping will be used by the new AM. Changes: (1) Introduce a new coordinator stream (c-stream) message and a manager to read/write this message. (2) A container, upon launch, will write its logical and physical ids to the c-stream. (3) The job coordinator (AM), upon launch, will read the mapping of all containers from the c-stream. Tests: Added a unit test for the new manager; working on tests for the other classes, as they have no coverage for the relevant code. API changes: New c-stream message. Usage instructions: None. Upgrade instructions: Backwards compatible. 
N/A --- .../clustermanager/ClusterBasedJobCoordinator.java | 8 ++ .../clustermanager/ContainerProcessManager.java | 3 - .../clustermanager/SamzaApplicationState.java | 11 +++ .../container/ExecutionContainerIdManager.java | 79 ++++++++++++++++ .../stream/CoordinatorStreamValueSerde.java | 12 ++- .../SetExecutionEnvContainerIdMapping.java | 65 +++++++++++++ .../apache/samza/runtime/ContainerLaunchUtil.java | 14 ++- .../container/TestExecutionContainerIdManager.java | 105 +++++++++++++++++++++ 8 files changed, 290 insertions(+), 7 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java index 63ee3c7..b98c727 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java @@ -34,6 +34,7 @@ import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.StorageConfig; import org.apache.samza.config.TaskConfig; +import org.apache.samza.container.ExecutionContainerIdManager; import org.apache.samza.container.LocalityManager; import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.InputStreamsDiscoveredException; @@ -45,6 +46,7 @@ import org.apache.samza.coordinator.StreamRegexMonitor; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; +import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping; import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.job.model.JobModelUtil; @@ -206,6 +208,12 @@ public class ClusterBasedJobCoordinator { 
this.localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(metadataStore, SetContainerHostMapping.TYPE)); + if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) { + ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager( + new NamespaceAwareCoordinatorStreamStore(metadataStore, SetExecutionEnvContainerIdMapping.TYPE)); + + state.processorToExecutionId.putAll(executionContainerIdManager.readExecutionEnvironmentContainerIdMapping()); + } // build metastore for container placement messages containerPlacementMetadataStore = new ContainerPlacementMetadataStore(metadataStore); diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 1bc1669..a2ad540 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -241,7 +241,6 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback state.processorCount.set(state.jobModelManager.jobModel().getContainers().size()); state.neededProcessors.set(state.jobModelManager.jobModel().getContainers().size()); - // Request initial set of containers LocalityModel localityModel = localityManager.readLocality(); Map<String, String> processorToHost = new HashMap<>(); @@ -334,7 +333,6 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback return; } state.runningProcessors.remove(processorId); - int exitStatus = resourceStatus.getExitCode(); switch (exitStatus) { case SamzaResourceStatus.SUCCESS: @@ -413,7 +411,6 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback processorId, containerId, containerHost); state.pendingProcessors.remove(processorId); state.runningProcessors.put(processorId, resource); - if 
(state.neededProcessors.decrementAndGet() == 0) { state.jobHealthy.set(true); } diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java index 784f0b4..930d366 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -106,6 +106,17 @@ public class SamzaApplicationState { public final ConcurrentMap<String, SamzaResource> runningProcessors = new ConcurrentHashMap<>(0); /** + * Map of Samza processor Id (aka logical id) to execution environment container id (aka physical id ex: yarn container id). + * This map will be used during the start up phase of new AM in AM-HA. + * + * This map is populated at startup of ClusterBasedJobCoordinator. + * It initially holds the processorId to execution id mapping (if any) present in the coordinator stream. + * This could correspond to processors currently running or from previous attempt or previous deploy. + * TODO: SAMZA-2607 : remove this map and all its usages. + */ + public final ConcurrentMap<String, String> processorToExecutionId = new ConcurrentHashMap<>(0); + + /** * Map of the failed Samza processor ID to resource status of the last attempted of the container. 
* This map is only used when {@link org.apache.samza.config.ClusterManagerConfig#CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES} * is set to false, this map tracks the containers which have exhausted all retires for restart and JobCoordinator is diff --git a/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java new file mode 100644 index 0000000..557ef77 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/container/ExecutionContainerIdManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; +import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.serializers.Serde; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used for persisting and reading the execution environment container id information into the metadata store. 
+ * Processor id (logical Samza processor id) to execution environment container id (ex: yarn container id) is written. + **/ +public class ExecutionContainerIdManager { + private static final Logger LOG = LoggerFactory.getLogger(ExecutionContainerIdManager.class); + + private final Serde<String> valueSerde; + private final MetadataStore metadataStore; + + /** + * Builds the ExecutionContainerIdManager based upon the provided {@link MetadataStore} that is instantiated. + * Uses the {@link CoordinatorStreamValueSerde} to serialize messages before reading/writing into metadata store. + * @param metadataStore an instance of {@link MetadataStore} to read/write the processor container id mapping. + */ + public ExecutionContainerIdManager(MetadataStore metadataStore) { + this.metadataStore = metadataStore; + this.valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE); + } + + public void writeExecutionEnvironmentContainerIdMapping(String processorId, String executionEnvContainerId) { + Preconditions.checkNotNull(processorId, "Container's logical processor id can not be null."); + Preconditions.checkNotNull(executionEnvContainerId, "Container's physical execution environment container id can not be null."); + LOG.info("Processor {} has executionEnvContainerId as {}", processorId, executionEnvContainerId); + metadataStore.put(processorId, valueSerde.toBytes(executionEnvContainerId)); + metadataStore.flush(); + } + + public Map<String, String> readExecutionEnvironmentContainerIdMapping() { + Map<String, String> executionEnvironmentContainerIdMapping = new HashMap<>(); + metadataStore.all().forEach((processorId, valueBytes) -> { + if (valueBytes != null) { + String executionEnvContainerId = valueSerde.fromBytes(valueBytes); + executionEnvironmentContainerIdMapping.put(processorId, executionEnvContainerId); + } + }); + if (LOG.isDebugEnabled()) { + for (Map.Entry<String, String> entry : executionEnvironmentContainerIdMapping.entrySet()) { + 
LOG.debug("Processor {} has executionEnvContainerId as {}", entry.getKey(), entry.getValue()); + } + } + return executionEnvironmentContainerIdMapping; + } + + public void close() { + metadataStore.close(); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java index 9b862bd..86983f1 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage; import org.apache.samza.coordinator.stream.messages.SetChangelogMapping; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; +import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping; import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping; import org.apache.samza.coordinator.stream.messages.SetConfig; import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping; @@ -51,7 +52,10 @@ public class CoordinatorStreamValueSerde implements Serde<String> { public String fromBytes(byte[] bytes) { Map<String, Object> values = messageSerde.fromBytes(bytes); CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[]{}, values); - if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) { + if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) { + SetExecutionEnvContainerIdMapping executionContainerIdMapping = new SetExecutionEnvContainerIdMapping(message); + return executionContainerIdMapping.getExecutionEnvironmentContainerId(); + } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) { SetContainerHostMapping hostMapping = new SetContainerHostMapping(message); return hostMapping.getHostLocality(); 
} else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) { @@ -76,7 +80,11 @@ public class CoordinatorStreamValueSerde implements Serde<String> { @Override public byte[] toBytes(String value) { - if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) { + if (type.equalsIgnoreCase(SetExecutionEnvContainerIdMapping.TYPE)) { + SetExecutionEnvContainerIdMapping + executionEnvContainerIdMapping = new SetExecutionEnvContainerIdMapping(SOURCE, "", value); + return messageSerde.toBytes(executionEnvContainerIdMapping.getMessageMap()); + } else if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) { SetContainerHostMapping hostMapping = new SetContainerHostMapping(SOURCE, "", value, "", ""); return messageSerde.toBytes(hostMapping.getMessageMap()); } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) { diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java new file mode 100644 index 0000000..508b1df --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetExecutionEnvContainerIdMapping.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.stream.messages; + +/** + * SetExecutionEnvContainerIdMapping is used internally by the Samza framework to + * persist the processorId-to-executionEnvContainerId mappings. + * + * Structure of the message looks like: + * { + * Key: $ProcessorId + * Type: set-execution-env-container-id-mapping + * Source: "SamzaContainer-$ProcessorId" + * MessageMap: + * { + * execution-env-container-id: execution environment container id + * } + * } + * */ +public class SetExecutionEnvContainerIdMapping extends CoordinatorStreamMessage { + public static final String TYPE = "set-execution-env-container-id-mapping"; + public static final String EXEC_ENV_ID_KEY = "execution-env-container-id"; + + /** + * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information. + * @param message which holds the processor id to execution environment id mapping information. + */ + public SetExecutionEnvContainerIdMapping(CoordinatorStreamMessage message) { + super(message.getKeyArray(), message.getMessageMap()); + } + + /** + * SetExecutionEnvContainerIdMapping is used to set the processor id to execution environment id mapping information. 
+ * @param source the source of the message + * @param key the key which is used to persist the message + * @param executionEnvContainerId the execution environment container id + */ + public SetExecutionEnvContainerIdMapping(String source, String key, String executionEnvContainerId) { + super(source); + setType(TYPE); + setKey(key); + putMessageValue(EXEC_ENV_ID_KEY, executionEnvContainerId); + } + + public String getExecutionEnvironmentContainerId() { + return getMessageValue(EXEC_ENV_ID_KEY); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index ec477c9..459ad89 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -31,6 +31,7 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.ShellCommandConfig; import org.apache.samza.container.ContainerHeartbeatMonitor; +import org.apache.samza.container.ExecutionContainerIdManager; import org.apache.samza.container.LocalityManager; import org.apache.samza.container.SamzaContainer; import org.apache.samza.container.SamzaContainer$; @@ -40,6 +41,7 @@ import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; import org.apache.samza.coordinator.stream.messages.SetConfig; import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; +import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping; import org.apache.samza.diagnostics.DiagnosticsManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.metadatastore.MetadataStore; @@ -98,7 +100,7 @@ public class ContainerLaunchUtil { String jobName, String jobId, String containerId, - Optional<String> 
execEnvContainerId, + Optional<String> execEnvContainerIdOptional, JobModel jobModel, Config config, Optional<ExternalContext> externalContextOptional) { @@ -118,7 +120,7 @@ public class ContainerLaunchUtil { Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config); // Creating diagnostics manager and reporter, and wiring it respectively - Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config); + Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerIdOptional, config); Option<DiagnosticsManager> diagnosticsManager = Option.empty(); if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); @@ -153,6 +155,14 @@ public class ContainerLaunchUtil { heartbeatMonitor.start(); } + if (new JobConfig(config).getApplicationMasterHighAvailabilityEnabled()) { + execEnvContainerIdOptional.ifPresent(execEnvContainerId -> { + ExecutionContainerIdManager executionContainerIdManager = new ExecutionContainerIdManager( + new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetExecutionEnvContainerIdMapping.TYPE)); + executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(containerId, execEnvContainerId); + }); + } + container.run(); if (heartbeatMonitor != null) { heartbeatMonitor.stop(); diff --git a/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java new file mode 100644 index 0000000..2ecd88c --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/container/TestExecutionContainerIdManager.java @@ -0,0 +1,105 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; +import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil; +import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore; +import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde; +import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory; +import org.apache.samza.coordinator.stream.messages.SetExecutionEnvContainerIdMapping; +import org.apache.samza.metadatastore.MetadataStore; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class TestExecutionContainerIdManager { + + private static final Config + CONFIG = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka")); + + private CoordinatorStreamStore 
coordinatorStreamStore; + private CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil; + private MetadataStore store; + private ExecutionContainerIdManager executionContainerIdManager; + + @Before + public void setup() { + coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(CONFIG); + coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore(); + store = Mockito.spy(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, + SetExecutionEnvContainerIdMapping.TYPE)); + executionContainerIdManager = new ExecutionContainerIdManager(store); + + } + + @After + public void tearDown() { + MockCoordinatorStreamSystemFactory.disableMockConsumerCache(); + } + + @Test + public void testExecutionContainerIdManager() { + String physicalId = "container_123_123_123"; + String processorId = "0"; + + executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId); + Map<String, String> localMap = executionContainerIdManager.readExecutionEnvironmentContainerIdMapping(); + + Map<String, String> expectedMap = ImmutableMap.of(processorId, physicalId); + assertEquals(expectedMap, localMap); + + executionContainerIdManager.close(); + + MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer producer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemProducer(); + MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer consumer = coordinatorStreamStoreTestUtil.getMockCoordinatorStreamSystemConsumer(); + assertTrue(producer.isStopped()); + assertTrue(consumer.isStopped()); + + ArgumentCaptor<byte[]> argument1 = ArgumentCaptor.forClass(byte[].class); + Mockito.verify(store).put(Mockito.eq(processorId), argument1.capture()); + CoordinatorStreamValueSerde valueSerde = new CoordinatorStreamValueSerde(SetExecutionEnvContainerIdMapping.TYPE); + assertEquals(physicalId, valueSerde.fromBytes(argument1.getValue())); + } + + @Test(expected = NullPointerException.class) 
+ public void testInvalidKeyExecutionContainerIdManager() { + String physicalId = "container_123_123_123"; + String processorId = null; + executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId); + } + @Test(expected = NullPointerException.class) + public void testInvalidValueExecutionContainerIdManager() { + String physicalId = null; + String processorId = "0"; + executionContainerIdManager.writeExecutionEnvironmentContainerIdMapping(processorId, physicalId); + } +} +