Repository: incubator-twill
Updated Branches:
  refs/heads/master f6d2b6c42 -> e4a36762e
(TWILL-131) Remove ZK node when application finished.

- Remove the application ZK node when the application terminates

This closes #70 on Github

Signed-off-by: Terence Yim <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e4a36762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e4a36762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e4a36762

Branch: refs/heads/master
Commit: e4a36762e8df34aa4971e29863714f199cb8ddcd
Parents: f6d2b6c
Author: Terence Yim <[email protected]>
Authored: Wed Oct 14 13:39:32 2015 -0700
Committer: Terence Yim <[email protected]>
Committed: Tue Oct 20 08:47:20 2015 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/Constants.java | 83 ++++++++++++++++++
 .../twill/internal/AbstractTwillService.java | 2 +-
 .../internal/AbstractZKServiceController.java | 3 +-
 .../org/apache/twill/internal/Constants.java | 76 -----------------
 .../twill/discovery/ZKDiscoveryService.java | 4 +-
 .../twill/discovery/ZKDiscoveryServiceTest.java | 2 +-
 .../org/apache/twill/internal/ServiceMain.java | 19 +++--
 .../appmaster/ApplicationMasterMain.java | 89 +++++++++++++++++++-
 .../appmaster/ApplicationMasterService.java | 6 +-
 .../internal/container/TwillContainerMain.java | 5 +-
 .../apache/twill/yarn/YarnTwillPreparer.java | 8 +-
 .../twill/yarn/YarnTwillRunnerService.java | 4 +-
 .../org/apache/twill/yarn/BaseYarnTest.java | 4 +
 .../apache/twill/yarn/EchoServerTestRun.java | 68 +++++++++++++--
 .../java/org/apache/twill/yarn/TwillTester.java | 4 +
 15 files changed, 266 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-common/src/main/java/org/apache/twill/internal/Constants.java
---------------------------------------------------------------------- diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java new file mode 100644 index 0000000..dd04eb1 --- /dev/null +++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal; + +/** + * This class contains collection of common constants used in Twill. + */ +public final class Constants { + + public static final String LOG_TOPIC = "log"; + + /** Maximum number of seconds for AM to start. */ + public static final int APPLICATION_MAX_START_SECONDS = 60; + /** Maximum number of seconds for AM to stop. */ + public static final int APPLICATION_MAX_STOP_SECONDS = 60; + + public static final long PROVISION_TIMEOUT = 30000; + + /** + * Milliseconds AM should wait for RM to allocate a constrained provision request. + * On timeout, AM relaxes the request constraints. + */ + public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000; + + public static final double HEAP_MIN_RATIO = 0.7d; + + /** Memory size of AM. 
*/ + public static final int APP_MASTER_MEMORY_MB = 512; + + public static final int APP_MASTER_RESERVED_MEMORY_MB = 150; + + public static final String CLASSPATH = "classpath"; + public static final String APPLICATION_CLASSPATH = "application-classpath"; + + /** Command names for the restart runnable instances. */ + public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances"; + public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances"; + + /** + * Common ZK paths constants + */ + public static final String DISCOVERY_PATH_PREFIX = "/discoverable"; + public static final String INSTANCES_PATH_PREFIX = "/instances"; + + + /** + * Constants for names of internal files that are shared between client, AM and containers. + */ + public static final class Files { + + public static final String LAUNCHER_JAR = "launcher.jar"; + public static final String APP_MASTER_JAR = "appMaster.jar"; + public static final String CONTAINER_JAR = "container.jar"; + public static final String LOCALIZE_FILES = "localizeFiles.json"; + public static final String TWILL_SPEC = "twillSpec.json"; + public static final String ARGUMENTS = "arguments.json"; + public static final String ENVIRONMENTS = "environments.json"; + public static final String LOGBACK_TEMPLATE = "logback-template.xml"; + public static final String JVM_OPTIONS = "jvm.opts"; + public static final String CREDENTIALS = "credentials.store"; + + private Files() { + } + } + + private Constants() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java index 2f95e0e..8688d0b 100644 --- 
a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java @@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic } private String getLiveNodePath() { - return "/instances/" + runId.getId(); + return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId()); } private <T> byte[] toJson(T obj) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java index 0cf92ea..9b30823 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - import org.apache.twill.api.Command; import org.apache.twill.api.RunId; import org.apache.twill.api.ServiceController; @@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi * Returns the zookeeper node path for the ephemeral instance node for this runId. 
*/ protected final String getInstancePath() { - return String.format("/instances/%s", getRunId().getId()); + return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId()); } private String getZKPath(String path) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-core/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java deleted file mode 100644 index 39de851..0000000 --- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal; - -/** - * This class contains collection of common constants used in Twill. - */ -public final class Constants { - - public static final String LOG_TOPIC = "log"; - - /** Maximum number of seconds for AM to start. */ - public static final int APPLICATION_MAX_START_SECONDS = 60; - /** Maximum number of seconds for AM to stop. 
*/ - public static final int APPLICATION_MAX_STOP_SECONDS = 60; - - public static final long PROVISION_TIMEOUT = 30000; - - /** - * Milliseconds AM should wait for RM to allocate a constrained provision request. - * On timeout, AM relaxes the request constraints. - */ - public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000; - - public static final double HEAP_MIN_RATIO = 0.7d; - - /** Memory size of AM. */ - public static final int APP_MASTER_MEMORY_MB = 512; - - public static final int APP_MASTER_RESERVED_MEMORY_MB = 150; - - public static final String CLASSPATH = "classpath"; - public static final String APPLICATION_CLASSPATH = "application-classpath"; - - /** Command names for the restart runnable instances. */ - public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances"; - public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances"; - - /** - * Constants for names of internal files that are shared between client, AM and containers. 
- */ - public static final class Files { - - public static final String LAUNCHER_JAR = "launcher.jar"; - public static final String APP_MASTER_JAR = "appMaster.jar"; - public static final String CONTAINER_JAR = "container.jar"; - public static final String LOCALIZE_FILES = "localizeFiles.json"; - public static final String TWILL_SPEC = "twillSpec.json"; - public static final String ARGUMENTS = "arguments.json"; - public static final String ENVIRONMENTS = "environments.json"; - public static final String LOGBACK_TEMPLATE = "logback-template.xml"; - public static final String JVM_OPTIONS = "jvm.opts"; - public static final String CREDENTIALS = "credentials.store"; - - private Files() { - } - } - - private Constants() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java ---------------------------------------------------------------------- diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java index 3f0db34..c563bab 100644 --- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java +++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.twill.common.Cancellable; import org.apache.twill.common.Threads; +import org.apache.twill.internal.Constants; import org.apache.twill.zookeeper.NodeChildren; import org.apache.twill.zookeeper.NodeData; import org.apache.twill.zookeeper.OperationFuture; @@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock; */ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient { private static final Logger LOG = 
LoggerFactory.getLogger(ZKDiscoveryService.class); - private static final String NAMESPACE = "/discoverable"; private static final long RETRY_MILLIS = 1000; @@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli * @param zkClient The {@link ZKClient} for interacting with zookeeper. */ public ZKDiscoveryService(ZKClient zkClient) { - this(zkClient, NAMESPACE); + this(zkClient, Constants.DISCOVERY_PATH_PREFIX); } /** http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java ---------------------------------------------------------------------- diff --git a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java index 7707c5b..7d6e369 100644 --- a/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java +++ b/twill-discovery-core/src/test/java/org/apache/twill/discovery/ZKDiscoveryServiceTest.java @@ -63,7 +63,7 @@ public class ZKDiscoveryServiceTest extends DiscoveryServiceTestBase { zkServer.stopAndWait(); } - @Test (timeout = 10000) + @Test (timeout = 30000) public void testDoubleRegister() throws Exception { Map.Entry<DiscoveryService, DiscoveryServiceClient> entry = create(); DiscoveryService discoveryService = entry.getKey(); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java index a6af3d3..cafd375 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java @@ 
-157,12 +157,16 @@ public abstract class ServiceMain { /** * Creates a {@link ZKClientService}. */ - protected static ZKClientService createZKClient(String zkConnectStr) { + protected static ZKClientService createZKClient(String zkConnectStr, String appName) { return ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(zkConnectStr).build(), - RetryStrategies.fixDelay(1, TimeUnit.SECONDS)))); + ZKClients.namespace( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(zkConnectStr).build(), + RetryStrategies.fixDelay(1, TimeUnit.SECONDS) + ) + ), "/" + appName + )); } private void configureLogger() { @@ -256,10 +260,11 @@ public abstract class ServiceMain { /** * A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}. */ - protected static final class TwillZKPathService extends AbstractIdleService { + protected static class TwillZKPathService extends AbstractIdleService { + + protected static final long TIMEOUT_SECONDS = 5L; private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class); - private static final long TIMEOUT_SECONDS = 5L; private final ZKClient zkClient; private final String path; http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 3e8cb93..38a2463 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster; import 
com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.Futures; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings; import org.apache.twill.internal.utils.Networks; import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory; import org.apache.twill.internal.yarn.YarnAMClient; +import org.apache.twill.zookeeper.OperationFuture; import org.apache.twill.zookeeper.ZKClient; import org.apache.twill.zookeeper.ZKClientService; import org.apache.twill.zookeeper.ZKOperations; @@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain { File twillSpec = new File(Constants.Files.TWILL_SPEC); RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID)); - ZKClientService zkClientService = createZKClient(zkConnect); + ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME)); Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration())); setRMSchedulerAddress(conf); @@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain { twillSpec, amClient, createAppLocation(conf)); TrackerService trackerService = new TrackerService(service); - new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())) + new ApplicationMasterMain(service.getKafkaZKConnect()) .doMain( service, new YarnAMClientService(amClient, trackerService), zkClientService, - new TwillZKPathService(zkClientService, 
runId), + new AppMasterTwillZKPathService(zkClientService, runId), new ApplicationKafkaService(zkClientService, runId) ); } @@ -229,4 +234,82 @@ public final class ApplicationMasterMain extends ServiceMain { } } } + + private static final class AppMasterTwillZKPathService extends TwillZKPathService { + + private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class); + private final ZKClient zkClient; + + public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) { + super(zkClient, runId); + this.zkClient = zkClient; + } + + @Override + protected void shutDown() throws Exception { + super.shutDown(); + + // Deletes ZK nodes created for the application execution. + // We don't have to worry about a race condition if another instance of the same app starts at the same time + // as when removal is performed. This is because we always create nodes with "createParent == true", + // which takes care of the parent node recreation if it is removed from here. + + // Try to delete the /instances path. It may throws NotEmptyException if there are other instances of the + // same app running, which we can safely ignore and return. + if (!delete(Constants.INSTANCES_PATH_PREFIX)) { + return; + } + + // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances + // of the same app running that has discovery services running. 
+ List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX) + .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren(); + List<OperationFuture<?>> deleteFutures = new ArrayList<>(); + for (String child : children) { + String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child; + LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path); + deleteFutures.add(zkClient.delete(path)); + } + Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + for (OperationFuture<?> future : deleteFutures) { + try { + future.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof KeeperException.NotEmptyException) { + return; + } + throw e; + } + } + + // Delete the /discovery. It may fail with NotEmptyException (due to race between apps), + // which can safely ignore and return. + if (!delete(Constants.DISCOVERY_PATH_PREFIX)) { + return; + } + + // Delete the ZK path for the app namespace. + delete("/"); + } + + /** + * Deletes the given ZK path. + * + * @param path path to delete + * @return true if the path was deleted, false if failed to delete due to {@link KeeperException.NotEmptyException}. 
+ * @throws Exception if failed to delete the path + */ + private boolean delete(String path) throws Exception { + try { + LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path); + zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return true; + } catch (ExecutionException e) { + if (e.getCause() instanceof KeeperException.NotEmptyException) { + return false; + } + throw e; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index e1523d6..c376de4 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp // Creates ZK path for runnable zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get(); - runningContainers.addWatcher("/discoverable"); + runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX); runnableContainerRequests = initContainerRequests(); } @@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId()); env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()); env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)); - env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString()); + env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT)); env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect()); ProcessLauncher.PrepareLaunchContext 
launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(), @@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp return String.format("/%s/runnables/%s", runId.getId(), runnableName); } - private String getKafkaZKConnect() { + String getKafkaZKConnect() { return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId()); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java index 51837a7..3ea786a 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java @@ -18,7 +18,6 @@ package org.apache.twill.internal.container; import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractService; @@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.twill.api.LocalFile; import org.apache.twill.api.RunId; -import org.apache.twill.api.RuntimeSpecification; import org.apache.twill.api.TwillRunnableSpecification; import org.apache.twill.api.TwillSpecification; import org.apache.twill.discovery.ZKDiscoveryService; @@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain { int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID)); int instanceCount = 
Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT)); - ZKClientService zkClientService = createZKClient(zkConnectStr); + ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME)); ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService); ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index d4edfeb..d04cdab 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer { private final YarnConfiguration yarnConfig; private final TwillSpecification twillSpec; private final YarnAppClient yarnAppClient; - private final ZKClient zkClient; + private final String zkConnectString; private final LocationFactory locationFactory; private final YarnTwillControllerFactory controllerFactory; private final RunId runId; @@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer { private LogEntry.Level logLevel; YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, - YarnAppClient yarnAppClient, ZKClient zkClient, + YarnAppClient yarnAppClient, String zkConnectString, LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel, YarnTwillControllerFactory controllerFactory) { this.yarnConfig = yarnConfig; this.twillSpec = twillSpec; this.yarnAppClient = yarnAppClient; - this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName()); + this.zkConnectString = zkConnectString; 
this.locationFactory = locationFactory; this.controllerFactory = controllerFactory; this.runId = RunIds.generate(); @@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer { ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder() .put(EnvKeys.TWILL_FS_USER, fsUser) .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString()) - .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString()) + .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString) .put(EnvKeys.TWILL_RUN_ID, runId.getId()) .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory)) .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()) http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index 8a13017..c5853d6 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService { final TwillSpecification twillSpec = application.configure(); final String appName = twillSpec.getName(); - return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions, - LogEntry.Level.INFO, new YarnTwillControllerFactory() { + return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(), + locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() { @Override public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers, Callable<ProcessController<YarnApplicationReport>> startUp) { 
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java index b5c7f58..a9cf2ed 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/BaseYarnTest.java @@ -109,4 +109,8 @@ public abstract class BaseYarnTest { public List<NodeReport> getNodeReports() throws Exception { return TWILL_TESTER.getNodeReports(); } + + public String getZKConnectionString() { + return TWILL_TESTER.getZKConnectionString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java index 3f0f20c..13c07b1 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java @@ -31,23 +31,22 @@ import org.apache.twill.api.TwillRunnerService; import org.apache.twill.api.logging.PrinterLogHandler; import org.apache.twill.common.Threads; import org.apache.twill.discovery.Discoverable; +import org.apache.twill.zookeeper.ZKClientService; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.net.Socket; -import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Map; import 
java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; /** @@ -58,8 +57,7 @@ public final class EchoServerTestRun extends BaseYarnTest { private static final Logger LOG = LoggerFactory.getLogger(EchoServerTestRun.class); @Test - public void testEchoServer() throws InterruptedException, ExecutionException, IOException, - URISyntaxException, TimeoutException { + public void testEchoServer() throws Exception { TwillRunner runner = getTwillRunner(); TwillController controller = runner.prepare(new EchoServer(), @@ -158,6 +156,64 @@ public final class EchoServerTestRun extends BaseYarnTest { TimeUnit.SECONDS.sleep(2); } + @Test + public void testZKCleanup() throws Exception { + ZKClientService zkClient = ZKClientService.Builder.of(getZKConnectionString() + "/twill").build(); + zkClient.startAndWait(); + + try { + TwillRunner runner = getTwillRunner(); + + // Start an application and stop it. + TwillController controller = runner.prepare(new EchoServer()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .withApplicationArguments("echo") + .withArguments("EchoServer", "echo2") + .start(); + + Iterable<Discoverable> echoServices = controller.discoverService("echo"); + Assert.assertTrue(waitForSize(echoServices, 1, 120)); + + controller.terminate().get(); + + // Verify the ZK node gets cleanup + Assert.assertNull(zkClient.exists("/EchoServer").get()); + + // Start two instances of the application and stop one of it + List<TwillController> controllers = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + controllers.add(runner.prepare(new EchoServer()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .withApplicationArguments("echo") + .withArguments("EchoServer", "echo2") + .start()); + } + + // There should be two instances up and running. 
+ echoServices = controller.discoverService("echo"); + Assert.assertTrue(waitForSize(echoServices, 2, 120)); + + // Stop one instance of the app + controllers.get(0).terminate().get(); + + // Verify the ZK node should still be there + Assert.assertNotNull(zkClient.exists("/EchoServer").get()); + + // We should still be able to do discovery, which depends on the ZK node. + echoServices = controller.discoverService("echo"); + Assert.assertTrue(waitForSize(echoServices, 1, 120)); + + // Stop second instance of the app + controllers.get(1).terminate().get(); + + // Verify the ZK node gets cleanup + Assert.assertNull(zkClient.exists("/EchoServer").get()); + + } finally { + zkClient.stopAndWait(); + } + } + /** * Need helper method here to wait for getting resource report because {@link TwillController#getResourceReport()} * could return null if the application has not fully started. http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e4a36762/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java index f669b83..e604cec 100644 --- a/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/TwillTester.java @@ -160,6 +160,10 @@ public class TwillTester extends ExternalResource { return yarnAppClient.getNodeReports(); } + public String getZKConnectionString() { + return zkServer.getConnectionStr(); + } + private void stopQuietly(Service service) { try { service.stopAndWait();
