(TWILL-117) Support adding a set of classes to the Application Master and runnables jar using the TwillPreparer API. The classpath is archived under "application-classpath" in the launcher jar.
This closes #49 on GitHub. Signed-off-by: Terence Yim <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/d181b7ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/d181b7ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/d181b7ce Branch: refs/heads/site Commit: d181b7cef40735520b1e198876fa3d60bb47fa40 Parents: ecaf51b Author: shankar <[email protected]> Authored: Wed Jun 17 11:36:25 2015 -0700 Committer: Terence Yim <[email protected]> Committed: Wed Jul 1 11:34:25 2015 -0700 ---------------------------------------------------------------------- .../org/apache/twill/api/ClassAcceptor.java | 39 ++++++++++ .../org/apache/twill/api/TwillPreparer.java | 20 +++++ .../twill/internal/ApplicationBundler.java | 80 +++++++++++--------- .../org/apache/twill/internal/Constants.java | 2 + .../twill/internal/utils/Dependencies.java | 18 +---- .../apache/twill/launcher/TwillLauncher.java | 34 +++++---- .../apache/twill/example/yarn/HelloWorld.java | 29 ++++++- .../apache/twill/yarn/YarnTwillPreparer.java | 40 ++++++++-- 8 files changed, 185 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-api/src/main/java/org/apache/twill/api/ClassAcceptor.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/ClassAcceptor.java b/twill-api/src/main/java/org/apache/twill/api/ClassAcceptor.java new file mode 100644 index 0000000..db8216f --- /dev/null +++ b/twill-api/src/main/java/org/apache/twill/api/ClassAcceptor.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.api; + +import java.net.URL; + +/** + * Class that can be used to determine if class can be accepted. + */ +public class ClassAcceptor { + /** + * Invoked to determine if class can be accepted. default behavior returns true. + * + * @param className Name of the class. + * @param classUrl URL for the class resource. + * @param classPathUrl URL for the class path resource that contains the class resource. + * If the URL protocol is {@code file}, it would be the path to root package. + * If the URL protocol is {@code jar}, it would be the jar file. + * @return true to accept the given class, false otherwise. 
+ */ + public boolean accept(String className, URL classUrl, URL classPathUrl) { + return true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java index c2c62a1..f60080a 100644 --- a/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java +++ b/twill-api/src/main/java/org/apache/twill/api/TwillPreparer.java @@ -179,6 +179,26 @@ public interface TwillPreparer { TwillPreparer withClassPaths(Iterable<String> classPaths); /** + * Adds the set of paths to the classpath on the target machine for ApplicationMaster and all runnables. + * @return This {@link TwillPreparer} + */ + TwillPreparer withApplicationClassPaths(String... classPaths); + + /** + * Adds the set of paths to the classpath on the target machine for ApplicationMaster and all runnables. + * @return This {@link TwillPreparer} + */ + TwillPreparer withApplicationClassPaths(Iterable<String> classPaths); + + /** + * Uses {@link ClassAcceptor} to determine the classes to include in the bundle jar for + * ApplicationMaster and all runnables. + * @param classAcceptor to specify which classes to include in the bundle jar + * @return This {@link TwillPreparer} + */ + TwillPreparer withBundlerClassAcceptor(ClassAcceptor classAcceptor); + + /** * Adds security credentials for the runtime environment to gives application access to resources. * * @param secureStore Contains security token available for the runtime environment. 
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java index c09b881..73efc81 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java +++ b/twill-core/src/main/java/org/apache/twill/internal/ApplicationBundler.java @@ -28,6 +28,7 @@ import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.Files; +import org.apache.twill.api.ClassAcceptor; import org.apache.twill.filesystem.Location; import org.apache.twill.internal.utils.Dependencies; import org.slf4j.Logger; @@ -43,7 +44,6 @@ import java.io.OutputStream; import java.net.URI; import java.net.URL; import java.util.Collections; -import java.util.List; import java.util.Queue; import java.util.Set; import java.util.jar.JarEntry; @@ -57,36 +57,22 @@ import java.util.zip.CheckedOutputStream; public final class ApplicationBundler { private static final Logger LOG = LoggerFactory.getLogger(ApplicationBundler.class); - + public static final String SUBDIR_CLASSES = "classes/"; public static final String SUBDIR_LIB = "lib/"; public static final String SUBDIR_RESOURCES = "resources/"; - private final List<String> excludePackages; - private final List<String> includePackages; + private final ClassAcceptor classAcceptor; private final Set<String> bootstrapClassPaths; private final CRC32 crc32; /** * Constructs a ApplicationBundler. * - * @param excludePackages Class packages to exclude - */ - public ApplicationBundler(Iterable<String> excludePackages) { - this(excludePackages, ImmutableList.<String>of()); - } - - /** - * Constructs a ApplicationBundler. 
- * - * @param excludePackages Class packages to exclude - * @param includePackages Class packages that should be included. Anything in this list will override the - * one provided in excludePackages. + * @param classAcceptor ClassAcceptor for class packages to include */ - public ApplicationBundler(Iterable<String> excludePackages, Iterable<String> includePackages) { - this.excludePackages = ImmutableList.copyOf(excludePackages); - this.includePackages = ImmutableList.copyOf(includePackages); - + public ApplicationBundler(ClassAcceptor classAcceptor) { + this.classAcceptor = classAcceptor; ImmutableSet.Builder<String> builder = ImmutableSet.builder(); for (String classpath : Splitter.on(File.pathSeparatorChar).split(System.getProperty("sun.boot.class.path"))) { File file = new File(classpath); @@ -99,7 +85,41 @@ public final class ApplicationBundler { } this.bootstrapClassPaths = builder.build(); this.crc32 = new CRC32(); + } + /** + * Constructs a ApplicationBundler. + * + * @param excludePackages Class packages to exclude + */ + public ApplicationBundler(Iterable<String> excludePackages) { + this(excludePackages, ImmutableList.<String>of()); + } + + /** + * Constructs a ApplicationBundler. + * + * @param excludePackages Class packages to exclude + * @param includePackages Class packages that should be included. Anything in this list will override the + * one provided in excludePackages. 
+ */ + public ApplicationBundler(final Iterable<String> excludePackages, final Iterable<String> includePackages) { + this(new ClassAcceptor() { + @Override + public boolean accept(String className, URL classUrl, URL classPathUrl) { + for (String includePackage : includePackages) { + if (className.startsWith(includePackage)) { + return true; + } + } + for (String excludePackage : excludePackages) { + if (className.startsWith(excludePackage)) { + return false; + } + } + return true; + } + }); } public void createBundle(Location target, Iterable<Class<?>> classes) throws IOException { @@ -175,29 +195,15 @@ public final class ApplicationBundler { // Record the set of classpath URL that are already added to the jar final Set<URL> seenClassPaths = Sets.newHashSet(); - Dependencies.findClassDependencies(classLoader, new Dependencies.ClassAcceptor() { + Dependencies.findClassDependencies(classLoader, new ClassAcceptor() { @Override public boolean accept(String className, URL classUrl, URL classPathUrl) { if (bootstrapClassPaths.contains(classPathUrl.getFile())) { return false; } - - boolean shouldInclude = false; - for (String include : includePackages) { - if (className.startsWith(include)) { - shouldInclude = true; - break; - } - } - - if (!shouldInclude) { - for (String exclude : excludePackages) { - if (className.startsWith(exclude)) { - return false; - } - } + if (!classAcceptor.accept(className, classUrl, classPathUrl)) { + return false; } - if (seenClassPaths.add(classPathUrl)) { putEntry(className, classUrl, classPathUrl, entries, jarOut); } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-core/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java index fbc6e70..64b029d 100644 --- 
a/twill-core/src/main/java/org/apache/twill/internal/Constants.java +++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java @@ -45,6 +45,8 @@ public final class Constants { public static final String STDOUT = "stdout"; public static final String STDERR = "stderr"; + public static final String CLASSPATH = "classpath"; + public static final String APPLICATION_CLASSPATH = "application-classpath"; /** * Constants for names of internal files that are shared between client, AM and containers. */ http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java b/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java index f6f8dde..eb55557 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java +++ b/twill-core/src/main/java/org/apache/twill/internal/utils/Dependencies.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.io.ByteStreams; +import org.apache.twill.api.ClassAcceptor; import org.objectweb.asm.AnnotationVisitor; import org.objectweb.asm.ClassReader; import org.objectweb.asm.ClassVisitor; @@ -47,23 +48,6 @@ import java.util.Set; */ public final class Dependencies { - /** - * Represents a callback for accepting a class during dependency traversal. - */ - public interface ClassAcceptor { - /** - * Invoked when a class is being found as a dependency. - * - * @param className Name of the class. - * @param classUrl URL for the class resource. - * @param classPathUrl URL for the class path resource that contains the class resource. - * If the URL protocol is {@code file}, it would be the path to root package. 
- * If the URL protocol is {@code jar}, it would be the jar file. - * @return true keep finding dependencies on the given class. - */ - boolean accept(String className, URL classUrl, URL classPathUrl); - } - public static void findClassDependencies(ClassLoader classLoader, ClassAcceptor acceptor, String...classesToResolve) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java index 3484349..3405709 100644 --- a/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/launcher/TwillLauncher.java @@ -17,6 +17,8 @@ */ package org.apache.twill.launcher; +import org.apache.twill.internal.Constants; + import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; @@ -156,22 +158,11 @@ public final class TwillLauncher { } if (useClassPath) { - InputStream is = ClassLoader.getSystemResourceAsStream("classpath"); - if (is != null) { - try { - BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8"))); - String line = reader.readLine(); - if (line != null) { - for (String path : line.split(":")) { - urls.addAll(getClassPaths(path)); - } - } - } finally { - is.close(); - } - } + addClassPathsToList(urls, Constants.CLASSPATH); } + addClassPathsToList(urls, Constants.APPLICATION_CLASSPATH); + return new URLClassLoader(urls.toArray(new URL[0])); } catch (Exception e) { @@ -179,6 +170,21 @@ public final class TwillLauncher { } } + private static void addClassPathsToList(List<URL> urls, String resource) throws IOException { + try (InputStream is = ClassLoader.getSystemResourceAsStream(resource)) { + if (is != null) { + try 
(BufferedReader reader = new BufferedReader(new InputStreamReader(is, Charset.forName("UTF-8")))) { + String line = reader.readLine(); + if (line != null) { + for (String path : line.split(":")) { + urls.addAll(getClassPaths(path.trim())); + } + } + } + } + } + } + private static Collection<URL> getClassPaths(String path) throws MalformedURLException { String classpath = expand(path); if (classpath.endsWith("/*")) { http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-examples/yarn/src/main/java/org/apache/twill/example/yarn/HelloWorld.java ---------------------------------------------------------------------- diff --git a/twill-examples/yarn/src/main/java/org/apache/twill/example/yarn/HelloWorld.java b/twill-examples/yarn/src/main/java/org/apache/twill/example/yarn/HelloWorld.java index 76e4948..a9cbba8 100644 --- a/twill-examples/yarn/src/main/java/org/apache/twill/example/yarn/HelloWorld.java +++ b/twill-examples/yarn/src/main/java/org/apache/twill/example/yarn/HelloWorld.java @@ -17,9 +17,14 @@ */ package org.apache.twill.example.yarn; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.twill.api.AbstractTwillRunnable; +import org.apache.twill.api.ClassAcceptor; import org.apache.twill.api.TwillController; import org.apache.twill.api.TwillRunnerService; import org.apache.twill.api.logging.PrinterLogHandler; @@ -28,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.PrintWriter; +import java.net.URL; +import java.util.List; import java.util.concurrent.ExecutionException; /** @@ -57,15 +64,22 @@ public class HelloWorld { } String zkStr = args[0]; - + YarnConfiguration yarnConfiguration = new YarnConfiguration(); final TwillRunnerService twillRunner = new YarnTwillRunnerService( - 
new YarnConfiguration(), zkStr); + yarnConfiguration, zkStr); twillRunner.start(); + String yarnClasspath = + yarnConfiguration.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + Joiner.on(",").join(YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)); + List<String> applicationClassPaths = Lists.newArrayList(); + Iterables.addAll(applicationClassPaths, Splitter.on(",").split(yarnClasspath)); final TwillController controller = twillRunner.prepare(new HelloWorldRunnable()) .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true))) + .withApplicationClassPaths(applicationClassPaths) + .withBundlerClassAcceptor(new HadoopClassExcluder()) .start(); Runtime.getRuntime().addShutdownHook(new Thread() { @@ -85,4 +99,15 @@ public class HelloWorld { e.printStackTrace(); } } + + static class HadoopClassExcluder extends ClassAcceptor { + @Override + public boolean accept(String className, URL classUrl, URL classPathUrl) { + // exclude hadoop but not hbase package + if (className.startsWith("org.apache.hadoop") && !className.startsWith("org.apache.hadoop.hbase")) { + return false; + } + return true; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/d181b7ce/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index aca4728..4e9f76d 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -42,6 +42,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.twill.api.ClassAcceptor; import 
org.apache.twill.api.EventHandlerSpecification; import org.apache.twill.api.LocalFile; import org.apache.twill.api.RunId; @@ -122,12 +123,14 @@ final class YarnTwillPreparer implements TwillPreparer { private final List<URI> resources = Lists.newArrayList(); private final List<String> classPaths = Lists.newArrayList(); private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create(); + private final List<String> applicationClassPaths = Lists.newArrayList(); private final Credentials credentials; private final int reservedMemory; private String user; private String schedulerQueue; private String extraOptions; private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG; + private ClassAcceptor classAcceptor; private LogEntry.Level logLevel; YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, @@ -147,6 +150,7 @@ final class YarnTwillPreparer implements TwillPreparer { this.user = System.getProperty("user.name"); this.extraOptions = extraOptions; this.logLevel = logLevel; + this.classAcceptor = new ClassAcceptor(); } @Override @@ -246,6 +250,23 @@ final class YarnTwillPreparer implements TwillPreparer { } @Override + public TwillPreparer withApplicationClassPaths(String... 
classPaths) { + return withApplicationClassPaths(ImmutableList.copyOf(classPaths)); + } + + @Override + public TwillPreparer withApplicationClassPaths(Iterable<String> classPaths) { + Iterables.addAll(this.applicationClassPaths, classPaths); + return this; + } + + @Override + public TwillPreparer withBundlerClassAcceptor(ClassAcceptor classAcceptor) { + this.classAcceptor = classAcceptor; + return this; + } + + @Override public TwillPreparer addSecureStore(SecureStore secureStore) { Object store = secureStore.getStore(); Preconditions.checkArgument(store instanceof Credentials, "Only Hadoop Credentials is supported."); @@ -264,7 +285,6 @@ final class YarnTwillPreparer implements TwillPreparer { try { final ProcessLauncher<ApplicationId> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue); final ApplicationId appId = launcher.getContainerInfo(); - Callable<ProcessController<YarnApplicationReport>> submitTask = new Callable<ProcessController<YarnApplicationReport>>() { @Override @@ -304,6 +324,7 @@ final class YarnTwillPreparer implements TwillPreparer { .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory)) .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()) .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS)); + if (logLevel != null) { LOG.debug("Log level is set to {} for the Twill application.", logLevel); builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString()); @@ -351,7 +372,7 @@ final class YarnTwillPreparer implements TwillPreparer { } private ApplicationBundler createBundler() { - return new ApplicationBundler(ImmutableList.<String>of()); + return new ApplicationBundler(classAcceptor); } private LocalFile createLocalFile(String name, Location location) throws IOException { @@ -502,7 +523,7 @@ final class YarnTwillPreparer implements TwillPreparer { if (classLoader == null) { classLoader = getClass().getClassLoader(); } - Dependencies.findClassDependencies(classLoader, new 
Dependencies.ClassAcceptor() { + Dependencies.findClassDependencies(classLoader, new ClassAcceptor() { @Override public boolean accept(String className, URL classUrl, URL classPathUrl) { Preconditions.checkArgument(className.startsWith(launcherName) || className.equals(portFinderName), @@ -520,10 +541,8 @@ final class YarnTwillPreparer implements TwillPreparer { }, launcherName, portFinderName); try { - if (!classPaths.isEmpty()) { - jarOut.putNextEntry(new JarEntry("classpath")); - jarOut.write(Joiner.on(':').join(classPaths).getBytes(Charsets.UTF_8)); - } + addClassPaths(Constants.CLASSPATH, classPaths, jarOut); + addClassPaths(Constants.APPLICATION_CLASSPATH, applicationClassPaths, jarOut); } finally { jarOut.close(); } @@ -532,6 +551,13 @@ final class YarnTwillPreparer implements TwillPreparer { localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location)); } + private void addClassPaths(String classpathId, List<String> classPaths, JarOutputStream jarOut) throws IOException { + if (!classPaths.isEmpty()) { + jarOut.putNextEntry(new JarEntry(classpathId)); + jarOut.write(Joiner.on(':').join(classPaths).getBytes(Charsets.UTF_8)); + } + } + private void saveJvmOptions(Map<String, LocalFile> localFiles) throws IOException { if ((extraOptions == null || extraOptions.isEmpty()) && JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
