Repository: samza Updated Branches: refs/heads/master ed621b269 -> 666835186
SAMZA-1824: Handle errors from the async-NMClient when launching containers - Updated internal state that tracks "pending" containers correctly - Refactored `YarnClusterResourceManager` for testability. Added a unit test Author: Jagadish <jvenkatra...@linkedin.com> Reviewers: Jake Maes <jm...@linkedin.com> Closes #615 from vjagadish1989/container-launch-error Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/66683518 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/66683518 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/66683518 Branch: refs/heads/master Commit: 6668351867a0548a944660f72f953baf74d35707 Parents: ed621b2 Author: Jagadish <jvenkatra...@linkedin.com> Authored: Mon Sep 10 09:48:35 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Mon Sep 10 09:48:35 2018 -0700 ---------------------------------------------------------------------- .../job/yarn/YarnClusterResourceManager.java | 41 ++++++---- .../yarn/TestYarnClusterResourceManager.java | 81 ++++++++++++++++++++ 2 files changed, 107 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/66683518/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java index 6f175ea..53b61d9 100644 --- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java +++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java @@ -30,11 +30,9 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import 
org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -114,17 +112,30 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement private final ConcurrentHashMap<SamzaResource, Container> allocatedResources = new ConcurrentHashMap<>(); private final ConcurrentHashMap<SamzaResourceRequest, AMRMClient.ContainerRequest> requestsMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap<ContainerId, Container> containersPendingStartup = new ConcurrentHashMap<>(); - private final SamzaAppMasterMetrics metrics; - final AtomicBoolean started = new AtomicBoolean(false); + private final AtomicBoolean started = new AtomicBoolean(false); private final Object lock = new Object(); private final NMClientAsync nmClientAsync; private static final Logger log = LoggerFactory.getLogger(YarnClusterResourceManager.class); private final Config config; + YarnClusterResourceManager(AMRMClientAsync amClientAsync, NMClientAsync nmClientAsync, Callback callback, + YarnAppState yarnAppState, SamzaYarnAppMasterLifecycle lifecycle, SamzaYarnAppMasterService service, + SamzaAppMasterMetrics metrics, YarnConfiguration yarnConfiguration, Config config) { + super(callback); + this.yarnConfiguration = yarnConfiguration; + this.metrics = metrics; + this.yarnConfig = new YarnConfig(config); + this.config = config; + this.amClient = amClientAsync; + this.state = yarnAppState; + this.lifecycle = lifecycle; + this.service = service; + this.nmClientAsync = nmClientAsync; + } + /** * Creates an 
YarnClusterResourceManager from config, a jobModelReader and a callback. * @param config to instantiate the container manager with @@ -513,18 +524,20 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement } @Override - public void onStartContainerError(ContainerId containerId, Throwable t) { - log.error(String.format("Container: %s could not start.", containerId), t); + public void onStartContainerError(ContainerId yarnContainerId, Throwable t) { + log.error(String.format("Yarn Container: %s could not start.", yarnContainerId), t); - Container container = containersPendingStartup.remove(containerId); + String samzaContainerId = getPendingSamzaContainerId(yarnContainerId); - if (container != null) { - SamzaResource resource = new SamzaResource(container.getResource().getVirtualCores(), - container.getResource().getMemory(), container.getNodeId().getHost(), containerId.toString()); - log.info("Invoking failure callback for container: {}", containerId); + if (samzaContainerId != null) { + YarnContainer container = state.pendingYarnContainers.remove(samzaContainerId); + log.info("Failed Yarn Container: {} had Samza ContainerId: {} ", yarnContainerId, samzaContainerId); + SamzaResource resource = new SamzaResource(container.resource().getVirtualCores(), + container.resource().getMemory(), container.nodeId().getHost(), yarnContainerId.toString()); + log.info("Invoking failure callback for container: {}", yarnContainerId); clusterManagerCallback.onStreamProcessorLaunchFailure(resource, new SamzaContainerLaunchException(t)); } else { - log.info("Got an invalid notification for container: {}", containerId); + log.info("Got an invalid notification for container: {}", yarnContainerId); } } @@ -680,7 +693,6 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement log.info("Container ID {} using environment variables: {}", samzaContainerId, sb.toString()); } - /** * Gets the environment variables from the specified {@link 
CommandBuilder} and escapes certain characters. * @@ -726,5 +738,4 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement return null; } - } http://git-wip-us.apache.org/repos/asf/samza/blob/66683518/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java new file mode 100644 index 0000000..7503c5b --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.job.yarn; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.client.api.async.NMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.samza.clustermanager.ClusterResourceManager; +import org.apache.samza.config.Config; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.*; + +public class TestYarnClusterResourceManager { + + @Test + public void testErrorInStartContainerShouldUpdateState() { + // create mocks + final int samzaContainerId = 1; + YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class); + SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class); + Config config = mock(Config.class); + AMRMClientAsync asyncClient = mock(AMRMClientAsync.class); + YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081); + SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class); + SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class); + NMClientAsync asyncNMClient = mock(NMClientAsync.class); + ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class); + + // start the cluster manager + YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient, + callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config); + + 
yarnAppState.pendingYarnContainers.put(String.valueOf(samzaContainerId), + new YarnContainer(Container.newInstance( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(10000l, 1), 1), 1), + NodeId.newInstance("host1", 8088), "http://host1", + Resource.newInstance(1024, 1), Priority.newInstance(1), + Token.newInstance("id".getBytes(), "read", "password".getBytes(), "service")))); + + yarnClusterResourceManager.start(); + assertEquals(1, yarnAppState.pendingYarnContainers.size()); + + yarnClusterResourceManager.onStartContainerError(ContainerId.newContainerId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(10000l, 1), 1), 1), + new Exception()); + + assertEquals(0, yarnAppState.pendingYarnContainers.size()); + verify(callback, times(1)).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class)); + } +} \ No newline at end of file