(TWILL-148) Allow setting of env variables - Added methods to TwillPreparer for setting env for runnables
This closes #69 on github Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/e95c6a49 Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/e95c6a49 Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/e95c6a49 Branch: refs/heads/site Commit: e95c6a495e0faef7569adbeae2d768f57391b44f Parents: ef8b1ea Author: Terence Yim <[email protected]> Authored: Fri Oct 9 18:27:17 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Fri Oct 9 19:34:02 2015 -0700 ---------------------------------------------------------------------- .../org/apache/twill/api/TwillPreparer.java | 20 ++++ .../org/apache/twill/internal/Constants.java | 1 + .../appmaster/ApplicationMasterService.java | 46 +++++--- .../apache/twill/yarn/YarnTwillPreparer.java | 55 ++++++++- .../apache/twill/yarn/EnvironmentTestRun.java | 111 +++++++++++++++++++ 5 files changed, 219 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index f60080a..d7d5529 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -21,6 +21,7 @@ import org.apache.twill.api.logging.LogEntry; import org.apache.twill.api.logging.LogHandler; import java.net.URI; +import java.util.Map; /** * This interface exposes methods to set up the Twill runtime environment and start a Twill application. @@ -179,6 +180,25 @@ public interface TwillPreparer { TwillPreparer withClassPaths(Iterable<String> classPaths); /** + * Adds the set of environment variables that will be set as container environment variables for all runnables. + * + * @param env set of environment variables + * @return This {@link TwillPreparer} + */ + TwillPreparer withEnv(Map<String, String> env); + + /** + * Adds the set of environment variables that will be set as container environment variables for the given runnable. + * Environment variables set through this method has higher precedence than the one set through {@link #withEnv(Map)} + * if there is a key clash. + * + * @param runnableName Name of the {@link TwillRunnable}. + * @param env set of environment variables + * @return This {@link TwillPreparer} + */ + TwillPreparer withEnv(String runnableName, Map<String, String> env); + + /** * Adds the set of paths to the classpath on the target machine for ApplicationMaster and all runnables. * @return This {@link TwillPreparer} */ http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-core/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java index f897bfa..39de851 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java +++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java @@ -62,6 +62,7 @@ public final class Constants { public static final String LOCALIZE_FILES = "localizeFiles.json"; public static final String TWILL_SPEC = "twillSpec.json"; public static final String ARGUMENTS = "arguments.json"; + public static final String ENVIRONMENTS = "environments.json"; public static final String LOGBACK_TEMPLATE = "logback-template.xml"; public static final String JVM_OPTIONS = "jvm.opts"; public static final String CREDENTIALS = "credentials.store"; http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index 355cea3..e1523d6 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -25,7 +25,6 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.DiscreteDomains; import com.google.common.collect.HashMultiset; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -88,7 +87,9 @@ import java.io.FileReader; import java.io.IOException; import java.io.Reader; import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -122,6 +123,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private final EventHandler eventHandler; private final Location applicationLocation; private final PlacementPolicyManager placementPolicyManager; + private final Map<String, Map<String, String>> environments; private volatile boolean stopped; private Queue<RunnableContainerRequest> runnableContainerRequests; @@ -140,6 +142,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp this.jvmOpts = loadJvmOptions(); this.reservedMemory = getReservedMemory(); this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies()); + this.environments = getEnvironments(); this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)), Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)), @@ -634,18 +637,22 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp int containerCount = expectedContainers.getExpected(runnableName); - ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch( - ImmutableMap.<String, String>builder() - .put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR)) - .put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER)) - .put(EnvKeys.TWILL_APP_RUN_ID, runId.getId()) - .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()) - .put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)) - .put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString()) - .put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect()) - .build() - , getLocalizeFiles(), credentials - ); + // Setup container environment variables + Map<String, String> env = new LinkedHashMap<>(); + if (environments.containsKey(runnableName)) { + env.putAll(environments.get(runnableName)); + } + // Override with system env + env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR)); + env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER)); + env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId()); + env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()); + env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)); + env.put(EnvKeys.TWILL_ZK_CONNECT, zkClient.getConnectString()); + env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect()); + + ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(), + credentials); TwillContainerLauncher launcher = new TwillContainerLauncher( twillSpec.getRunnables().get(runnableName), processLauncher.getContainerInfo(), launchContext, @@ -679,6 +686,19 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp } } + private Map<String, Map<String, String>> getEnvironments() { + File envFile = new File(Constants.Files.ENVIRONMENTS); + if (!envFile.exists()) { + return new HashMap<>(); + } + + try (Reader reader = Files.newReader(envFile, Charsets.UTF_8)) { + return new Gson().fromJson(reader, new TypeToken<Map<String, Map<String, String>>>() { }.getType()); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + private String getZKNamespace(String runnableName) { return String.format("/%s/runnables/%s", runId.getId(), runnableName); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index ea77116..6da2f8b 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -36,6 +36,7 @@ import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.OutputSupplier; import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; import com.google.gson.GsonBuilder; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -52,7 +53,6 @@ import org.apache.twill.api.TwillPreparer; import org.apache.twill.api.TwillSpecification; import org.apache.twill.api.logging.LogEntry; import org.apache.twill.api.logging.LogHandler; -import org.apache.twill.filesystem.HDFSLocationFactory; import org.apache.twill.filesystem.Location; import org.apache.twill.filesystem.LocationFactory; import org.apache.twill.internal.ApplicationBundler; @@ -97,6 +97,8 @@ import java.io.Writer; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -125,6 +127,7 @@ final class YarnTwillPreparer implements TwillPreparer { private final List<URI> resources = Lists.newArrayList(); private final List<String> classPaths = Lists.newArrayList(); private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create(); + private final Map<String, Map<String, String>> environments = new HashMap<>(); private final List<String> applicationClassPaths = Lists.newArrayList(); private final Credentials credentials; private final int reservedMemory; @@ -214,6 +217,8 @@ final class YarnTwillPreparer implements TwillPreparer { @Override public TwillPreparer withArguments(String runnableName, Iterable<String> args) { + Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName), + "Runnable %s is not defined in the application.", runnableName); runnableArgs.putAll(runnableName, args); return this; } @@ -252,6 +257,23 @@ final class YarnTwillPreparer implements TwillPreparer { } @Override + public TwillPreparer withEnv(Map<String, String> env) { + // Add the given environments to all runnables + for (String runnableName : twillSpec.getRunnables().keySet()) { + setEnv(runnableName, env, false); + } + return this; + } + + @Override + public TwillPreparer withEnv(String runnableName, Map<String, String> env) { + Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName), + "Runnable %s is not defined in the application.", runnableName); + setEnv(runnableName, env, true); + return this; + } + + @Override public TwillPreparer withApplicationClassPaths(String... classPaths) { return withApplicationClassPaths(ImmutableList.copyOf(classPaths)); } @@ -306,6 +328,7 @@ final class YarnTwillPreparer implements TwillPreparer { saveLauncher(localFiles); saveJvmOptions(localFiles); saveArguments(new Arguments(arguments, runnableArgs), localFiles); + saveEnvironments(localFiles); saveLocalFiles(localFiles, ImmutableSet.of(Constants.Files.TWILL_SPEC, Constants.Files.LOGBACK_TEMPLATE, Constants.Files.CONTAINER_JAR, @@ -360,6 +383,21 @@ final class YarnTwillPreparer implements TwillPreparer { } } + private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) { + Map<String, String> environment = environments.get(runnableName); + if (environment == null) { + environment = new LinkedHashMap<>(env); + environments.put(runnableName, environment); + return; + } + + for (Map.Entry<String, String> entry : env.entrySet()) { + if (overwrite || !environment.containsKey(entry.getKey())) { + environment.put(entry.getKey(), entry.getValue()); + } + } + } + private Credentials createCredentials() { Credentials credentials = new Credentials(); @@ -596,6 +634,21 @@ final class YarnTwillPreparer implements TwillPreparer { localFiles.put(Constants.Files.ARGUMENTS, createLocalFile(Constants.Files.ARGUMENTS, location)); } + private void saveEnvironments(Map<String, LocalFile> localFiles) throws IOException { + if (environments.isEmpty()) { + return; + } + + LOG.debug("Create and copy {}", Constants.Files.ENVIRONMENTS); + final Location location = createTempLocation(Constants.Files.ENVIRONMENTS); + try (Writer writer = new OutputStreamWriter(location.getOutputStream(), Charsets.UTF_8)) { + new Gson().toJson(environments, writer); + } + LOG.debug("Done {}", Constants.Files.ENVIRONMENTS); + + localFiles.put(Constants.Files.ENVIRONMENTS, createLocalFile(Constants.Files.ENVIRONMENTS, location)); + } + /** * Serializes the list of files that needs to localize from AM to Container. */ http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/e95c6a49/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java new file mode 100644 index 0000000..4309cb4 --- /dev/null +++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EnvironmentTestRun.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.yarn; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.LineReader; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillController; +import org.apache.twill.api.TwillRunner; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.PrinterLogHandler; +import org.apache.twill.common.Threads; +import org.apache.twill.discovery.Discoverable; +import org.apache.twill.discovery.ServiceDiscovered; +import org.junit.Assert; +import org.junit.Test; + +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.net.Socket; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Unit test for testing environment settings. + */ +public class EnvironmentTestRun extends BaseYarnTest { + + @Test + public void testEnv() throws Exception { + TwillRunner runner = getTwillRunner(); + + TwillController controller = runner.prepare(new EchoApp()) + .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .withApplicationArguments("echo") + .withArguments("echo1", "echo1") + .withArguments("echo2", "echo2") + .withEnv(ImmutableMap.of("GREETING", "Hello")) + .withEnv("echo2", ImmutableMap.of("GREETING", "Hello2")) + .start(); + + // Service echo1 should returns "Hello" as greeting, echo2 should returns "Hello2" + Map<String, String> runnableGreetings = ImmutableMap.of("echo1", "Hello", "echo2", "Hello2"); + for (Map.Entry<String, String> entry : runnableGreetings.entrySet()) { + Discoverable discoverable = getDiscoverable(controller.discoverService(entry.getKey()), 60, TimeUnit.SECONDS); + try ( + Socket socket = new Socket(discoverable.getSocketAddress().getAddress(), + discoverable.getSocketAddress().getPort()) + ) { + PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), Charsets.UTF_8), true); + LineReader reader = new LineReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8)); + + writer.println("GREETING"); + Assert.assertEquals(entry.getValue(), reader.readLine()); + } + } + + controller.terminate().get(); + } + + private Discoverable getDiscoverable(ServiceDiscovered serviceDiscovered, + long timeout, TimeUnit unit) throws Exception { + final SettableFuture<Discoverable> completion = SettableFuture.create(); + serviceDiscovered.watchChanges(new ServiceDiscovered.ChangeListener() { + @Override + public void onChange(ServiceDiscovered serviceDiscovered) { + Iterator<Discoverable> itor = serviceDiscovered.iterator(); + if (itor.hasNext()) { + completion.set(itor.next()); + } + } + }, Threads.SAME_THREAD_EXECUTOR); + return completion.get(timeout, unit); + } + + /** + * Application to add two {@link EnvironmentEchoServer} for testing. + */ + public static final class EchoApp implements TwillApplication { + + @Override + public TwillSpecification configure() { + return TwillSpecification.Builder.with() + .setName("EchoApp") + .withRunnable() + .add("echo1", new EnvironmentEchoServer()).noLocalFiles() + .add("echo2", new EnvironmentEchoServer()).noLocalFiles() + .anyOrder() + .build(); + } + } +}
