Repository: incubator-twill Updated Branches: refs/heads/feature/TWILL-131 [created] 1f831c299
WIP Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/1f831c29 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/1f831c29 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/1f831c29 Branch: refs/heads/feature/TWILL-131 Commit: 1f831c2993bb16dd695baca5bee3436bfee4ed10 Parents: f6d2b6c Author: Terence Yim <[email protected]> Authored: Wed Oct 14 13:39:32 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Wed Oct 14 13:39:32 2015 -0700 ---------------------------------------------------------------------- .../org/apache/twill/internal/Constants.java | 83 +++++++++++++++++++ .../twill/internal/AbstractTwillService.java | 2 +- .../internal/AbstractZKServiceController.java | 3 +- .../org/apache/twill/internal/Constants.java | 76 ------------------ .../twill/discovery/ZKDiscoveryService.java | 4 +- .../org/apache/twill/internal/ServiceMain.java | 19 +++-- .../appmaster/ApplicationMasterMain.java | 84 +++++++++++++++++++- .../appmaster/ApplicationMasterService.java | 6 +- .../internal/container/TwillContainerMain.java | 5 +- .../apache/twill/yarn/YarnTwillPreparer.java | 8 +- .../twill/yarn/YarnTwillRunnerService.java | 4 +- 11 files changed, 190 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-common/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java new file mode 100644 index 0000000..dd04eb1 --- /dev/null +++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal; + +/** + * This class contains collection of common constants used in Twill. + */ +public final class Constants { + + public static final String LOG_TOPIC = "log"; + + /** Maximum number of seconds for AM to start. */ + public static final int APPLICATION_MAX_START_SECONDS = 60; + /** Maximum number of seconds for AM to stop. */ + public static final int APPLICATION_MAX_STOP_SECONDS = 60; + + public static final long PROVISION_TIMEOUT = 30000; + + /** + * Milliseconds AM should wait for RM to allocate a constrained provision request. + * On timeout, AM relaxes the request constraints. + */ + public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000; + + public static final double HEAP_MIN_RATIO = 0.7d; + + /** Memory size of AM. */ + public static final int APP_MASTER_MEMORY_MB = 512; + + public static final int APP_MASTER_RESERVED_MEMORY_MB = 150; + + public static final String CLASSPATH = "classpath"; + public static final String APPLICATION_CLASSPATH = "application-classpath"; + + /** Command names for the restart runnable instances. */ + public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances"; + public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances"; + + /** + * Common ZK paths constants + */ + public static final String DISCOVERY_PATH_PREFIX = "/discoverable"; + public static final String INSTANCES_PATH_PREFIX = "/instances"; + + + /** + * Constants for names of internal files that are shared between client, AM and containers. + */ + public static final class Files { + + public static final String LAUNCHER_JAR = "launcher.jar"; + public static final String APP_MASTER_JAR = "appMaster.jar"; + public static final String CONTAINER_JAR = "container.jar"; + public static final String LOCALIZE_FILES = "localizeFiles.json"; + public static final String TWILL_SPEC = "twillSpec.json"; + public static final String ARGUMENTS = "arguments.json"; + public static final String ENVIRONMENTS = "environments.json"; + public static final String LOGBACK_TEMPLATE = "logback-template.xml"; + public static final String JVM_OPTIONS = "jvm.opts"; + public static final String CREDENTIALS = "credentials.store"; + + private Files() { + } + } + + private Constants() { + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java index 2f95e0e..8688d0b 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractTwillService.java @@ -359,7 +359,7 @@ public abstract class AbstractTwillService extends AbstractExecutionThreadServic } private String getLiveNodePath() { - return "/instances/" + runId.getId(); + return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, runId.getId()); } private <T> byte[] toJson(T obj) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java index 0cf92ea..9b30823 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java +++ b/twill-core/src/main/java/org/apache/twill/internal/AbstractZKServiceController.java @@ -21,7 +21,6 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - import org.apache.twill.api.Command; import org.apache.twill.api.RunId; import org.apache.twill.api.ServiceController; @@ -239,7 +238,7 @@ public abstract class AbstractZKServiceController extends AbstractExecutionServi * Returns the zookeeper node path for the ephemeral instance node for this runId. */ protected final String getInstancePath() { - return String.format("/instances/%s", getRunId().getId()); + return String.format("%s/%s", Constants.INSTANCES_PATH_PREFIX, getRunId().getId()); } private String getZKPath(String path) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-core/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java deleted file mode 100644 index 39de851..0000000 --- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal; - -/** - * This class contains collection of common constants used in Twill. - */ -public final class Constants { - - public static final String LOG_TOPIC = "log"; - - /** Maximum number of seconds for AM to start. */ - public static final int APPLICATION_MAX_START_SECONDS = 60; - /** Maximum number of seconds for AM to stop. */ - public static final int APPLICATION_MAX_STOP_SECONDS = 60; - - public static final long PROVISION_TIMEOUT = 30000; - - /** - * Milliseconds AM should wait for RM to allocate a constrained provision request. - * On timeout, AM relaxes the request constraints. - */ - public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000; - - public static final double HEAP_MIN_RATIO = 0.7d; - - /** Memory size of AM. */ - public static final int APP_MASTER_MEMORY_MB = 512; - - public static final int APP_MASTER_RESERVED_MEMORY_MB = 150; - - public static final String CLASSPATH = "classpath"; - public static final String APPLICATION_CLASSPATH = "application-classpath"; - - /** Command names for the restart runnable instances. */ - public static final String RESTART_ALL_RUNNABLE_INSTANCES = "restartAllRunnableInstances"; - public static final String RESTART_RUNNABLES_INSTANCES = "restartRunnablesInstances"; - - /** - * Constants for names of internal files that are shared between client, AM and containers. - */ - public static final class Files { - - public static final String LAUNCHER_JAR = "launcher.jar"; - public static final String APP_MASTER_JAR = "appMaster.jar"; - public static final String CONTAINER_JAR = "container.jar"; - public static final String LOCALIZE_FILES = "localizeFiles.json"; - public static final String TWILL_SPEC = "twillSpec.json"; - public static final String ARGUMENTS = "arguments.json"; - public static final String ENVIRONMENTS = "environments.json"; - public static final String LOGBACK_TEMPLATE = "logback-template.xml"; - public static final String JVM_OPTIONS = "jvm.opts"; - public static final String CREDENTIALS = "credentials.store"; - - private Files() { - } - } - - private Constants() { - } -} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java ---------------------------------------------------------------------- diff --git a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java index 3f0db34..c563bab 100644 --- a/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java +++ b/twill-discovery-core/src/main/java/org/apache/twill/discovery/ZKDiscoveryService.java @@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import org.apache.twill.common.Cancellable; import org.apache.twill.common.Threads; +import org.apache.twill.internal.Constants; import org.apache.twill.zookeeper.NodeChildren; import org.apache.twill.zookeeper.NodeData; import org.apache.twill.zookeeper.OperationFuture; @@ -94,7 +95,6 @@ import java.util.concurrent.locks.ReentrantLock; */ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceClient { private static final Logger LOG = LoggerFactory.getLogger(ZKDiscoveryService.class); - private static final String NAMESPACE = "/discoverable"; private static final long RETRY_MILLIS = 1000; @@ -112,7 +112,7 @@ public class ZKDiscoveryService implements DiscoveryService, DiscoveryServiceCli * @param zkClient The {@link ZKClient} for interacting with zookeeper. */ public ZKDiscoveryService(ZKClient zkClient) { - this(zkClient, NAMESPACE); + this(zkClient, Constants.DISCOVERY_PATH_PREFIX); } /** http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java index a6af3d3..cafd375 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java @@ -157,12 +157,16 @@ public abstract class ServiceMain { /** * Creates a {@link ZKClientService}. */ - protected static ZKClientService createZKClient(String zkConnectStr) { + protected static ZKClientService createZKClient(String zkConnectStr, String appName) { return ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure( - ZKClientService.Builder.of(zkConnectStr).build(), - RetryStrategies.fixDelay(1, TimeUnit.SECONDS)))); + ZKClients.namespace( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure( + ZKClientService.Builder.of(zkConnectStr).build(), + RetryStrategies.fixDelay(1, TimeUnit.SECONDS) + ) + ), "/" + appName + )); } private void configureLogger() { @@ -256,10 +260,11 @@ public abstract class ServiceMain { /** * A simple service for creating/remove ZK paths needed for {@link AbstractTwillService}. */ - protected static final class TwillZKPathService extends AbstractIdleService { + protected static class TwillZKPathService extends AbstractIdleService { + + protected static final long TIMEOUT_SECONDS = 5L; private static final Logger LOG = LoggerFactory.getLogger(TwillZKPathService.class); - private static final long TIMEOUT_SECONDS = 5L; private final ZKClient zkClient; private final String path; http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index 3e8cb93..f373947 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -19,6 +19,7 @@ package org.apache.twill.internal.appmaster; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.Futures; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -32,6 +33,7 @@ import org.apache.twill.internal.logging.Loggings; import org.apache.twill.internal.utils.Networks; import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory; import org.apache.twill.internal.yarn.YarnAMClient; +import org.apache.twill.zookeeper.OperationFuture; import org.apache.twill.zookeeper.ZKClient; import org.apache.twill.zookeeper.ZKClientService; import org.apache.twill.zookeeper.ZKOperations; @@ -43,7 +45,10 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; /** @@ -65,7 +70,7 @@ public final class ApplicationMasterMain extends ServiceMain { File twillSpec = new File(Constants.Files.TWILL_SPEC); RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID)); - ZKClientService zkClientService = createZKClient(zkConnect); + ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME)); Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration())); setRMSchedulerAddress(conf); @@ -74,12 +79,12 @@ public final class ApplicationMasterMain extends ServiceMain { twillSpec, amClient, createAppLocation(conf)); TrackerService trackerService = new TrackerService(service); - new ApplicationMasterMain(String.format("%s/%s/kafka", zkConnect, runId.getId())) + new ApplicationMasterMain(service.getKafkaZKConnect()) .doMain( service, new YarnAMClientService(amClient, trackerService), zkClientService, - new TwillZKPathService(zkClientService, runId), + new AppMasterTwillZKPathService(zkClientService, runId), new ApplicationKafkaService(zkClientService, runId) ); } @@ -229,4 +234,77 @@ public final class ApplicationMasterMain extends ServiceMain { } } } + + private static final class AppMasterTwillZKPathService extends TwillZKPathService { + + private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class); + private final ZKClient zkClient; + + public AppMasterTwillZKPathService(ZKClient zkClient, RunId runId) { + super(zkClient, runId); + this.zkClient = zkClient; + } + + @Override + protected void shutDown() throws Exception { + super.shutDown(); + + // Try to delete the /instances path. It may throws NotEmptyException if there are other instances of the + // same app running, which can safely ignore and return. + if (!delete(Constants.INSTANCES_PATH_PREFIX)) { + return; + } + + // Try to delete children under /discovery. It may fail with NotEmptyException if there are other instances + // of the same app running that has discovery services running. + List<String> children = zkClient.getChildren(Constants.DISCOVERY_PATH_PREFIX) + .get(TIMEOUT_SECONDS, TimeUnit.SECONDS).getChildren(); + List<OperationFuture<?>> deleteFutures = new ArrayList<>(); + for (String child : children) { + String path = Constants.DISCOVERY_PATH_PREFIX + "/" + child; + LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path); + deleteFutures.add(zkClient.delete(path)); + } + Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + for (OperationFuture<?> future : deleteFutures) { + try { + future.get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof KeeperException.NotEmptyException) { + return; + } + throw e; + } + } + + // Delete the /discovery. It may fail with NotEmptyException (due to race between apps), + // which can safely ignore and return. + if (!delete(Constants.DISCOVERY_PATH_PREFIX)) { + return; + } + + // Delete the ZK path for the app namespace. + delete("/"); + } + + /** + * Deletes the given ZK path. + * + * @param path path to delete + * @return true if the path is delete, false if failed to delete due to {@link KeeperException.NotEmptyException}. + * @throws Exception if failed to delete + */ + private boolean delete(String path) throws Exception { + try { + LOG.info("Removing ZK path: {}{}", zkClient.getConnectString(), path); + zkClient.delete(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + return true; + } catch (ExecutionException e) { + if (e.getCause() instanceof KeeperException.NotEmptyException) { + return false; + } + throw e; + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index e1523d6..c376de4 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -236,7 +236,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp // Creates ZK path for runnable zkClient.create("/" + runId.getId() + "/runnables", null, CreateMode.PERSISTENT).get(); - runningContainers.addWatcher("/discoverable"); + runningContainers.addWatcher(Constants.DISCOVERY_PATH_PREFIX); runnableContainerRequests = initContainerRequests(); } @@ -648,7 +648,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId()); env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()); env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)); - env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString()); + env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT)); env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect()); ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(), @@ -703,7 +703,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp return String.format("/%s/runnables/%s", runId.getId(), runnableName); } - private String getKafkaZKConnect() { + String getKafkaZKConnect() { return String.format("%s/%s/kafka", zkClient.getConnectString(), runId.getId()); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java index 51837a7..3ea786a 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java @@ -18,7 +18,6 @@ package org.apache.twill.internal.container; import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractService; @@ -28,9 +27,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.twill.api.LocalFile; import org.apache.twill.api.RunId; -import org.apache.twill.api.RuntimeSpecification; import org.apache.twill.api.TwillRunnableSpecification; import org.apache.twill.api.TwillSpecification; import org.apache.twill.discovery.ZKDiscoveryService; @@ -80,7 +77,7 @@ public final class TwillContainerMain extends ServiceMain { int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID)); int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT)); - ZKClientService zkClientService = createZKClient(zkConnectStr); + ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME)); ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService); ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId); http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index d4edfeb..d04cdab 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -116,7 +116,7 @@ final class YarnTwillPreparer implements TwillPreparer { private final YarnConfiguration yarnConfig; private final TwillSpecification twillSpec; private final YarnAppClient yarnAppClient; - private final ZKClient zkClient; + private final String zkConnectString; private final LocationFactory locationFactory; private final YarnTwillControllerFactory controllerFactory; private final RunId runId; @@ -139,13 +139,13 @@ final class YarnTwillPreparer implements TwillPreparer { private LogEntry.Level logLevel; YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, - YarnAppClient yarnAppClient, ZKClient zkClient, + YarnAppClient yarnAppClient, String zkConnectString, LocationFactory locationFactory, String extraOptions, LogEntry.Level logLevel, YarnTwillControllerFactory controllerFactory) { this.yarnConfig = yarnConfig; this.twillSpec = twillSpec; this.yarnAppClient = yarnAppClient; - this.zkClient = ZKClients.namespace(zkClient, "/" + twillSpec.getName()); + this.zkConnectString = zkConnectString; this.locationFactory = locationFactory; this.controllerFactory = controllerFactory; this.runId = RunIds.generate(); @@ -345,7 +345,7 @@ final class YarnTwillPreparer implements TwillPreparer { ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder() .put(EnvKeys.TWILL_FS_USER, fsUser) .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString()) - .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString()) + .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString) .put(EnvKeys.TWILL_RUN_ID, runId.getId()) .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory)) .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()) http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/1f831c29/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java index 8a13017..c5853d6 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java @@ -277,8 +277,8 @@ public final class YarnTwillRunnerService implements TwillRunnerService { final TwillSpecification twillSpec = application.configure(); final String appName = twillSpec.getName(); - return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService, locationFactory, jvmOptions, - LogEntry.Level.INFO, new YarnTwillControllerFactory() { + return new YarnTwillPreparer(yarnConfig, twillSpec, yarnAppClient, zkClientService.getConnectString(), + locationFactory, jvmOptions, LogEntry.Level.INFO, new YarnTwillControllerFactory() { @Override public YarnTwillController create(RunId runId, Iterable<LogHandler> logHandlers, Callable<ProcessController<YarnApplicationReport>> startUp) {
