Updated Branches:
  refs/heads/trunk 67f5f7475 -> b2dff2751

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java 
b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
new file mode 100644
index 0000000..d596413
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/GiraphYarnTask.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.GraphTaskManager;
+
+import org.apache.giraph.io.VertexOutputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.task.MapContextImpl;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * This process will execute the BSP graph tasks allotted to this YARN
+ * execution container. All tasks will be performed by calling the
+ * GraphTaskManager object. Since this GiraphYarnTask will
+ * not be passing data by key-value pairs through the MR framework, the
+ * Mapper parameter types are irrelevant, and set to <code>Object</code> type.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class GiraphYarnTask<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> {
+  static {
+    Configuration.addDefaultResource(GiraphConstants.GIRAPH_YARN_CONF_FILE);
+  }
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(GiraphYarnTask.class);
+  /** Manage the framework-agnostic Giraph task for this job run */
+  private GraphTaskManager<I, V, E, M> graphTaskManager;
+  /** Giraph task ID number must start @ index 0. Used by ZK, BSP, etc. */
+  private final int bspTaskId;
+  /** A special "dummy" override of Mapper#Context, used to deliver MRv1 deps 
*/
+  private Context proxy;
+  /** Configuration to hand off into Giraph, through wrapper Mapper#Context */
+  private ImmutableClassesGiraphConfiguration conf;
+
+  /**
+   * Constructor. Build our DUMMY MRv1 data structures to pass to our
+   * GiraphTaskManager. This allows us to continue to look the other way
+   * while Giraph relies on MRv1 under the hood.
+   * @param taskAttemptId the MRv1 TaskAttemptID we constructed from CLI args
+   *                      supplied by GiraphApplicationMaster.
+   */
+  public GiraphYarnTask(final TaskAttemptID taskAttemptId) {
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+      new GiraphConfiguration());
+    bspTaskId = taskAttemptId.getTaskID().getId();
+    conf.setInt("mapred.task.partition", bspTaskId);
+    proxy = buildProxyMapperContext(taskAttemptId);
+    graphTaskManager = new GraphTaskManager<I, V, E, M>(proxy);
+  }
+
+  /**
+   * Run one Giraph worker (or master) task, hosted in this execution 
container.
+   */
+  public void run() {
+    // Notify the master quicker if there is worker failure rather than
+    // waiting for ZooKeeper to timeout and delete the ephemeral znodes
+    try {
+      graphTaskManager.setup(null); // defaults GTM to "assume fatjar mode"
+      graphTaskManager.execute();
+      graphTaskManager.cleanup();
+    } catch (InterruptedException ie) {
+      LOG.error("run() caught an unrecoverable InterruptedException.", ie);
+    } catch (IOException ioe) {
+      throw new RuntimeException(
+        "run() caught an unrecoverable IOException.", ioe);
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (RuntimeException e) {
+      // CHECKSTYLE: resume IllegalCatch
+      graphTaskManager.zooKeeperCleanup();
+      graphTaskManager.workerFailureCleanup();
+      throw new RuntimeException(
+        "run: Caught an unrecoverable exception " + e.getMessage(), e);
+    } finally {
+      // YARN: must complete the commit of the final output, Hadoop isn't 
there.
+      finalizeYarnJob();
+    }
+  }
+
+  /**
+   * Without Hadoop MR to finish the consolidation of all the task output from
+   * each HDFS task tmp dir, it won't get done. YARN has some job finalization
+   * it must do "for us." -- AND must delete "jar cache" in HDFS too!
+   */
+  private void finalizeYarnJob() {
+    if (conf.isPureYarnJob() && graphTaskManager.isMaster() &&
+      conf.getVertexOutputFormatClass() != null) {
+      try {
+        LOG.info("Master is ready to commit final job output data.");
+        VertexOutputFormat vertexOutputFormat =
+          conf.createVertexOutputFormat();
+        OutputCommitter outputCommitter =
+          vertexOutputFormat.getOutputCommitter(proxy);
+        // now we will have our output in OUTDIR if all went well...
+        outputCommitter.commitJob(proxy);
+        LOG.info("Master has committed the final job output data.");
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while attempting to obtain " +
+          "OutputCommitter.", ie);
+      } catch (IOException ioe) {
+        LOG.error("Master task's attempt to commit output has " +
+          "FAILED.", ioe);
+      }
+    }
+  }
+
+  /**
+   * Utility to generate dummy Mapper#Context for use in Giraph internals.
+   * This is the "key hack" to inject MapReduce-related data structures
+   * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
+   * into our Giraph BSP task code.
+   * @param tid the TaskAttemptID to construct this Mapper#Context from.
+   * @return sort of a Mapper#Context if you squint just right.
+   */
+  private Context buildProxyMapperContext(final TaskAttemptID tid) {
+    MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
+      conf, // our Configuration, populated back at the GiraphYarnClient.
+      tid,  // our TaskAttemptId, generated w/YARN app, container, attempt IDs
+      null, // RecordReader here will never be used by Giraph
+      null, // RecordWriter here will never be used by Giraph
+      null, // OutputCommitter here will never be used by Giraph
+      new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now
+        @Override
+        public void setStatus(String msg) {
+          LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
+        }
+      },
+      null); // Input split setting here will never be used by Giraph
+
+    // now, we wrap our MapContext ref so we can produce a Mapper#Context
+    WrappedMapper<Object, Object, Object, Object> wrappedMapper
+      = new WrappedMapper<Object, Object, Object, Object>();
+    return wrappedMapper.getMapContext(mc);
+  }
+
+  /**
+    * Default handler for uncaught exceptions.
+    */
+  class OverrideExceptionHandler implements Thread.UncaughtExceptionHandler {
+    @Override
+    public void uncaughtException(final Thread t, final Throwable e) {
+      LOG.fatal(
+        "uncaughtException: OverrideExceptionHandler on thread " +
+         t.getName() + ", msg = " +  e.getMessage() + ", exiting...", e);
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Task entry point.
+   * @param args CLI arguments injected by GiraphApplicationMaster to hand off
+   *             job, task, and attempt ID's to this (and every) Giraph task.
+   *             Args should be: <code>AppId ContainerId AppAttemptId</code>
+   */
+  @SuppressWarnings("rawtypes")
+  public static void main(String[] args) {
+    if (args.length != 4) {
+      throw new IllegalStateException("GiraphYarnTask could not construct " +
+        "a TaskAttemptID for the Giraph job from args: " + printArgs(args));
+    }
+    try {
+      GiraphYarnTask<?, ?, ?, ?> giraphYarnTask =
+        new GiraphYarnTask(getTaskAttemptID(args));
+      giraphYarnTask.run();
+      // CHECKSTYLE: stop IllegalCatch
+    } catch (Throwable t) {
+      // CHECKSTYLE: resume IllegalCatch
+      LOG.error("GiraphYarnTask threw a top-level exception, failing task", t);
+      System.exit(2);
+    } // ALWAYS finish a YARN task or AppMaster with System#exit!!!
+    System.exit(0);
+  }
+
+  /**
+   * Utility to create a TaskAttemptId we can feed to our fake Mapper#Context.
+   *
+   * NOTE: ContainerId will serve as MR TaskID for Giraph tasks.
+   * YARN container 1 is always AppMaster, so the least container id we will
+   * ever get from YARN for a Giraph task is container id 2. Giraph on 
MapReduce
+   * tasks must start at index 0. So we SUBTRACT TWO from each container id.
+   *
+   * @param args the command line args, fed to us by GiraphApplicationMaster.
+   * @return the TaskAttemptId object, populated with YARN job data.
+   */
+  private static TaskAttemptID getTaskAttemptID(String[] args) {
+    return new TaskAttemptID(
+      args[0], // YARN ApplicationId Cluster Timestamp
+      Integer.parseInt(args[1]), // YARN ApplicationId #
+      TaskID.getTaskType('m'),  // Make Giraph think this is a Mapper task.
+      Integer.parseInt(args[2]) - 2, // YARN ContainerId MINUS TWO (see above)
+      Integer.parseInt(args[3])); // YARN AppAttemptId #
+  }
+
+  /**
+   * Utility to help log command line args in the event of an error.
+   * @param args the CLI args.
+   * @return a pretty-print of the input args.
+   */
+  private static String printArgs(String[] args) {
+    int count = 0;
+    StringBuilder sb = new StringBuilder();
+    for (String arg : args) {
+      sb.append("arg[" + (count++) + "] == " + arg + ", ");
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java 
b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
new file mode 100644
index 0000000..aa042e8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/YarnUtils.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.yarn;
+
+import com.google.common.collect.Sets;
+import java.io.FileOutputStream;
+import java.util.Set;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.log4j.Logger;
+
+/**
+ * Utilities that can only compile with versions of Hadoop that support YARN,
+ * so they live here instead of o.a.g.utils package.
+ */
+public class YarnUtils {
+  /** Class Logger */
+  private static final Logger LOG = Logger.getLogger(YarnUtils.class);
+  /** Default dir on HDFS (or equivalent) where LocalResources are stored */
+  private static final String HDFS_RESOURCE_DIR = "giraph_yarn_jar_cache";
+
+  /** Private constructor, this is a utility class only */
+  private YarnUtils() { /* no-op */ }
+
+  /**
+   * Populates the LocalResources list with the HDFS paths listed in
+   * the conf under GiraphConstants.GIRAPH_YARN_LIBJARS, and the
+   * GiraphConfiguration for this job. Also adds the Giraph default application
+   * jar as determined by GiraphYarnClient.GIRAPH_CLIENT_JAR constant.
+   * @param map the LocalResources list to populate.
+   * @param giraphConf the configuration to use to select jars to include.
+   * @param appId the ApplicationId, naming the HDFS base dir for job jars.
+   */
+  public static void addFsResourcesToMap(Map<String, LocalResource> map,
+    GiraphConfiguration giraphConf, ApplicationId appId) throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path baseDir = YarnUtils.getFsCachePath(fs, appId);
+    boolean coreJarFound = false;
+    for (String fileName : giraphConf.getYarnLibJars().split(",")) {
+      if (fileName.length() > 0) {
+        Path filePath = new Path(baseDir, fileName);
+        LOG.info("Adding " + fileName + " to LocalResources for export.");
+        if (fileName.contains("giraph-core")) {
+          coreJarFound = true;
+        }
+        addFileToResourceMap(map, fs, filePath);
+      }
+    }
+    if (!coreJarFound) { // OK if you are running giraph-examples-jar-with-deps
+      LOG.warn("Job jars (-yj option) didn't include giraph-core.");
+    }
+    Path confPath = new Path(baseDir, GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    addFileToResourceMap(map, fs, confPath);
+  }
+
+  /**
+   * Utility function to locate local JAR files and other resources
+   * recursively in the dirs on the local CLASSPATH. Once all the files
+   * named in <code>fileNames</code> are found, we stop and return the results.
+   * @param fileNames the file name of the jars, without path information.
+   * @return a set of Paths to the jar files requested in fileNames.
+   */
+  public static Set<Path> getLocalFiles(final Set<String> fileNames) {
+    Set<Path> jarPaths = Sets.newHashSet();
+    String classPath = ".:" + System.getenv("HADOOP_HOME");
+    if (classPath.length() > 2) {
+      classPath += ":";
+    }
+    classPath += System.getenv("CLASSPATH");
+    for (String baseDir : classPath.split(":")) {
+      if (baseDir.length() > 0) {
+        // lose the globbing chars that will fail in File#listFiles
+        final int lastFileSep = baseDir.lastIndexOf("/");
+        if (lastFileSep > 0) {
+          String test = baseDir.substring(lastFileSep);
+          if (test.contains("*")) {
+            baseDir = baseDir.substring(0, lastFileSep);
+          }
+        }
+        populateJarList(new File(baseDir), jarPaths, fileNames);
+      }
+      if (jarPaths.size() >= fileNames.size()) {
+        break; // found a resource for each name in the input set, all done
+      }
+    }
+    return jarPaths;
+  }
+
+  /**
+   * Start in the working directory and recursively locate all jars.
+   * @param dir current directory to explore.
+   * @param fileSet the list to populate.
+   * @param fileNames file names to locate.
+   */
+  private static void populateJarList(final File dir,
+    final Set<Path> fileSet, final Set<String> fileNames) {
+    File[] filesInThisDir = dir.listFiles();
+    if (null == filesInThisDir) {
+      return;
+    }
+    for (File f : dir.listFiles()) {
+      if (f.isDirectory()) {
+        populateJarList(f, fileSet, fileNames);
+      } else if (f.isFile() && fileNames.contains(f.getName())) {
+        fileSet.add(new Path(f.getAbsolutePath()));
+      }
+    }
+  }
+
+  /**
+   * Boilerplate to add a file to the local resources.
+   * @param localResources the LocalResources map to populate.
+   * @param fs handle to the HDFS file system.
+   * @param target the file to send to the remote container.
+   */
+  public static void addFileToResourceMap(Map<String, LocalResource>
+    localResources, FileSystem fs, Path target)
+    throws IOException {
+    LocalResource resource = Records.newRecord(LocalResource.class);
+    FileStatus destStatus = fs.getFileStatus(target);
+    resource.setResource(ConverterUtils.getYarnUrlFromURI(target.toUri()));
+    resource.setSize(destStatus.getLen());
+    resource.setTimestamp(destStatus.getModificationTime());
+    resource.setType(LocalResourceType.FILE); // use FILE, even for jars!
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    localResources.put(target.getName(), resource);
+    LOG.info("Registered file in LocalResources: " + target.getName());
+  }
+
+  /**
+   * Get the base HDFS dir we will be storing our LocalResources in.
+   * @param fs the file system.
+   * @param appId the ApplicationId under which our resources will be stored.
+   * @return the path
+   */
+  public static Path getFsCachePath(final FileSystem fs,
+    final ApplicationId appId) {
+    return new Path(fs.getHomeDirectory(), HDFS_RESOURCE_DIR + "/" + appId);
+  }
+
+  /**
+   * Populate the environment string map to be added to the environment vars
+   * in a remote execution container. Adds the local classpath to pick up
+   * "yarn-site.xml" and "mapred-site.xml" stuff.
+   * @param env the map of env var values.
+   * @param giraphConf the GiraphConfiguration to pull values from.
+   */
+  public static void addLocalClasspathToEnv(final Map<String, String> env,
+    final GiraphConfiguration giraphConf) {
+    StringBuilder classPathEnv = new StringBuilder("${CLASSPATH}:./*");
+    for (String cpEntry : giraphConf.getStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':').append(cpEntry.trim());
+    }
+    for (String cpEntry : giraphConf.getStrings(
+      MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+      MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
+      classPathEnv.append(':').append(cpEntry.trim());
+    }
+    // add the runtime classpath needed for tests to work
+    if (giraphConf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+      classPathEnv.append(':').append(System.getenv("CLASSPATH"));
+    }
+    env.put("CLASSPATH", classPathEnv.toString());
+  }
+
+  /**
+   * Populate the LocalResources list with the GiraphConf XML file's HDFS path.
+   * @param giraphConf the GiraphConfiguration to export for worker tasks.
+   * @param appId the ApplicationId for this YARN app.
+   * @param localResourceMap the LocalResource map of files to export to tasks.
+   */
+  public static void addGiraphConfToLocalResourceMap(GiraphConfiguration
+    giraphConf, ApplicationId appId, Map<String, LocalResource>
+    localResourceMap) throws IOException {
+    FileSystem fs = FileSystem.get(giraphConf);
+    Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
+      GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    YarnUtils.addFileToResourceMap(localResourceMap, fs, hdfsConfPath);
+  }
+
+  /**
+   * Export our populated GiraphConfiguration as an XML file to be used by the
+   * ApplicationMaster's exec container, and register it with LocalResources.
+   * @param giraphConf the current Configuration object to be published.
+   * @param appId the ApplicationId to stamp this app's base HDFS resources 
dir.
+   */
+  public static void exportGiraphConfiguration(GiraphConfiguration giraphConf,
+    ApplicationId appId) throws IOException {
+    File confFile = new File(System.getProperty("java.io.tmpdir"),
+      GiraphConstants.GIRAPH_YARN_CONF_FILE);
+    if (confFile.exists()) {
+      confFile.delete();
+    }
+    String localConfPath = confFile.getAbsolutePath();
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(localConfPath);
+      giraphConf.writeXml(fos);
+      FileSystem fs = FileSystem.get(giraphConf);
+      Path hdfsConfPath = new Path(YarnUtils.getFsCachePath(fs, appId),
+        GiraphConstants.GIRAPH_YARN_CONF_FILE);
+      fos.flush();
+      fs.copyFromLocalFile(false, true, new Path(localConfPath), hdfsConfPath);
+    } finally {
+      if (null != fos) {
+        fos.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java 
b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
new file mode 100644
index 0000000..c8a3683
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/yarn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Catch all package for YARN-specific code. Only compiles under Maven
+ * hadoop_yarn profile.
+ */
+package org.apache.giraph.yarn;

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java 
b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
new file mode 100644
index 0000000..efd1179
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.yarn;
+
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.formats.GiraphFileInputFormat;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullIntTextInputFormat;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+
+import org.junit.Test;
+
+
+/**
+ * Tests the Giraph on YARN workflow. Basically, the plan is to use a
+ * <code>MiniYARNCluster</code> to run a small test job through our
+ * GiraphYarnClient -> GiraphApplicationMaster -> GiraphYarnTask (2 no-ops)
+ * No "real" BSP code need be tested here, as it is not aware it is running on
+ * YARN once the job is in progress, so the existing MRv1 BSP tests are fine.
+ */
+public class TestYarnJob implements Watcher {
+  private static final Logger LOG = Logger.getLogger(TestYarnJob.class);
+  /**
+   * Simple No-Op vertex to test if we can run a quick Giraph job on YARN.
+   */
+  private static class DummyYarnVertex extends Vertex<IntWritable, IntWritable,
+      NullWritable, IntWritable> {
+    @Override
+    public void compute(Iterable<IntWritable> messages) throws IOException {
+      voteToHalt();
+    }
+  }
+
+  /** job name for this integration test */
+  private static final String JOB_NAME = "giraph-TestPureYarnJob";
+  /** ZooKeeper port to use for tests, avoiding InternalVertexRunner's port */
+  private static final int LOCAL_ZOOKEEPER_PORT = 22183;
+  /** ZooKeeper list system property */
+  private static final String zkList = "localhost:" + LOCAL_ZOOKEEPER_PORT;
+  /** Local ZK working dir, avoid InternalVertexRunner naming */
+  private static final String zkDirName = "_bspZooKeeperYarn";
+  /** Local ZK Manager working dir, avoid InternalVertexRunner naming */
+  private static final String zkMgrDirName = "_defaultZooKeeperManagerYarn";
+
+  /** Temp ZK base working dir for integration test */
+  private File testBaseDir = null;
+  /** Fake input dir for integration test */
+  private File inputDir = null;
+  /** Fake output dir for integration test */
+  private File outputDir = null;
+  /** Temp ZK working dir for integration test */
+  private File zkDir = null;
+  /** Temp ZK Manager working dir for integration test */
+  private File zkMgrDir = null;
+  /** Internal ZooKeeper instance for integration test run */
+  private InternalZooKeeper zookeeper;
+  /** For running the ZK instance locally */
+  private ExecutorService exec = Executors.newSingleThreadExecutor();
+  /** GiraphConfiguration for a "fake YARN job" */
+  private GiraphConfiguration conf = null;
+  /** Counter for # of znode events during integration test */
+  private int zkEventCount = 0;
+  /** Our YARN test cluster for local integration test */
+  private MiniYARNCluster cluster = null;
+
+  @Test
+  public void testPureYarnJob() {
+    try {
+      setupYarnConfiguration();
+      initLocalZookeeper();
+      initYarnCluster();
+      GiraphYarnClient testGyc = new GiraphYarnClient(conf, JOB_NAME);
+      Assert.assertTrue(testGyc.run(true));
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail("Caught exception in TestYarnJob: " + e);
+    } finally {
+      zookeeper.end();
+      exec.shutdown();
+      cluster.stop();
+      deleteTempDirectories();
+    }
+  }
+
+  /**
+   * Logging this stuff will help you debug integration test issues.
+   * @param zkEvent incoming event for our current test ZK's znode tree.
+   */
+  @Override
+  public void process(WatchedEvent zkEvent) {
+    String event = zkEvent == null ? "NULL" : zkEvent.toString();
+    LOG.info("TestYarnJob observed ZK event: " + event +
+      " for a total of " + (++zkEventCount) + " so far.");
+  }
+
+  /**
+   * Delete our temp dir so checkstyle and rat plugins are happy.
+   */
+  private void deleteTempDirectories() {
+    try {
+      if (testBaseDir != null && testBaseDir.exists()) {
+        FileUtils.deleteDirectory(testBaseDir);
+      }
+    } catch (IOException ioe) {
+      LOG.error("TestYarnJob#deleteTempDirectories() FAIL at: " + testBaseDir);
+    }
+  }
+
+  /**
+   * Initialize a local ZK instance for our test run.
+   */
+  private void initLocalZookeeper() throws IOException {
+    zookeeper = new InternalZooKeeper();
+    exec.execute(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          // Configure a local zookeeper instance
+          Properties zkProperties = generateLocalZkProperties();
+          QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+          qpConfig.parseProperties(zkProperties);
+          // run the zookeeper instance
+          final ServerConfig zkConfig = new ServerConfig();
+          zkConfig.readFrom(qpConfig);
+          zookeeper.runFromConfig(zkConfig);
+        } catch (QuorumPeerConfig.ConfigException qpcce) {
+          throw new RuntimeException("parse of generated ZK config file " +
+                                       "has failed.", qpcce);
+        } catch (IOException e) {
+          e.printStackTrace();
+          throw new RuntimeException("initLocalZookeeper in TestYarnJob: ", e);
+        }
+      }
+
+      /**
+       * Returns pre-created ZK conf properties for Giraph integration test.
+       * @return the populated properties sheet.
+       */
+      Properties generateLocalZkProperties() {
+        Properties zkProperties = new Properties();
+        zkProperties.setProperty("tickTime", "2000");
+        zkProperties.setProperty("dataDir", zkDir.getAbsolutePath());
+        zkProperties.setProperty("clientPort",
+                                  String.valueOf(LOCAL_ZOOKEEPER_PORT));
+        zkProperties.setProperty("maxClientCnxns", "10000");
+        zkProperties.setProperty("minSessionTimeout", "10000");
+        zkProperties.setProperty("maxSessionTimeout", "100000");
+        zkProperties.setProperty("initLimit", "10");
+        zkProperties.setProperty("syncLimit", "5");
+        zkProperties.setProperty("snapCount", "50000");
+        return zkProperties;
+      }
+    });
+  }
+
+  /**
+   * Set up the GiraphConfiguration settings we need to run a no-op Giraph
+   * job on a MiniYARNCluster as an integration test. Some YARN-specific
+   * flags are set inside GiraphYarnClient and won't need to be set here.
+   */
+  private void setupYarnConfiguration() throws IOException {
+    conf = new GiraphConfiguration();
+    conf.setWorkerConfiguration(1, 1, 100.0f);
+    conf.setMaxMasterSuperstepWaitMsecs(30 * 1000);
+    conf.setEventWaitMsecs(3 * 1000);
+    conf.setYarnLibJars(""); // no need
+    conf.setYarnTaskHeapMb(256); // small since no work to be done
+    conf.setVertexClass(DummyYarnVertex.class);
+    conf.setVertexInputFormatClass(IntIntNullIntTextInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    conf.setNumComputeThreads(1);
+    conf.setMaxTaskAttempts(1);
+    conf.setNumInputSplitsThreads(1);
+    // Giraph on YARN only ever thinks it's running in "non-local" mode
+    conf.setLocalTestMode(false);
+    // this has to happen here before we populate the conf with the temp dirs
+    setupTempDirectories();
+    conf.set(OUTDIR, new Path(outputDir.getAbsolutePath()).toString());
+    GiraphFileInputFormat.addVertexInputPath(conf, new 
Path(inputDir.getAbsolutePath()));
+    // hand off the ZK info we just created to our no-op job
+    GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.set(conf, 500);
+    conf.setZooKeeperConfiguration(zkList);
+    conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.getAbsolutePath());
+    GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, 
zkMgrDir.getAbsolutePath());
+    // without this, our "real" client won't connect w/"fake" YARN cluster
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+  }
+
+  /**
+   * Initialize the temp dir tree for ZK and I/O for no-op integration test.
+   */
+  private void setupTempDirectories() throws IOException {
+    try {
+    testBaseDir =
+      new File(System.getProperty("user.dir"), JOB_NAME);
+    if (testBaseDir.exists()) {
+      testBaseDir.delete();
+    }
+    testBaseDir.mkdir();
+    inputDir = new File(testBaseDir, "yarninput");
+    if (inputDir.exists()) {
+      inputDir.delete();
+    }
+    inputDir.mkdir();
+    File inFile = new File(inputDir, "graph_data.txt");
+    inFile.createNewFile();
+    outputDir = new File(testBaseDir, "yarnoutput");
+    if (outputDir.exists()) {
+      outputDir.delete();
+    } // don't actually produce the output dir, let Giraph On YARN do it
+    zkDir = new File(testBaseDir, zkDirName);
+    if (zkDir.exists()) {
+      zkDir.delete();
+    }
+    zkDir.mkdir();
+    zkMgrDir = new File(testBaseDir, zkMgrDirName);
+    if (zkMgrDir.exists()) {
+      zkMgrDir.delete();
+    }
+    zkMgrDir.mkdir();
+    } catch (IOException ioe) {
+      ioe.printStackTrace();
+      throw new IOException("from setupTempDirectories: ", ioe);
+    }
+  }
+
+  /**
+   * Initialize the MiniYARNCluster for the integration test.
+   */
+  private void initYarnCluster() {
+    cluster = new MiniYARNCluster(TestYarnJob.class.getName(), 1, 1, 1);
+    cluster.init(new ImmutableClassesGiraphConfiguration(conf));
+    cluster.start();
+  }
+
+  /**
+   * Extension of {@link ZooKeeperServerMain} that allows programmatic shutdown
+   */
+  class InternalZooKeeper extends ZooKeeperServerMain {
+    /**
+     * Shutdown the ZooKeeper instance.
+     */
+    void end() {
+      shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-core/src/test/resources/capacity-scheduler.xml
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/resources/capacity-scheduler.xml 
b/giraph-core/src/test/resources/capacity-scheduler.xml
new file mode 100644
index 0000000..5e19b9a
--- /dev/null
+++ b/giraph-core/src/test/resources/capacity-scheduler.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<configuration>
+
+  <property>
+    <name>yarn.scheduler.capacity.root.queues</name>
+    <value>unfunded,default</value>
+  </property>
+  
+  <property>
+    <name>yarn.scheduler.capacity.root.capacity</name>
+    <value>100</value>
+  </property>
+  
+  <property>
+    <name>yarn.scheduler.capacity.root.unfunded.capacity</name>
+    <value>50</value>
+  </property>
+  
+  <property>
+    <name>yarn.scheduler.capacity.root.default.capacity</name>
+    <value>50</value>
+  </property>
+
+</configuration>
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/giraph-examples/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml
index 3b6a08c..21e8ccf 100644
--- a/giraph-examples/pom.xml
+++ b/giraph-examples/pom.xml
@@ -104,6 +104,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -116,6 +128,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -128,6 +152,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -152,6 +188,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
             <configuration>
@@ -180,6 +228,18 @@ under the License.
       <build>
         <plugins>
           <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+          <plugin>
             <groupId>org.sonatype.plugins</groupId>
             <artifactId>munge-maven-plugin</artifactId>
           </plugin>
@@ -191,24 +251,119 @@ under the License.
       </build>
     </profile>
 
+    <!-- Currently supports hadoop-2.0.3-alpha
+      (see hadoop_yarn profile in giraph-parent POM to change) -->
+    <profile>
+      <id>hadoop_yarn</id>
+      <build>
+        <plugins>
+          <plugin>
+              <groupId>org.sonatype.plugins</groupId>
+              <artifactId>munge-maven-plugin</artifactId>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
+    <!-- The profiles below do not (yet) include any munge flags -->
     <profile>
       <id>hadoop_2.0.0</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.1</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_2.0.2</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
-        <id>hadoop_2.0.3</id>
+      <id>hadoop_2.0.3</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
 
     <profile>
       <id>hadoop_trunk</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**/yarn/**</exclude>
+              </excludes>
+              <testExcludes>
+                <exclude>**/yarn/**</exclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
   </profiles>
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/b2dff275/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 446aa2a..c883d41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -496,7 +496,7 @@ under the License.
                 <exclude>CODE_CONVENTIONS</exclude>
                 <!-- generated content -->
                 <exclude>**/target/**</exclude>
-                <exclude>_bsp/**</exclude>
+                <exclude>/_bsp/**</exclude>
                 <exclude>.checkstyle</exclude>
                 <!-- source control and IDEs -->
                 <exclude>.reviewboardrc</exclude>
@@ -506,7 +506,9 @@ under the License.
                 <exclude>.idea/**</exclude>
                 <exclude>**/*.iml</exclude>
                 <exclude>**/*.ipr</exclude>
-             </excludes>
+                <!-- test resources (for Giraph on YARN profile) -->
+                <exclude>**/test/resources/**</exclude>
+              </excludes>
           </configuration>
         </plugin>
         <plugin>
@@ -749,6 +751,60 @@ under the License.
       </dependencies>
     </profile>
 
+    <!-- This profile runs on Hadoop-2.0.3-alpha by default, but does not
+      use Hadoop MapReduce v2 to set up the Giraph job. This means the Giraph
+      worker/master tasks are not Mappers. Tasks are run in YARN-managed
+      execution containers. Internally, the Giraph framework continues to
+      depend on many Hadoop MapReduce classes to perform work. -->
+    <profile>
+      <id>hadoop_yarn</id>
+      <properties>
+        <hadoop.version>2.0.3-alpha</hadoop.version>
+        <munge.symbols>PURE_YARN</munge.symbols>
+      </properties>
+      <dependencies>
+        <!-- sorted lexicographically -->
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-common</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-common</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+          <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-tests</artifactId>
+          <version>${hadoop.version}</version>
+          <type>test-jar</type>
+        </dependency>
+      </dependencies>
+    </profile>
+
     <!-- Help keep future Hadoop versions munge-free:
          All profiles below are munge-free: avoid introducing any munge
          flags on any of the following profiles. -->
@@ -917,7 +973,7 @@ under the License.
       <dependency>
         <groupId>commons-io</groupId>
         <artifactId>commons-io</artifactId>
-        <version>1.3.2</version>
+        <version>2.1</version>
       </dependency>
       <dependency>
         <groupId>commons-cli</groupId>

Reply via email to