Updated Branches: refs/heads/trunk 67f5f7475 -> b2dff2751
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java new file mode 100644 index 0000000..d596413 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.yarn; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.GraphTaskManager; + +import org.apache.giraph.io.VertexOutputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.MapContext; +import org.apache.hadoop.mapreduce.Mapper.Context; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.mapreduce.task.MapContextImpl; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.log4j.Logger; + +import java.io.IOException; + +/** + * This process will execute the BSP graph tasks allotted to this YARN + * execution container. All tasks will be performed by calling the + * GraphTaskManager object. Since this GiraphYarnTask will + * not be passing data by key-value pairs through the MR framework, the + * Mapper parameter types are irrelevant, and set to <code>Object</code> type. + * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + * @param <M> Message data + */ +public class GiraphYarnTask<I extends WritableComparable, V extends Writable, + E extends Writable, M extends Writable> { + static { + Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE); + } + /** Class logger */ + private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class); + /** Manage the framework-agnostic Giraph task for this job run */ + private GraphTaskManager<I, V, E, M> graphTaskManager; + /** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. 
*/ + private final int bspTaskId; + /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps */ + private Context proxy; + /** Configuration to hand off into Giraph, through wrapper Mapper#Context */ + private ImmutableClassesGiraphConfiguration conf; + + /** + * Constructor. Build our DUMMY MRv1 data structures to pass to our + * GiraphTaskManager. This allows us to continue to look the other way + * while Giraph relies on MRv1 under the hood. + * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args + * supplied by GiraphApplicationMaster. + */ + public GiraphYarnTask(final TaskAttemptID taskAttemptId) { + conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>( + new GiraphConfiguration()); + bspTaskId = taskAttemptId.getTaskID().getId(); + conf.setInt("mapred.task.partition", bspTaskId); + proxy = buildProxyMapperContext(taskAttemptId); + graphTaskManager = new GraphTaskManager<I, V, E, M>(proxy); + } + + /** + * Run one Giraph worker (or master) task, hosted in this execution container. + */ + public void run() { + // Notify the master quicker if there is worker failure rather than + // waiting for ZooKeeper to timeout and delete the ephemeral znodes + try { + graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode" + graphTaskManager.execute(); + graphTaskManager.cleanup(); + } catch (InterruptedException ie) { + LOG.error("run() caught an unrecoverable InterruptedException.", ie); + } catch (IOException ioe) { + throw new RuntimeException( + "run() caught an unrecoverable IOException.", ioe); + // CHECKSTYLE: stop IllegalCatch + } catch (RuntimeException e) { + // CHECKSTYLE: resume IllegalCatch + graphTaskManager.zooKeeperCleanup(); + graphTaskManager.workerFailureCleanup(); + throw new RuntimeException( + "run: Caught an unrecoverable exception " + e.getMessage(), e); + } finally { + // YARN: must complete the commit of the final output, Hadoop isn't there. 
+ finalizeYarnJob(); + } + } + + /** + * Without Hadoop MR to finish the consolidation of all the task output from + * each HDFS task tmp dir, it won't get done. YARN has some job finalization + * it must do "for us." -- AND must delete "jar cache" in HDFS too! + */ + private void finalizeYarnJob() { + if (conf.isPureYarnJob() && graphTaskManager.isMaster() && + conf.getVertexOutputFormatClass() != null) { + try { + LOG.info("Master is ready to commit final job output data."); + VertexOutputFormat vertexOutputFormat = + conf.createVertexOutputFormat(); + OutputCommitter outputCommitter = + vertexOutputFormat.getOutputCommitter(proxy); + // now we will have our output in OUTDIR if all went well... + outputCommitter.commitJob(proxy); + LOG.info("Master has committed the final job output data."); + } catch (InterruptedException ie) { + LOG.error("Interrupted while attempting to obtain " + + "OutputCommitter.", ie); + } catch (IOException ioe) { + LOG.error("Master task's attempt to commit output has " + + "FAILED.", ioe); + } + } + } + + /** + * Utility to generate dummy Mapper#Context for use in Giraph internals. + * This is the "key hack" to inject MapReduce-related data structures + * containing YARN cluster metadata (and our GiraphConf from the AppMaster) + * into our Giraph BSP task code. + * @param tid the TaskAttemptID to construct this Mapper#Context from. + * @return sort of a Mapper#Context if you squint just right. + */ + private Context buildProxyMapperContext(final TaskAttemptID tid) { + MapContext mc = new MapContextImpl<Object, Object, Object, Object>( + conf, // our Configuration, populated back at the GiraphYarnClient. 
+ tid, // our TaskAttemptId, generated w/YARN app, container, attempt IDs + null, // RecordReader here will never be used by Giraph + null, // RecordWriter here will never be used by Giraph + null, // OutputCommitter here will never be used by Giraph + new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now + @Override + public void setStatus(String msg) { + LOG.info("[STATUS: task-" + bspTaskId + "] " + msg); + } + }, + null); // Input split setting here will never be used by Giraph + + // now, we wrap our MapContext ref so we can produce a Mapper#Context + WrappedMapper<Object, Object, Object, Object> wrappedMapper + = new WrappedMapper<Object, Object, Object, Object>(); + return wrappedMapper.getMapContext(mc); + } + + /** + * Default handler for uncaught exceptions. + */ + class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler { + @Override + public void uncaughtException(final Thread t, final Throwable e) { + LOG.fatal( + "uncaughtException: OverrideExceptionHandler on thread " + + t.getName() + ", msg = " + e.getMessage() + ", exiting...", e); + System.exit(1); + } + } + + /** + * Task entry point. + * @param args CLI arguments injected by GiraphApplicationMaster to hand off + * job, task, and attempt ID's to this (and every) Giraph task. 
+ * Args should be: <code>ClusterTimestamp AppId ContainerId AppAttemptId</code> + * (four values, see {@link #getTaskAttemptID(String[])}) + */ + @SuppressWarnings("rawtypes") + public static void main(String[] args) { + if (args.length != 4) { + throw new IllegalStateException("GiraphYarnTask could not construct " + + "a TaskAttemptID for the Giraph job from args: " + printArgs(args)); + } + try { + GiraphYarnTask<?, ?, ?, ?> giraphYarnTask = + new GiraphYarnTask(getTaskAttemptID(args)); + giraphYarnTask.run(); + // CHECKSTYLE: stop IllegalCatch + } catch (Throwable t) { + // CHECKSTYLE resume IllegalCatch + LOG.error("GiraphYarnTask threw a top-level exception, failing task", t); + System.exit(2); + } // ALWAYS finish a YARN task or AppMaster with System#exit!!! + System.exit(0); + } + + /** + * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context. + * + * NOTE: ContainerId will serve as MR TaskID for Giraph tasks. + * YARN container 1 is always AppMaster, so the least container id we will + * ever get from YARN for a Giraph task is container id 2. Giraph on MapReduce + * tasks must start at index 0. So we SUBTRACT TWO from each container id. + * + * @param args the command line args, fed to us by GiraphApplicationMaster. + * @return the TaskAttemptId object, populated with YARN job data. + */ + private static TaskAttemptID getTaskAttemptID(String[] args) { + return new TaskAttemptID( + args[0], // YARN ApplicationId Cluster Timestamp + Integer.parseInt(args[1]), // YARN ApplicationId # + TaskID.getTaskType('m'), // Make Giraph think this is a Mapper task. + Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above) + Integer.parseInt(args[3])); // YARN AppAttemptId # + } + + /** + * Utility to help log command line args in the event of an error. + * @param args the CLI args. + * @return a pretty-print of the input args. 
+ */ + private static String printArgs(String[] args) { + int count = 0; + StringBuilder sb = new StringBuilder(); + for (String arg : args) { + sb.append("arg[" + (count++) + "] == " + arg + ", "); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java new file mode 100644 index 0000000..aa042e8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.yarn; + +import com.google.common.collect.Sets; +import java.io.FileOutputStream; +import java.util.Set; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import java.io.File; +import java.io.IOException; +import java.util.Map; +import org.apache.log4j.Logger; + +/** + * Utilities that can only compile with versions of Hadoop that support YARN, + * so they live here instead of o.a.g.utils package. + */ +public class YarnUtils { + /** Class Logger */ + private static final Logger LOG = Logger.getLogger(YarnUtils.class); + /** Default dir on HDFS (or equivalent) where LocalResources are stored */ + private static final String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache"; + + /** Private constructor, this is a utility class only */ + private YarnUtils() { /* no-op */ } + + /** + * Populates the LocalResources list with the HDFS paths listed in + * the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, and the + * GiraphConfiguration for this job. Also adds the Giraph default application + * jar as determined by GiraphYarnClient.GIRAPH_CLIENT_JAR constant. + * @param map the LocalResources list to populate. + * @param giraphConf the configuration to use to select jars to include. + * @param appId the ApplicationId, naming the HDFS base dir for job jars. 
+ */ + public static void addFsResourcesToMap(Map<String, LocalResource> map, + GiraphConfiguration giraphConf, ApplicationId appId) throws IOException { + FileSystem fs = FileSystem.get(giraphConf); + Path baseDir = YarnUtils.getFsCachePath(fs, appId); + boolean coreJarFound = false; + for (String fileName : giraphConf.getYarnLibJars().split(",")) { + if (fileName.length() > 0) { + Path filePath = new Path(baseDir, fileName); + LOG.info("Adding " + fileName + " to LocalResources for export."); + if (fileName.contains("giraph-core")) { + coreJarFound = true; + } + addFileToResourceMap(map, fs, filePath); + } + } + if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps + LOG.warn("Job jars (-yj option) didn't include giraph-core."); + } + Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE); + addFileToResourceMap(map, fs, confPath); + } + + /** + * Utility function to locate local JAR files and other resources + * recursively in the dirs on the local CLASSPATH. Once all the files + * named in <code>fileNames</code> are found, we stop and return the results. + * @param fileNames the file name of the jars, without path information. + * @return a set of Paths to the jar files requested in fileNames. 
+ */ + public static Set<Path> getLocalFiles(final Set<String> fileNames) { + Set<Path> jarPaths = Sets.newHashSet(); + String classPath = ".:" + System.getenv("HADOOP_HOME"); + if (classPath.length() > 2) { + classPath += ":"; + } + classPath += System.getenv("CLASSPATH"); + for (String baseDir : classPath.split(":")) { + if (baseDir.length() > 0) { + // lose the globbing chars that will fail in File#listFiles + final int lastFileSep = baseDir.lastIndexOf("/"); + if (lastFileSep > 0) { + String test = baseDir.substring(lastFileSep); + if (test.contains("*")) { + baseDir = baseDir.substring(0, lastFileSep); + } + } + populateJarList(new File(baseDir), jarPaths, fileNames); + } + if (jarPaths.size() >= fileNames.size()) { + break; // found a resource for each name in the input set, all done + } + } + return jarPaths; + } + + /** + * Start in the working directory and recursively locate all jars. + * @param dir current directory to explore. + * @param fileSet the list to populate. + * @param fileNames file names to locate. + */ + private static void populateJarList(final File dir, + final Set<Path> fileSet, final Set<String> fileNames) { + File[] filesInThisDir = dir.listFiles(); + if (null == filesInThisDir) { + return; + } + for (File f : dir.listFiles()) { + if (f.isDirectory()) { + populateJarList(f, fileSet, fileNames); + } else if (f.isFile() && fileNames.contains(f.getName())) { + fileSet.add(new Path(f.getAbsolutePath())); + } + } + } + + /** + * Boilerplate to add a file to the local resources.. + * @param localResources the LocalResources map to populate. + * @param fs handle to the HDFS file system. + * @param target the file to send to the remote container. 
+ */ + public static void addFileToResourceMap(Map<String, LocalResource> + localResources, FileSystem fs, Path target) + throws IOException { + LocalResource resource = Records.newRecord(LocalResource.class); + FileStatus destStatus = fs.getFileStatus(target); + resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri())); + resource.setSize(destStatus.getLen()); + resource.setTimestamp(destStatus.getModificationTime()); + resource.setType(LocalResourceType.FILE); // use FILE, even for jars! + resource.setVisibility(LocalResourceVisibility.APPLICATION); + localResources.put(target.getName(), resource); + LOG.info("Registered file in LocalResources: " + target.getName()); + } + + /** + * Get the base HDFS dir we will be storing our LocalResources in. + * @param fs the file system. + * @param appId the ApplicationId under which our resources will be stored. + * @return the path + */ + public static Path getFsCachePath(final FileSystem fs, + final ApplicationId appId) { + return new Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId); + } + + /** + * Populate the environment string map to be added to the environment vars + * in a remote execution container. Adds the local classpath to pick up + * "yarn-site.xml" and "mapred-site.xml" stuff. + * @param env the map of env var values. + * @param giraphConf the GiraphConfiguration to pull values from. 
+ */ + public static void addLocalClasspathToEnv(final Map<String, String> env, + final GiraphConfiguration giraphConf) { + StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*"); + for (String cpEntry : giraphConf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + classPathEnv.append(':').append(cpEntry.trim()); + } + for (String cpEntry : giraphConf.getStrings( + MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, + MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) { + classPathEnv.append(':').append(cpEntry.trim()); + } + // add the runtime classpath needed for tests to work + if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) { + classPathEnv.append(':').append(System.getenv("CLASSPATH")); + } + env.put("CLASSPATH", classPathEnv.toString()); + } + + /** + * Populate the LocalResources list with the GiraphConf XML file's HDFS path. + * @param giraphConf the GiraphConfiguration to export for worker tasks. + * @param appId the ApplicationId for this YARN app. + * @param localResourceMap the LocalResource map of files to export to tasks. + */ + public static void addGiraphConfToLocalResourceMap(GiraphConfiguration + giraphConf, ApplicationId appId, Map<String, LocalResource> + localResourceMap) throws IOException { + FileSystem fs = FileSystem.get(giraphConf); + Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId), + GiraphConstants.GIRAPH_YARN_CONF_FILE); + YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath); + } + + /** + * Export our populated GiraphConfiguration as an XML file to be used by the + * ApplicationMaster's exec container, and register it with LocalResources. + * @param giraphConf the current Configuration object to be published. + * @param appId the ApplicationId to stamp this app's base HDFS resources dir. 
+ */ + public static void exportGiraphConfiguration(GiraphConfiguration giraphConf, + ApplicationId appId) throws IOException { + File confFile = new File(System.getProperty("java.io.tmpdir"), + GiraphConstants.GIRAPH_YARN_CONF_FILE); + if (confFile.exists()) { + confFile.delete(); + } + String localConfPath = confFile.getAbsolutePath(); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(localConfPath); + giraphConf.writeXml(fos); + FileSystem fs = FileSystem.get(giraphConf); + Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId), + GiraphConstants.GIRAPH_YARN_CONF_FILE); + fos.flush(); + fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath); + } finally { + if (null != fos) { + fos.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java new file mode 100644 index 0000000..c8a3683 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Catch all package for YARN-specific code. Only compiles under Maven + * hadoop_yarn profile. + */ +package org.apache.giraph.yarn; http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java new file mode 100644 index 0000000..efd1179 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.yarn; + +import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR; + +import java.io.File; +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import junit.framework.Assert; +import org.apache.commons.io.FileUtils; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.GiraphFileInputFormat; +import org.apache.giraph.io.formats.IdWithValueTextOutputFormat; +import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.log4j.Logger; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.server.ServerConfig; +import org.apache.zookeeper.server.ZooKeeperServerMain; +import org.apache.zookeeper.server.quorum.QuorumPeerConfig; + +import org.junit.Test; + + +/** + * Tests the Giraph on YARN workflow. Basically, the plan is to use a + * <code>MiniYARNCluster</code> to run a small test job through our + * GiraphYarnClient -> GiraphApplicationMaster -> GiraphYarnTask (2 no-ops) + * No "real" BSP code need be tested here, as it is not aware it is running on + * YARN once the job is in progress, so the existing MRv1 BSP tests are fine. + */ +public class TestYarnJob implements Watcher { + private static final Logger LOG = Logger.getLogger(TestYarnJob.class); + /** + * Simple No-Op vertex to test if we can run a quick Giraph job on YARN. 
+ */ + private static class DummyYarnVertex extends Vertex<IntWritable, IntWritable, + NullWritable, IntWritable> { + @Override + public void compute(Iterable<IntWritable> messages) throws IOException { + voteToHalt(); + } + } + + /** job name for this integration test */ + private static final String JOB_NAME = "giraph-TestPureYarnJob"; + /** ZooKeeper port to use for tests, avoiding InternalVertexRunner's port */ + private static final int LOCAL_ZOOKEEPER_PORT = 22183; + /** ZooKeeper list system property */ + private static final String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT; + /** Local ZK working dir, avoid InternalVertexRunner naming */ + private static final String zkDirName = "_bspZooKeeperYarn"; + /** Local ZK Manager working dir, avoid InternalVertexRunner naming */ + private static final String zkMgrDirName = "_defaultZooKeeperManagerYarn"; + + /** Temp ZK base working dir for integration test */ + private File testBaseDir = null; + /** Fake input dir for integration test */ + private File inputDir = null; + /** Fake output dir for integration test */ + private File outputDir = null; + /** Temp ZK working dir for integration test */ + private File zkDir = null; + /** Temp ZK Manager working dir for integration test */ + private File zkMgrDir = null; + /** Internal ZooKeeper instance for integration test run */ + private InternalZooKeeper zookeeper; + /** For running the ZK instance locally */ + private ExecutorService exec = Executors.newSingleThreadExecutor(); + /** GiraphConfiguration for a "fake YARN job" */ + private GiraphConfiguration conf = null; + /** Counter for # of znode events during integration test */ + private int zkEventCount = 0; + /** Our YARN test cluster for local integration test */ + private MiniYARNCluster cluster = null; + + @Test + public void testPureYarnJob() { + try { + setupYarnConfiguration(); + initLocalZookeeper(); + initYarnCluster(); + GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME); + 
Assert.assertTrue(testGyc.run(true)); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Caught exception in TestYarnJob: " + e); + } finally { + zookeeper.end(); + exec.shutdown(); + cluster.stop(); + deleteTempDirectories(); + } + } + + /** + * Logging this stuff will help you debug integration test issues. + * @param zkEvent incoming event for our current test ZK's znode tree. + */ + @Override + public void process(WatchedEvent zkEvent) { + String event = zkEvent == null ? "NULL" : zkEvent.toString(); + LOG.info("TestYarnJob observed ZK event: " + event + + " for a total of " + (++zkEventCount) + " so far."); + } + + /** + * Delete our temp dir so checkstyle and rat plugins are happy. + */ + private void deleteTempDirectories() { + try { + if (testBaseDir != null && testBaseDir.exists()) { + FileUtils.deleteDirectory(testBaseDir); + } + } catch (IOException ioe) { + LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir); + } + } + + /** + * Initialize a local ZK instance for our test run. + */ + private void initLocalZookeeper() throws IOException { + zookeeper = new InternalZooKeeper(); + exec.execute(new Runnable() { + @Override + public void run() { + try { + // Configure a local zookeeper instance + Properties zkProperties = generateLocalZkProperties(); + QuorumPeerConfig qpConfig = new QuorumPeerConfig(); + qpConfig.parseProperties(zkProperties); + // run the zookeeper instance + final ServerConfig zkConfig = new ServerConfig(); + zkConfig.readFrom(qpConfig); + zookeeper.runFromConfig(zkConfig); + } catch (QuorumPeerConfig.ConfigException qpcce) { + throw new RuntimeException("parse of generated ZK config file " + + "has failed.", qpcce); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException("initLocalZookeeper in TestYarnJob: ", e); + } + } + + /** + * Returns pre-created ZK conf properties for Giraph integration test. + * @return the populated properties sheet. 
+ */ + Properties generateLocalZkProperties() { + Properties zkProperties = new Properties(); + zkProperties.setProperty("tickTime", "2000"); + zkProperties.setProperty("dataDir", zkDir.getAbsolutePath()); + zkProperties.setProperty("clientPort", + String.valueOf(LOCAL_ZOOKEEPER_PORT)); + zkProperties.setProperty("maxClientCnxns", "10000"); + zkProperties.setProperty("minSessionTimeout", "10000"); + zkProperties.setProperty("maxSessionTimeout", "100000"); + zkProperties.setProperty("initLimit", "10"); + zkProperties.setProperty("syncLimit", "5"); + zkProperties.setProperty("snapCount", "50000"); + return zkProperties; + } + }); + } + + /** + * Set up the GiraphConfiguration settings we need to run a no-op Giraph + * job on a MiniYARNCluster as an integration test. Some YARN-specific + * flags are set inside GiraphYarnClient and won't need to be set here. + */ + private void setupYarnConfiguration() throws IOException { + conf = new GiraphConfiguration(); + conf.setWorkerConfiguration(1, 1, 100.0f); + conf.setMaxMasterSuperstepWaitMsecs(30 * 1000); + conf.setEventWaitMsecs(3 * 1000); + conf.setYarnLibJars(""); // no need + conf.setYarnTaskHeapMb(256); // small since no work to be done + conf.setVertexClass(DummyYarnVertex.class); + conf.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class); + conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class); + conf.setNumComputeThreads(1); + conf.setMaxTaskAttempts(1); + conf.setNumInputSplitsThreads(1); + // Giraph on YARN only ever thinks it's running in "non-local" mode + conf.setLocalTestMode(false); + // this has to happen here before we populate the conf with the temp dirs + setupTempDirectories(); + conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString()); + GiraphFileInputFormat.addVertexInputPath(conf, new Path(inputDir.getAbsolutePath())); + // hand off the ZK info we just created to our no-op job + GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500); + 
conf.setZooKeeperConfiguration(zkList); + conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath()); + GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, zkMgrDir.getAbsolutePath()); + // without this, our "real" client won't connect w/"fake" YARN cluster + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + } + + /** + * Initialize the temp dir tree for ZK and I/O for no-op integration test. + */ + private void setupTempDirectories() throws IOException { + try { + testBaseDir = + new File(System.getProperty("user.dir"), JOB_NAME); + if (testBaseDir.exists()) { + testBaseDir.delete(); + } + testBaseDir.mkdir(); + inputDir = new File(testBaseDir, "yarninput"); + if (inputDir.exists()) { + inputDir.delete(); + } + inputDir.mkdir(); + File inFile = new File(inputDir, "graph_data.txt"); + inFile.createNewFile(); + outputDir = new File(testBaseDir, "yarnoutput"); + if (outputDir.exists()) { + outputDir.delete(); + } // don't actually produce the output dir, let Giraph On YARN do it + zkDir = new File(testBaseDir, zkDirName); + if (zkDir.exists()) { + zkDir.delete(); + } + zkDir.mkdir(); + zkMgrDir = new File(testBaseDir, zkMgrDirName); + if (zkMgrDir.exists()) { + zkMgrDir.delete(); + } + zkMgrDir.mkdir(); + } catch (IOException ioe) { + ioe.printStackTrace(); + throw new IOException("from setupTempDirectories: ", ioe); + } + } + + /** + * Initialize the MiniYARNCluster for the integration test. + */ + private void initYarnCluster() { + cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1); + cluster.init(new ImmutableClassesGiraphConfiguration(conf)); + cluster.start(); + } + + /** + * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown + */ + class InternalZooKeeper extends ZooKeeperServerMain { + /** + * Shutdown the ZooKeeper instance. 
+ */ + void end() { + shutdown(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/resources/capacity-scheduler.xml ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/resources/capacity-scheduler.xml b/giraph-core/src/test/resources/capacity-scheduler.xml new file mode 100644 index 0000000..5e19b9a --- /dev/null +++ b/giraph-core/src/test/resources/capacity-scheduler.xml @@ -0,0 +1,26 @@ +<?xml version="1.0" encoding="UTF-8" ?> + +<configuration> + + <property> + <name>yarn.scheduler.capacity.root.queues</name> + <value>unfunded,default</value> + </property> + + <property> + <name>yarn.scheduler.capacity.root.capacity</name> + <value>100</value> + </property> + + <property> + <name>yarn.scheduler.capacity.root.unfunded.capacity</name> + <value>50</value> + </property> + + <property> + <name>yarn.scheduler.capacity.root.default.capacity</name> + <value>50</value> + </property> + +</configuration> + http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-examples/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml index 3b6a08c..21e8ccf 100644 --- a/giraph-examples/pom.xml +++ b/giraph-examples/pom.xml @@ -104,6 +104,18 @@ under the License. <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>munge-maven-plugin</artifactId> </plugin> @@ -116,6 +128,18 @@ under the License. 
<build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>munge-maven-plugin</artifactId> </plugin> @@ -128,6 +152,18 @@ under the License. <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>munge-maven-plugin</artifactId> <configuration> @@ -152,6 +188,18 @@ under the License. <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>munge-maven-plugin</artifactId> <configuration> @@ -180,6 +228,18 @@ under the License. <build> <plugins> <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + <plugin> <groupId>org.sonatype.plugins</groupId> <artifactId>munge-maven-plugin</artifactId> </plugin> @@ -191,24 +251,119 @@ under the License. 
</build> </profile> + <!-- Currently supports hadoop-2.0.2-alpha + (see hadoop_yarn profile in giraph-parent POM to change) --> + <profile> + <id>hadoop_yarn</id> + <build> + <plugins> + <plugin> + <groupId>org.sonatype.plugins</groupId> + <artifactId>munge-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + + <!-- The profiles below do not (yet) include any munge flags --> <profile> <id>hadoop_2.0.0</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + </plugins> + </build> </profile> <profile> <id>hadoop_2.0.1</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + </plugins> + </build> </profile> <profile> <id>hadoop_2.0.2</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + </plugins> + </build> </profile> <profile> - <id>hadoop_2.0.3</id> + <id>hadoop_2.0.3</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + </plugins> + </build> </profile> <profile> <id>hadoop_trunk</id> + <build> + <plugins> + <plugin> + 
<groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>**/yarn/**</exclude> + </excludes> + <testExcludes> + <exclude>**/yarn/**</exclude> + </testExcludes> + </configuration> + </plugin> + </plugins> + </build> </profile> </profiles> http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 446aa2a..c883d41 100644 --- a/pom.xml +++ b/pom.xml @@ -496,7 +496,7 @@ under the License. <exclude>CODE_CONVENTIONS</exclude> <!-- generated content --> <exclude>**/target/**</exclude> - <exclude>_bsp/**</exclude> + <exclude>/_bsp/**</exclude> <exclude>.checkstyle</exclude> <!-- source control and IDEs --> <exclude>.reviewboardrc</exclude> @@ -506,7 +506,9 @@ under the License. <exclude>.idea/**</exclude> <exclude>**/*.iml</exclude> <exclude>**/*.ipr</exclude> - </excludes> + <!-- test resources (for Giraph on YARN profile) --> + <exclude>**/test/resources/**</exclude> + </excludes> </configuration> </plugin> <plugin> @@ -749,6 +751,60 @@ under the License. </dependencies> </profile> + <!-- This profile runs on Hadoop-2.0.3-alpha by default, but does not + use Hadoop MapReduce v2 to set up the Giraph job. This means the Giraph + worker/master tasks are not Mappers. Tasks are run in YARN-managed execution + containers. Internally, the Giraph framework continues to depend on many Hadoop + MapReduce classes to perform work. 
--> + <profile> + <id>hadoop_yarn</id> + <properties> + <hadoop.version>2.0.3-alpha</hadoop.version> + <munge.symbols>PURE_YARN</munge.symbols> + </properties> + <dependencies> + <!-- sorted lexicographically --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-nodemanager</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-tests</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + </dependency> + </dependencies> + </profile> + <!-- Help keep future Hadoop versions munge-free: All profiles below are munge-free: avoid introducing any munge flags on any of the following profiles. --> @@ -917,7 +973,7 @@ under the License. <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> - <version>1.3.2</version> + <version>2.1</version> </dependency> <dependency> <groupId>commons-cli</groupId>
