Repository: incubator-beam Updated Branches: refs/heads/master 58000b397 -> a834fb0eb
BEAM-830 Support launch on YARN cluster. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b6b2e202 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b6b2e202 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b6b2e202 Branch: refs/heads/master Commit: b6b2e202ae8c5d3d1c081a1e24033380d7f55593 Parents: 1fab152 Author: Thomas Weise <t...@apache.org> Authored: Thu Nov 24 18:36:11 2016 -0800 Committer: Thomas Weise <t...@apache.org> Committed: Fri Dec 9 23:46:40 2016 -0800 ---------------------------------------------------------------------- runners/apex/pom.xml | 52 ++- .../apache/beam/runners/apex/ApexRunner.java | 48 ++- .../beam/runners/apex/ApexRunnerResult.java | 50 +-- .../beam/runners/apex/ApexYarnLauncher.java | 395 +++++++++++++++++++ .../beam/runners/apex/ApexYarnLauncherTest.java | 138 +++++++ 5 files changed, 631 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index b604237..9f1455a 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -35,7 +35,7 @@ <packaging>jar</packaging> <properties> - <apex.core.version>3.5.0-SNAPSHOT</apex.core.version> + <apex.core.version>3.5.0</apex.core.version> <apex.malhar.version>3.4.0</apex.malhar.version> <skipIntegrationTests>true</skipIntegrationTests> <!-- memory limit for embedded cluster --> @@ -218,22 +218,64 @@ </goals> <configuration> <ignoredUsedUndeclaredDependencies> - <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0-SNAPSHOT</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.apache.apex:apex-api:jar:3.5.0</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>org.apache.commons:commons-lang3::3.1</ignoredUsedUndeclaredDependency> - <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>commons-io:commons-io:jar:2.4</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>com.esotericsoftware.kryo:kryo::2.24.0</ignoredUsedUndeclaredDependency> - <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>com.datatorrent:netlet::1.3.0</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>org.slf4j:slf4j-api:jar:1.7.14</ignoredUsedUndeclaredDependency> - <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:</ignoredUsedUndeclaredDependency> + <ignoredUsedUndeclaredDependency>org.apache.hadoop:hadoop-common:jar:2.6.0</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>joda-time:joda-time:jar:2.4</ignoredUsedUndeclaredDependency> <ignoredUsedUndeclaredDependency>com.google.guava:guava:jar:19.0</ignoredUsedUndeclaredDependency> </ignoredUsedUndeclaredDependencies> </configuration> </execution> + <execution> + <!-- used in ApexYarnLauncher to filter compile time Hadoop dependencies --> + <id>dependency-tree</id> + <phase>generate-test-resources</phase> + <goals> + <goal>tree</goal> + </goals> + <configuration> + <outputFile>${project.build.directory}/classes/org/apache/beam/runners/apex/dependency-tree</outputFile> + </configuration> + </execution> </executions> </plugin> </plugins> + + <pluginManagement> + <plugins> + <!-- Eclipse has a problem with dependency:tree when it is not in package phase --> + <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> + <plugin> + <groupId>org.eclipse.m2e</groupId> + <artifactId>lifecycle-mapping</artifactId> + <version>1.0.0</version> + <configuration> + <lifecycleMappingMetadata> + <pluginExecutions> + <pluginExecution> + <pluginExecutionFilter> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <versionRange>[2.10,)</versionRange> + <goals> + <goal>tree</goal> + </goals> + </pluginExecutionFilter> + <action> + <ignore></ignore> + </action> + </pluginExecution> + </pluginExecutions> + </lifecycleMappingMetadata> + </configuration> + </plugin> + </plugins> + </pluginManagement> </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 9507fb9..899efa3 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -17,19 +17,22 @@ */ package org.apache.beam.runners.apex; -import static com.google.common.base.Preconditions.checkArgument; - +import com.datatorrent.api.Attribute; import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; import com.datatorrent.api.StreamingApplication; import com.google.common.base.Throwables; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import org.apache.apex.api.EmbeddedAppLauncher; +import org.apache.apex.api.Launcher; +import org.apache.apex.api.Launcher.AppHandle; +import org.apache.apex.api.Launcher.LaunchMode; import org.apache.beam.runners.apex.translation.ApexPipelineTranslator; import org.apache.beam.runners.core.AssignWindows; import org.apache.beam.sdk.Pipeline; @@ -122,33 +125,44 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { public ApexRunnerResult run(final Pipeline pipeline) { final ApexPipelineTranslator translator = new ApexPipelineTranslator(options); + final AtomicReference<DAG> apexDAG = new AtomicReference<>(); StreamingApplication apexApp = new StreamingApplication() { @Override public void populateDAG(DAG dag, Configuration conf) { + apexDAG.set(dag); dag.setAttribute(DAGContext.APPLICATION_NAME, options.getApplicationName()); translator.translate(pipeline, dag); } }; - checkArgument(options.isEmbeddedExecution(), - "only embedded execution is supported at this time"); - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - try { - lma.prepareDAG(apexApp, conf); - LocalMode.Controller lc = lma.getController(); + if (options.isEmbeddedExecution()) { + Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); + Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); if (options.isEmbeddedExecutionDebugMode()) { // turns off timeout checking for operator progress - lc.setHeartbeatMonitoringEnabled(false); + launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false); + } + Configuration conf = new Configuration(false); + try { + ApexRunner.ASSERTION_ERROR.set(null); + AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes); + return new ApexRunnerResult(apexDAG.get(), apexAppResult); + } catch (Exception e) { + Throwables.propagateIfPossible(e); + throw new RuntimeException(e); + } + } else { + try { + ApexYarnLauncher yarnLauncher = new ApexYarnLauncher(); + AppHandle apexAppResult = yarnLauncher.launchApp(apexApp); + return new ApexRunnerResult(apexDAG.get(), apexAppResult); + } catch (IOException e) { + throw new RuntimeException("Failed to launch the application on YARN.", e); } - ApexRunner.ASSERTION_ERROR.set(null); - lc.runAsync(); - return new ApexRunnerResult(lma.getDAG(), lc); - } catch (Exception e) { - Throwables.propagateIfPossible(e); - throw new RuntimeException(e); } + } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java index 18b50bc..8548194 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java @@ -18,11 +18,11 @@ package org.apache.beam.runners.apex; import com.datatorrent.api.DAG; -import com.datatorrent.api.LocalMode; import java.io.IOException; -import java.lang.reflect.Field; +import org.apache.apex.api.Launcher.AppHandle; +import org.apache.apex.api.Launcher.ShutdownMode; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.Pipeline; @@ -36,12 +36,12 @@ import org.joda.time.Duration; */ public class ApexRunnerResult implements PipelineResult { private final DAG apexDAG; - private final LocalMode.Controller ctrl; + private final AppHandle apexApp; private State state = State.UNKNOWN; - public ApexRunnerResult(DAG dag, LocalMode.Controller ctrl) { + public ApexRunnerResult(DAG dag, AppHandle apexApp) { this.apexDAG = dag; - this.ctrl = ctrl; + this.apexApp = apexApp; } @Override @@ -57,19 +57,31 @@ public class ApexRunnerResult implements PipelineResult { @Override public State cancel() throws IOException { - ctrl.shutdown(); + apexApp.shutdown(ShutdownMode.KILL); state = State.CANCELLED; return state; } @Override public State waitUntilFinish(Duration duration) { - return ApexRunnerResult.waitUntilFinished(ctrl, duration); + long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE + : System.currentTimeMillis() + duration.getMillis(); + try { + while (!apexApp.isFinished() && System.currentTimeMillis() < timeout) { + if (ApexRunner.ASSERTION_ERROR.get() != null) { + throw ApexRunner.ASSERTION_ERROR.get(); + } + Thread.sleep(500); + } + return apexApp.isFinished() ? State.DONE : null; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @Override public State waitUntilFinish() { - return ApexRunnerResult.waitUntilFinished(ctrl, null); + return waitUntilFinish(null); } @Override @@ -85,26 +97,4 @@ public class ApexRunnerResult implements PipelineResult { return apexDAG; } - public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) { - // we need to rely on internal field for now - // Apex should make it available through API in upcoming release. - long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE - : System.currentTimeMillis() + duration.getMillis(); - Field appDoneField; - try { - appDoneField = ctrl.getClass().getDeclaredField("appDone"); - appDoneField.setAccessible(true); - while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) { - if (ApexRunner.ASSERTION_ERROR.get() != null) { - throw ApexRunner.ASSERTION_ERROR.get(); - } - Thread.sleep(500); - } - return appDoneField.getBoolean(ctrl) ? State.DONE : null; - } catch (NoSuchFieldException | SecurityException | IllegalArgumentException - | IllegalAccessException | InterruptedException e) { - throw new RuntimeException(e); - } - } - } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java new file mode 100644 index 0000000..0ae4cc7 --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Attribute.AttributeMap; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Serializable; +import java.lang.reflect.AccessibleObject; +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.jar.JarFile; +import java.util.jar.Manifest; + +import org.apache.apex.api.EmbeddedAppLauncher; +import org.apache.apex.api.Launcher; +import org.apache.apex.api.Launcher.AppHandle; +import org.apache.apex.api.Launcher.LaunchMode; +import org.apache.apex.api.Launcher.LauncherException; +import org.apache.apex.api.Launcher.ShutdownMode; +import org.apache.apex.api.YarnAppLauncher; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.SerializationUtils; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Proxy to launch the YARN application through the hadoop script to run in the + * pre-configured environment (class path, configuration, native libraries etc.). + * + * <p>The proxy takes the DAG and communicates with the Hadoop services to launch + * it on the cluster. + */ +public class ApexYarnLauncher { + private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class); + + public AppHandle launchApp(StreamingApplication app) throws IOException { + + List<File> jarsToShip = getYarnDeployDependencies(); + StringBuilder classpath = new StringBuilder(); + for (File path : jarsToShip) { + if (path.isDirectory()) { + File tmpJar = File.createTempFile("beam-runners-apex-", ".jar"); + createJar(path, tmpJar); + tmpJar.deleteOnExit(); + path = tmpJar; + } + if (classpath.length() != 0) { + classpath.append(':'); + } + classpath.append(path.getAbsolutePath()); + } + + EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED); + DAG dag = embeddedLauncher.getDAG(); + app.populateDAG(dag, new Configuration(false)); + + Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ',')); + LaunchParams lp = new LaunchParams(dag, launchAttributes); + lp.cmd = "hadoop " + ApexYarnLauncher.class.getName(); + HashMap<String, String> env = new HashMap<>(); + env.put("HADOOP_USER_CLASSPATH_FIRST", "1"); + env.put("HADOOP_CLASSPATH", classpath.toString()); + lp.env = env; + return launchApp(lp); + } + + protected AppHandle launchApp(LaunchParams params) throws IOException { + File tmpFile = File.createTempFile("beam-runner-apex", "params"); + tmpFile.deleteOnExit(); + try (FileOutputStream fos = new FileOutputStream(tmpFile)) { + SerializationUtils.serialize(params, fos); + } + if (params.getCmd() == null) { + ApexYarnLauncher.main(new String[] {tmpFile.getAbsolutePath()}); + } else { + String cmd = params.getCmd() + " " + tmpFile.getAbsolutePath(); + ByteArrayOutputStream consoleOutput = new ByteArrayOutputStream(); + LOG.info("Executing: {} with {}", cmd, params.getEnv()); + + ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd); + Map<String, String> env = pb.environment(); + env.putAll(params.getEnv()); + Process p = pb.start(); + ProcessWatcher pw = new ProcessWatcher(p); + InputStream output = p.getInputStream(); + InputStream error = p.getErrorStream(); + while (!pw.isFinished()) { + IOUtils.copy(output, consoleOutput); + IOUtils.copy(error, consoleOutput); + } + if (pw.rc != 0) { + String msg = "The Beam Apex runner in non-embedded mode requires the Hadoop client" + + " to be installed on the machine from which you launch the job" + + " and the 'hadoop' script in $PATH"; + LOG.error(msg); + throw new RuntimeException("Failed to run: " + cmd + " (exit code " + pw.rc + ")" + "\n" + + consoleOutput.toString()); + } + } + return new AppHandle() { + @Override + public boolean isFinished() { + // TODO (future PR): interaction with child process + LOG.warn("YARN application runs asynchronously and status check not implemented."); + return true; + } + @Override + public void shutdown(ShutdownMode arg0) throws LauncherException { + // TODO (future PR): interaction with child process + throw new UnsupportedOperationException(); + } + }; + } + + /** + * From the current classpath, find the jar files that need to be deployed + * with the application to run on YARN. Hadoop dependencies are provided + * through the Hadoop installation and the application should not bundle them + * to avoid conflicts. This is done by removing the Hadoop compile + * dependencies (transitively) by parsing the Maven dependency tree. + * + * @return list of jar files to ship + * @throws IOException when dependency information cannot be read + */ + public static List<File> getYarnDeployDependencies() throws IOException { + InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree"); + BufferedReader br = new BufferedReader(new InputStreamReader(dependencyTree)); + String line = null; + List<String> excludes = new ArrayList<>(); + int excludeLevel = Integer.MAX_VALUE; + while ((line = br.readLine()) != null) { + for (int i = 0; i < line.length(); i++) { + char c = line.charAt(i); + if (Character.isLetter(c)) { + if (i > excludeLevel) { + excludes.add(line.substring(i)); + } else { + if (line.substring(i).startsWith("org.apache.hadoop")) { + excludeLevel = i; + excludes.add(line.substring(i)); + } else { + excludeLevel = Integer.MAX_VALUE; + } + } + break; + } + } + } + br.close(); + + Set<String> excludeJarFileNames = Sets.newHashSet(); + for (String exclude : excludes) { + String[] mvnc = exclude.split(":"); + String fileName = mvnc[1] + "-"; + if (mvnc.length == 6) { + fileName += mvnc[4] + "-" + mvnc[3]; // with classifier + } else { + fileName += mvnc[3]; + } + fileName += ".jar"; + excludeJarFileNames.add(fileName); + } + + ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader(); + URL[] urls = ((URLClassLoader) classLoader).getURLs(); + List<File> dependencyJars = new ArrayList<>(); + for (int i = 0; i < urls.length; i++) { + File f = new File(urls[i].getFile()); + // dependencies can also be directories in the build reactor, + // the Apex client will automatically create jar files for those. + if (f.exists() && !excludeJarFileNames.contains(f.getName())) { + dependencyJars.add(f); + } + } + return dependencyJars; + } + + /** + * Create a jar file from the given directory. + * @param dir source directory + * @param jarFile jar file name + * @throws IOException when file cannot be created + */ + public static void createJar(File dir, File jarFile) throws IOException { + + final Map<String, ?> env = Collections.singletonMap("create", "true"); + if (jarFile.exists() && !jarFile.delete()) { + throw new RuntimeException("Failed to remove " + jarFile); + } + URI uri = URI.create("jar:" + jarFile.toURI()); + try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env);) { + + File manifestFile = new File(dir, JarFile.MANIFEST_NAME); + Files.createDirectory(zipfs.getPath("META-INF")); + final OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME)); + if (!manifestFile.exists()) { + new Manifest().write(out); + } else { + FileUtils.copyFile(manifestFile, out); + } + out.close(); + + final java.nio.file.Path root = dir.toPath(); + Files.walkFileTree(root, new java.nio.file.SimpleFileVisitor<Path>() { + String relativePath; + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) + throws IOException { + relativePath = root.relativize(dir).toString(); + if (!relativePath.isEmpty()) { + if (!relativePath.endsWith("/")) { + relativePath += "/"; + } + final Path dstDir = zipfs.getPath(relativePath); + Files.createDirectory(dstDir); + } + return super.preVisitDirectory(dir, attrs); + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + String name = relativePath + file.getFileName(); + if (!JarFile.MANIFEST_NAME.equals(name)) { + final OutputStream out = Files.newOutputStream(zipfs.getPath(name)); + FileUtils.copyFile(file.toFile(), out); + out.close(); + } + return super.visitFile(file, attrs); + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + relativePath = root.relativize(dir.getParent()).toString(); + if (!relativePath.isEmpty() && !relativePath.endsWith("/")) { + relativePath += "/"; + } + return super.postVisitDirectory(dir, exc); + } + }); + } + } + + /** + * The main method expects the serialized DAG and will launch the YARN application. + * @param args location of launch parameters + * @throws IOException when parameters cannot be read + */ + public static void main(String[] args) throws IOException { + checkArgument(args.length == 1, "exactly one argument expected"); + File file = new File(args[0]); + checkArgument(file.exists() && file.isFile(), "invalid file path %s", file); + final LaunchParams params = (LaunchParams) SerializationUtils.deserialize( + new FileInputStream(file)); + StreamingApplication apexApp = new StreamingApplication() { + @Override + public void populateDAG(DAG dag, Configuration conf) { + copyShallow(params.dag, dag); + } + }; + Configuration conf = new Configuration(); // configuration from Hadoop client + AppHandle appHandle = params.getApexLauncher().launchApp(apexApp, conf, + params.launchAttributes); + if (appHandle == null) { + throw new AssertionError("Launch returns null handle."); + } + // TODO (future PR) + // At this point the application is running, but this process should remain active to + // allow the parent to implement the runner result. + } + + /** + * Launch parameters that will be serialized and passed to the child process. + */ + @VisibleForTesting + protected static class LaunchParams implements Serializable { + private static final long serialVersionUID = 1L; + private final DAG dag; + private final Attribute.AttributeMap launchAttributes; + private HashMap<String, String> env; + private String cmd; + + protected LaunchParams(DAG dag, AttributeMap launchAttributes) { + this.dag = dag; + this.launchAttributes = launchAttributes; + } + + protected Launcher<?> getApexLauncher() { + return Launcher.getLauncher(LaunchMode.YARN); + } + + protected String getCmd() { + return cmd; + } + + protected Map<String, String> getEnv() { + return env; + } + + } + + private static void copyShallow(DAG from, DAG to) { + checkArgument(from.getClass() == to.getClass(), "must be same class %s %s", + from.getClass(), to.getClass()); + Field[] fields = from.getClass().getDeclaredFields(); + AccessibleObject.setAccessible(fields, true); + for (int i = 0; i < fields.length; i++) { + Field field = fields[i]; + if (!java.lang.reflect.Modifier.isStatic(field.getModifiers())) { + try { + field.set(to, field.get(from)); + } catch (IllegalArgumentException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Starts a command and waits for it to complete. + */ + public static class ProcessWatcher implements Runnable { + private final Process p; + private volatile boolean finished = false; + private volatile int rc; + + public ProcessWatcher(Process p) { + this.p = p; + new Thread(this).start(); + } + + public boolean isFinished() { + return finished; + } + + @Override + public void run() { + try { + rc = p.waitFor(); + } catch (Exception e) { + // ignore + } + finished = true; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b6b2e202/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java new file mode 100644 index 0000000..986818b --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Attribute.AttributeMap; +import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; + +import java.io.File; +import java.net.URI; +import java.nio.file.FileSystem; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.jar.JarFile; + +import org.apache.apex.api.EmbeddedAppLauncher; +import org.apache.apex.api.Launcher; +import org.apache.apex.api.Launcher.AppHandle; +import org.apache.apex.api.Launcher.LaunchMode; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for dependency resolution for pipeline execution on YARN. + */ +public class ApexYarnLauncherTest { + + @Test + public void testGetYarnDeployDependencies() throws Exception { + List<File> deps = ApexYarnLauncher.getYarnDeployDependencies(); + String depsToString = deps.toString(); + // the beam dependencies are not present as jar when running within the Maven build reactor + //assertThat(depsToString, containsString("beam-runners-core-")); + //assertThat(depsToString, containsString("beam-runners-apex-")); + assertThat(depsToString, containsString("apex-common-")); + assertThat(depsToString, not(containsString("hadoop-"))); + assertThat(depsToString, not(containsString("zookeeper-"))); + } + + @Test + public void testProxyLauncher() throws Exception { + // use the embedded launcher to build the DAG only + EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED); + + StreamingApplication app = new StreamingApplication() { + @Override + public void populateDAG(DAG dag, Configuration conf) { + dag.setAttribute(DAGContext.APPLICATION_NAME, "DummyApp"); + } + }; + + Configuration conf = new Configuration(false); + DAG dag = embeddedLauncher.prepareDAG(app, conf); + Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + ApexYarnLauncher launcher = new ApexYarnLauncher(); + launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes)); + } + + private static class MockApexYarnLauncherParams extends ApexYarnLauncher.LaunchParams { + private static final long serialVersionUID = 1L; + + public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes) { + super(dag, launchAttributes); + } + + @Override + protected Launcher<?> getApexLauncher() { + return new Launcher<AppHandle>() { + @Override + public AppHandle launchApp(StreamingApplication application, + Configuration configuration, AttributeMap launchParameters) + throws org.apache.apex.api.Launcher.LauncherException { + EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED); + DAG dag = embeddedLauncher.getDAG(); + application.populateDAG(dag, new Configuration(false)); + String appName = dag.getValue(DAGContext.APPLICATION_NAME); + Assert.assertEquals("DummyApp", appName); + return new AppHandle() { + @Override + public boolean isFinished() { + return true; + } + @Override + public void shutdown(org.apache.apex.api.Launcher.ShutdownMode arg0) { + } + }; + } + }; + } + + } + + @Test + public void testCreateJar() throws Exception { + File baseDir = new File("./target/testCreateJar"); + File srcDir = new File(baseDir, "src"); + String file1 = "file1"; + FileUtils.forceMkdir(srcDir); + FileUtils.write(new File(srcDir, file1), "file1"); + + File jarFile = new File(baseDir, "test.jar"); + ApexYarnLauncher.createJar(srcDir, jarFile); + Assert.assertTrue("exists: " + jarFile, jarFile.exists()); + URI uri = URI.create("jar:" + jarFile.toURI()); + final Map<String, ?> env = Collections.singletonMap("create", "true"); + try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env);) { + Assert.assertTrue("manifest", Files.isRegularFile(zipfs.getPath(JarFile.MANIFEST_NAME))); + Assert.assertTrue("file1", Files.isRegularFile(zipfs.getPath(file1))); + } + + } +}