YARN-8349. Remove YARN registry entries when a service is killed by the RM. (Billie Rinaldi via wangda)
Change-Id: Ia58db3637789a8921482f564aa9bdf99c45cc36c Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ff583d3f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ff583d3f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ff583d3f Branch: refs/heads/YARN-1011 Commit: ff583d3fa3325029bc691ec22d817aee37e5e85d Parents: 8956e5b Author: Wangda Tan <wan...@apache.org> Authored: Fri Jun 1 14:07:23 2018 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Fri Jun 1 14:49:18 2018 -0700 ---------------------------------------------------------------------- hadoop-project/pom.xml | 7 + .../hadoop-yarn-services-api/pom.xml | 16 ++ .../yarn/service/client/ApiServiceClient.java | 11 + .../yarn/service/TestCleanupAfterKill.java | 94 ++++++ .../src/test/resources/yarn-site.xml | 19 ++ .../yarn/service/client/ServiceClient.java | 24 ++ .../hadoop/yarn/service/ServiceTestUtils.java | 135 +++++++++ .../yarn/service/TestYarnNativeServices.java | 129 --------- .../hadoop/yarn/client/api/AppAdminClient.java | 273 ------------------ .../hadoop/yarn/client/api/AppAdminClient.java | 285 +++++++++++++++++++ .../registry/client/binding/RegistryUtils.java | 10 + .../server/resourcemanager/rmapp/RMAppImpl.java | 30 ++ 12 files changed, 631 insertions(+), 402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 59a9bd2..12897a7 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -452,6 +452,13 @@ <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-services-core</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>${hadoop.version}</version> <type>test-jar</type> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml index 45168a9..ab76218 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/pom.xml @@ -139,6 +139,22 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-services-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java index a8e2f51..18d45fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/client/ApiServiceClient.java @@ -588,6 +588,17 @@ public class ApiServiceClient extends AppAdminClient { return result; } + @Override + public int actionCleanUp(String appName, String userName) throws + IOException, YarnException { + ServiceClient sc = new ServiceClient(); + sc.init(getConfig()); + sc.start(); + int result = sc.actionCleanUp(appName, userName); + sc.close(); + return result; + } + private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE = new JsonSerDeser<>(Container[].class, PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java new file mode 100644 index 0000000..51e834a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/java/org/apache/hadoop/yarn/service/TestCleanupAfterKill.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.service; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.client.ServiceClient; +import org.apache.hadoop.yarn.service.conf.YarnServiceConstants; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Minicluster test that verifies registry cleanup when app lifetime is + * exceeded. + */ +public class TestCleanupAfterKill extends ServiceTestUtils { + private static final Logger LOG = + LoggerFactory.getLogger(TestCleanupAfterKill.class); + + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void setup() throws Exception { + File tmpYarnDir = new File("target", "tmp"); + FileUtils.deleteQuietly(tmpYarnDir); + } + + @After + public void tearDown() throws IOException { + shutdown(); + } + + @Test(timeout = 200000) + public void testRegistryCleanedOnLifetimeExceeded() throws Exception { + setupInternal(NUM_NMS); + ServiceClient client = createClient(getConf()); + Service exampleApp = createExampleApplication(); + exampleApp.setLifetime(30L); + client.actionCreate(exampleApp); + waitForServiceToBeStable(client, exampleApp); + String serviceZKPath = RegistryUtils.servicePath(RegistryUtils + .currentUser(), YarnServiceConstants.APP_TYPE, exampleApp.getName()); + Assert.assertTrue("Registry ZK service path doesn't exist", + getCuratorService().zkPathExists(serviceZKPath)); + + // wait for app to be killed by RM + ApplicationId exampleAppId = ApplicationId.fromString(exampleApp.getId()); + GenericTestUtils.waitFor(() -> { + try { + ApplicationReport ar = client.getYarnClient() + .getApplicationReport(exampleAppId); + return ar.getYarnApplicationState() == YarnApplicationState.KILLED; + } catch (YarnException | IOException e) { + throw new RuntimeException("while waiting", e); + } + }, 2000, 200000); + Assert.assertFalse("Registry ZK service path still exists after killed", + getCuratorService().zkPathExists(serviceZKPath)); + + LOG.info("Destroy the service"); + Assert.assertEquals(0, client.actionDestroy(exampleApp.getName())); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml new file mode 100644 index 0000000..daac23a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-api/src/test/resources/yarn-site.xml @@ -0,0 +1,19 @@ +<?xml version="1.0" encoding="UTF-8"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. See accompanying LICENSE file. +--> + +<configuration> + <!-- Dummy (invalid) config file to be overwritten by ServiceTestUtils with MiniCluster configuration. --> +</configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java index c86f5de..3f6e896 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java @@ -308,6 +308,16 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, return actionUpgrade(persistedService, containersToUpgrade); } + @Override + public int actionCleanUp(String appName, String userName) throws + IOException, YarnException { + if (cleanUpRegistry(appName, userName)) { + return EXIT_SUCCESS; + } else { + return EXIT_FALSE; + } + } + public int actionUpgrade(Service service, List<Container> compInstances) throws IOException, YarnException { ApplicationReport appReport = @@ -639,9 +649,23 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes, } } + private boolean cleanUpRegistry(String serviceName, String user) throws + SliderException { + String encodedName = RegistryUtils.registryUser(user); + + String registryPath = RegistryUtils.servicePath(encodedName, + YarnServiceConstants.APP_TYPE, serviceName); + return cleanUpRegistryPath(registryPath, serviceName); + } + private boolean cleanUpRegistry(String serviceName) throws SliderException { String registryPath = ServiceRegistryUtils.registryPathForInstance(serviceName); + return cleanUpRegistryPath(registryPath, serviceName); + } + + private boolean cleanUpRegistryPath(String registryPath, String + serviceName) throws SliderException { try { if (getRegistryClient().exists(registryPath)) { getRegistryClient().delete(registryPath, true); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java index 86b4cea..3d1412d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/ServiceTestUtils.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.service; import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingCluster; import org.apache.hadoop.conf.Configuration; @@ -29,13 +31,17 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.http.HttpServer2; import org.apache.hadoop.registry.client.impl.zk.CuratorService; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.apache.hadoop.yarn.service.api.records.Component; +import org.apache.hadoop.yarn.service.api.records.Container; +import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.Resource; import org.apache.hadoop.yarn.service.api.records.Service; +import org.apache.hadoop.yarn.service.api.records.ServiceState; import org.apache.hadoop.yarn.service.client.ServiceClient; import org.apache.hadoop.yarn.service.conf.YarnServiceConf; import org.apache.hadoop.yarn.service.exceptions.SliderException; @@ -60,6 +66,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeoutException; import static org.apache.hadoop.registry.client.api.RegistryConstants.KEY_REGISTRY_ZK_QUORUM; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC; @@ -418,4 +425,132 @@ public class ServiceTestUtils { return serviceBasePath; } } + + /** + * Wait until all the containers for all components become ready state. + * + * @param client + * @param exampleApp + * @return all ready containers of a service. + * @throws TimeoutException + * @throws InterruptedException + */ + protected Multimap<String, String> waitForAllCompToBeReady(ServiceClient + client, Service exampleApp) throws TimeoutException, + InterruptedException { + int expectedTotalContainers = countTotalContainers(exampleApp); + + Multimap<String, String> allContainers = HashMultimap.create(); + + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + int totalReadyContainers = 0; + allContainers.clear(); + LOG.info("Num Components " + retrievedApp.getComponents().size()); + for (Component component : retrievedApp.getComponents()) { + LOG.info("looking for " + component.getName()); + LOG.info(component.toString()); + if (component.getContainers() != null) { + if (component.getContainers().size() == exampleApp + .getComponent(component.getName()).getNumberOfContainers()) { + for (Container container : component.getContainers()) { + LOG.info( + "Container state " + container.getState() + ", component " + + component.getName()); + if (container.getState() == ContainerState.READY) { + totalReadyContainers++; + allContainers.put(component.getName(), container.getId()); + LOG.info("Found 1 ready container " + container.getId()); + } + } + } else { + LOG.info(component.getName() + " Expected number of containers " + + exampleApp.getComponent(component.getName()) + .getNumberOfContainers() + ", current = " + component + .getContainers()); + } + } + } + LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers + + " expected = " + expectedTotalContainers); + return totalReadyContainers == expectedTotalContainers; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, 200000); + return allContainers; + } + + /** + * Wait until service state becomes stable. A service is stable when all + * requested containers of all components are running and in ready state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeStable(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeStable(client, exampleApp, 200000); + } + + protected void waitForServiceToBeStable(ServiceClient client, + Service exampleApp, int waitForMillis) + throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE, + waitForMillis); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeStarted(ServiceClient client, + Service exampleApp) throws TimeoutException, InterruptedException { + waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); + } + + protected void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState) throws TimeoutException, + InterruptedException { + waitForServiceToBeInState(client, exampleApp, desiredState, 200000); + } + + /** + * Wait until service is started. It does not have to reach a stable state. + * + * @param client + * @param exampleApp + * @throws TimeoutException + * @throws InterruptedException + */ + protected void waitForServiceToBeInState(ServiceClient client, + Service exampleApp, ServiceState desiredState, int waitForMillis) throws + TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + try { + Service retrievedApp = client.getStatus(exampleApp.getName()); + System.out.println(retrievedApp); + return retrievedApp.getState() == desiredState; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + }, 2000, waitForMillis); + } + + private int countTotalContainers(Service service) { + int totalContainers = 0; + for (Component component : service.getComponents()) { + totalContainers += component.getNumberOfContainers(); + } + return totalContainers; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java index ae209b9..8b13b24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/test/java/org/apache/hadoop/yarn/service/TestYarnNativeServices.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.service; -import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.Path; @@ -36,7 +35,6 @@ import org.apache.hadoop.yarn.service.api.records.Component; import org.apache.hadoop.yarn.service.api.records.ComponentState; import org.apache.hadoop.yarn.service.api.records.Configuration; import org.apache.hadoop.yarn.service.api.records.Container; -import org.apache.hadoop.yarn.service.api.records.ContainerState; import org.apache.hadoop.yarn.service.api.records.PlacementConstraint; import org.apache.hadoop.yarn.service.api.records.PlacementPolicy; import org.apache.hadoop.yarn.service.api.records.PlacementScope; @@ -806,131 +804,4 @@ public class TestYarnNativeServices extends ServiceTestUtils { i++; } } - - /** - * Wait until all the containers for all components become ready state. - * - * @param client - * @param exampleApp - * @return all ready containers of a service. - * @throws TimeoutException - * @throws InterruptedException - */ - private Multimap<String, String> waitForAllCompToBeReady(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - int expectedTotalContainers = countTotalContainers(exampleApp); - - Multimap<String, String> allContainers = HashMultimap.create(); - - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - int totalReadyContainers = 0; - allContainers.clear(); - LOG.info("Num Components " + retrievedApp.getComponents().size()); - for (Component component : retrievedApp.getComponents()) { - LOG.info("looking for " + component.getName()); - LOG.info(component.toString()); - if (component.getContainers() != null) { - if (component.getContainers().size() == exampleApp - .getComponent(component.getName()).getNumberOfContainers()) { - for (Container container : component.getContainers()) { - LOG.info( - "Container state " + container.getState() + ", component " - + component.getName()); - if (container.getState() == ContainerState.READY) { - totalReadyContainers++; - allContainers.put(component.getName(), container.getId()); - LOG.info("Found 1 ready container " + container.getId()); - } - } - } else { - LOG.info(component.getName() + " Expected number of containers " - + exampleApp.getComponent(component.getName()) - .getNumberOfContainers() + ", current = " + component - .getContainers()); - } - } - } - LOG.info("Exit loop, totalReadyContainers= " + totalReadyContainers - + " expected = " + expectedTotalContainers); - return totalReadyContainers == expectedTotalContainers; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, 200000); - return allContainers; - } - - /** - * Wait until service state becomes stable. A service is stable when all - * requested containers of all components are running and in ready state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeStable(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - waitForServiceToBeStable(client, exampleApp, 200000); - } - - private void waitForServiceToBeStable(ServiceClient client, - Service exampleApp, int waitForMillis) - throws TimeoutException, InterruptedException { - waitForServiceToBeInState(client, exampleApp, ServiceState.STABLE, - waitForMillis); - } - - /** - * Wait until service is started. It does not have to reach a stable state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeStarted(ServiceClient client, - Service exampleApp) throws TimeoutException, InterruptedException { - waitForServiceToBeInState(client, exampleApp, ServiceState.STARTED); - } - - private void waitForServiceToBeInState(ServiceClient client, - Service exampleApp, ServiceState desiredState) throws TimeoutException, - InterruptedException { - waitForServiceToBeInState(client, exampleApp, desiredState, 200000); - } - - /** - * Wait until service is started. It does not have to reach a stable state. - * - * @param client - * @param exampleApp - * @throws TimeoutException - * @throws InterruptedException - */ - private void waitForServiceToBeInState(ServiceClient client, - Service exampleApp, ServiceState desiredState, int waitForMillis) throws - TimeoutException, InterruptedException { - GenericTestUtils.waitFor(() -> { - try { - Service retrievedApp = client.getStatus(exampleApp.getName()); - System.out.println(retrievedApp); - return retrievedApp.getState() == desiredState; - } catch (Exception e) { - e.printStackTrace(); - return false; - } - }, 2000, waitForMillis); - } - - private int countTotalContainers(Service service) { - int totalContainers = 0; - for (Component component : service.getComponents()) { - totalContainers += component.getNumberOfContainers(); - } - return totalContainers; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java deleted file mode 100644 index 91f899c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java +++ /dev/null @@ -1,273 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.client.api; - -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Client for managing applications. - */ -@Public -@Unstable -public abstract class AppAdminClient extends CompositeService { - public static final String YARN_APP_ADMIN_CLIENT_PREFIX = "yarn" + - ".application.admin.client.class."; - public static final String DEFAULT_TYPE = "yarn-service"; - public static final String DEFAULT_CLASS_NAME = "org.apache.hadoop.yarn" + - ".service.client.ApiServiceClient"; - public static final String UNIT_TEST_TYPE = "unit-test"; - public static final String UNIT_TEST_CLASS_NAME = "org.apache.hadoop.yarn" + - ".service.client.ServiceClient"; - - @Private - protected AppAdminClient() { - super(AppAdminClient.class.getName()); - } - - /** - * <p> - * Create a new instance of AppAdminClient. - * </p> - * - * @param appType application type - * @param conf configuration - * @return app admin client - */ - @Public - @Unstable - public static AppAdminClient createAppAdminClient(String appType, - Configuration conf) { - Map<String, String> clientClassMap = - conf.getPropsWithPrefix(YARN_APP_ADMIN_CLIENT_PREFIX); - if (!clientClassMap.containsKey(DEFAULT_TYPE)) { - clientClassMap.put(DEFAULT_TYPE, DEFAULT_CLASS_NAME); - } - if (!clientClassMap.containsKey(UNIT_TEST_TYPE)) { - clientClassMap.put(UNIT_TEST_TYPE, UNIT_TEST_CLASS_NAME); - } - if (!clientClassMap.containsKey(appType)) { - throw new IllegalArgumentException("App admin client class name not " + - "specified for type " + appType); - } - String clientClassName = clientClassMap.get(appType); - Class<? extends AppAdminClient> clientClass; - try { - clientClass = (Class<? extends AppAdminClient>) Class.forName( - clientClassName); - } catch (ClassNotFoundException e) { - throw new YarnRuntimeException("Invalid app admin client class", e); - } - - AppAdminClient appAdminClient = ReflectionUtils.newInstance(clientClass, - conf); - appAdminClient.init(conf); - appAdminClient.start(); - return appAdminClient; - } - - /** - * <p> - * Launch a new YARN application. - * </p> - * - * @param fileName specification of application - * @param appName name of the application - * @param lifetime lifetime of the application - * @param queue queue of the application - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int actionLaunch(String fileName, String appName, Long - lifetime, String queue) throws IOException, YarnException; - - /** - * <p> - * Stop a YARN application (attempt to stop gracefully before killing the - * application). In the case of a long-running service, the service may be - * restarted later. - * </p> - * - * @param appName the name of the application - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int actionStop(String appName) throws IOException, - YarnException; - - /** - * <p> - * Start a YARN application from a previously saved specification. In the - * case of a long-running service, the service must have been previously - * launched/started and then stopped, or previously saved but not started. - * </p> - * - * @param appName the name of the application - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int actionStart(String appName) throws IOException, - YarnException; - - /** - * <p> - * Save the specification for a YARN application / long-running service. - * The application may be started later. - * </p> - * - * @param fileName specification of application to save - * @param appName name of the application - * @param lifetime lifetime of the application - * @param queue queue of the application - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int actionSave(String fileName, String appName, Long - lifetime, String queue) throws IOException, YarnException; - - /** - * <p> - * Remove the specification and all application data for a YARN application. - * The application cannot be running. - * </p> - * - * @param appName the name of the application - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int actionDestroy(String appName) throws IOException, - YarnException; - - /** - * <p> - * Change the number of running containers for a component of a YARN - * application / long-running service. - * </p> - * - * @param appName the name of the application - * @param componentCounts map of component name to new component count or - * amount to change existing component count (e.g. - * 5, +5, -5) - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int actionFlex(String appName, Map<String, String> - componentCounts) throws IOException, YarnException; - - /** - * <p> - * Upload AM dependencies to HDFS. This makes future application launches - * faster since the dependencies do not have to be uploaded on each launch. - * </p> - * - * @param destinationFolder - * an optional HDFS folder where dependency tarball will be uploaded - * @return exit code - * @throws IOException - * IOException - * @throws YarnException - * exception in client or server - */ - @Public - @Unstable - public abstract int enableFastLaunch(String destinationFolder) - throws IOException, YarnException; - - /** - * <p> - * Get detailed app specific status string for a YARN application. - * </p> - * - * @param appIdOrName appId or appName - * @return status string - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract String getStatusString(String appIdOrName) throws - IOException, YarnException; - - /** - * Initiate upgrade of a long running service. - * - * @param appName the name of the application. - * @param fileName specification of application upgrade to save. - * @param autoFinalize when true, finalization of upgrade will be done - * automatically. - * @return exit code - * @throws IOException IOException - * @throws YarnException exception in client or server - */ - @Public - @Unstable - public abstract int initiateUpgrade(String appName, String fileName, - boolean autoFinalize) throws IOException, YarnException; - - /** - * Upgrade component instances of a long running service. - * - * @param appName the name of the application. - * @param componentInstances the name of the component instances. - */ - @Public - @Unstable - public abstract int actionUpgradeInstances(String appName, - List<String> componentInstances) throws IOException, YarnException; - - - /** - * Upgrade components of a long running service. - * - * @param appName the name of the application. - * @param components the name of the components. - */ - @Public - @Unstable - public abstract int actionUpgradeComponents(String appName, - List<String> components) throws IOException, YarnException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java new file mode 100644 index 0000000..3cd1a78 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/AppAdminClient.java @@ -0,0 +1,285 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.client.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Client for managing applications. + */ +@Public +@Unstable +public abstract class AppAdminClient extends CompositeService { + public static final String YARN_APP_ADMIN_CLIENT_PREFIX = "yarn" + + ".application.admin.client.class."; + public static final String DEFAULT_TYPE = "yarn-service"; + public static final String DEFAULT_CLASS_NAME = "org.apache.hadoop.yarn" + + ".service.client.ApiServiceClient"; + public static final String UNIT_TEST_TYPE = "unit-test"; + public static final String UNIT_TEST_CLASS_NAME = "org.apache.hadoop.yarn" + + ".service.client.ServiceClient"; + + @Private + protected AppAdminClient() { + super(AppAdminClient.class.getName()); + } + + /** + * <p> + * Create a new instance of AppAdminClient. + * </p> + * + * @param appType application type + * @param conf configuration + * @return app admin client + */ + @Public + @Unstable + public static AppAdminClient createAppAdminClient(String appType, + Configuration conf) { + Map<String, String> clientClassMap = + conf.getPropsWithPrefix(YARN_APP_ADMIN_CLIENT_PREFIX); + if (!clientClassMap.containsKey(DEFAULT_TYPE)) { + clientClassMap.put(DEFAULT_TYPE, DEFAULT_CLASS_NAME); + } + if (!clientClassMap.containsKey(UNIT_TEST_TYPE)) { + clientClassMap.put(UNIT_TEST_TYPE, UNIT_TEST_CLASS_NAME); + } + if (!clientClassMap.containsKey(appType)) { + throw new IllegalArgumentException("App admin client class name not " + + "specified for type " + appType); + } + String clientClassName = clientClassMap.get(appType); + Class<? extends AppAdminClient> clientClass; + try { + clientClass = (Class<? extends AppAdminClient>) Class.forName( + clientClassName); + } catch (ClassNotFoundException e) { + throw new YarnRuntimeException("Invalid app admin client class", e); + } + + AppAdminClient appAdminClient = ReflectionUtils.newInstance(clientClass, + conf); + appAdminClient.init(conf); + appAdminClient.start(); + return appAdminClient; + } + + /** + * <p> + * Launch a new YARN application. + * </p> + * + * @param fileName specification of application + * @param appName name of the application + * @param lifetime lifetime of the application + * @param queue queue of the application + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionLaunch(String fileName, String appName, Long + lifetime, String queue) throws IOException, YarnException; + + /** + * <p> + * Stop a YARN application (attempt to stop gracefully before killing the + * application). In the case of a long-running service, the service may be + * restarted later. + * </p> + * + * @param appName the name of the application + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionStop(String appName) throws IOException, + YarnException; + + /** + * <p> + * Start a YARN application from a previously saved specification. In the + * case of a long-running service, the service must have been previously + * launched/started and then stopped, or previously saved but not started. + * </p> + * + * @param appName the name of the application + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionStart(String appName) throws IOException, + YarnException; + + /** + * <p> + * Save the specification for a YARN application / long-running service. + * The application may be started later. + * </p> + * + * @param fileName specification of application to save + * @param appName name of the application + * @param lifetime lifetime of the application + * @param queue queue of the application + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionSave(String fileName, String appName, Long + lifetime, String queue) throws IOException, YarnException; + + /** + * <p> + * Remove the specification and all application data for a YARN application. + * The application cannot be running. + * </p> + * + * @param appName the name of the application + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionDestroy(String appName) throws IOException, + YarnException; + + /** + * <p> + * Change the number of running containers for a component of a YARN + * application / long-running service. + * </p> + * + * @param appName the name of the application + * @param componentCounts map of component name to new component count or + * amount to change existing component count (e.g. + * 5, +5, -5) + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int actionFlex(String appName, Map<String, String> + componentCounts) throws IOException, YarnException; + + /** + * <p> + * Upload AM dependencies to HDFS. This makes future application launches + * faster since the dependencies do not have to be uploaded on each launch. + * </p> + * + * @param destinationFolder + * an optional HDFS folder where dependency tarball will be uploaded + * @return exit code + * @throws IOException + * IOException + * @throws YarnException + * exception in client or server + */ + @Public + @Unstable + public abstract int enableFastLaunch(String destinationFolder) + throws IOException, YarnException; + + /** + * <p> + * Get detailed app specific status string for a YARN application. + * </p> + * + * @param appIdOrName appId or appName + * @return status string + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract String getStatusString(String appIdOrName) throws + IOException, YarnException; + + /** + * Initiate upgrade of a long running service. + * + * @param appName the name of the application. + * @param fileName specification of application upgrade to save. + * @param autoFinalize when true, finalization of upgrade will be done + * automatically. + * @return exit code + * @throws IOException IOException + * @throws YarnException exception in client or server + */ + @Public + @Unstable + public abstract int initiateUpgrade(String appName, String fileName, + boolean autoFinalize) throws IOException, YarnException; + + /** + * Upgrade component instances of a long running service. + * + * @param appName the name of the application. + * @param componentInstances the name of the component instances. + */ + @Public + @Unstable + public abstract int actionUpgradeInstances(String appName, + List<String> componentInstances) throws IOException, YarnException; + + + /** + * Upgrade components of a long running service. + * + * @param appName the name of the application. + * @param components the name of the components. + */ + @Public + @Unstable + public abstract int actionUpgradeComponents(String appName, + List<String> components) throws IOException, YarnException; + + /** + * Operation to be performed by the RM after an application has completed. + * + * @param appName the name of the application. + * @param userName the name of the user. + * @return exit code + */ + @Public + @Unstable + public abstract int actionCleanUp(String appName, String userName) throws + IOException, YarnException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java index 4ef7b8d..fcfc5bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java @@ -296,6 +296,16 @@ public class RegistryUtils { */ public static String currentUser() { String shortUserName = currentUsernameUnencoded(); + return registryUser(shortUserName); + } + + /** + * Convert the given user name formatted for the registry. + * + * @param shortUserName + * @return converted user name + */ + public static String registryUser(String shortUserName) { String encodedName = encodeForRegistry(shortUserName); // DNS name doesn't allow "_", replace it with "-" encodedName = RegistryUtils.convertUsername(encodedName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ff583d3f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index 6aee813..7319156 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.AppAdminClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; @@ -1470,6 +1471,33 @@ public class RMAppImpl implements RMApp, Recoverable { }; } + /** + * Attempt to perform a type-specific cleanup after application has completed. + * + * @param app application to clean up + */ + static void appAdminClientCleanUp(RMAppImpl app) { + try { + AppAdminClient client = AppAdminClient.createAppAdminClient(app + .applicationType, app.conf); + int result = client.actionCleanUp(app.name, app.user); + if (result == 0) { + LOG.info("Type-specific cleanup of application " + app.applicationId + + " of type " + app.applicationType + " succeeded"); + } else { + LOG.warn("Type-specific cleanup of application " + app.applicationId + + " of type " + app.applicationType + " did not succeed with exit" + + " code " + result); + } + } catch (IllegalArgumentException e) { + // no AppAdminClient class has been specified for the application type, + // so this does not need to be logged + } catch (Exception e) { + LOG.warn("Could not run type-specific cleanup on application " + + app.applicationId + " of type " + app.applicationType, e); + } + } + private static class FinalTransition extends RMAppTransition { private final RMAppState finalState; @@ -1504,6 +1532,8 @@ public class RMAppImpl implements RMApp, Recoverable { .appFinished(app, finalState, app.finishTime); // set the memory free app.clearUnusedFields(); + + appAdminClientCleanUp(app); }; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org