Updated Branches:
  refs/heads/master 1c58b6fb2 -> 1897ae82f

CRUNCH-82: Added support for non-HDFS OutputFormats when writing to multiple
targets.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1897ae82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1897ae82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1897ae82

Branch: refs/heads/master
Commit: 1897ae82fdb94e9c6bd4314dee0c6dfa165ae5e0
Parents: 1c58b6f
Author: Robert Chu <[email protected]>
Authored: Mon Sep 17 16:35:56 2012 -0700
Committer: Robert Chu <[email protected]>
Committed: Fri Sep 28 14:46:42 2012 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   19 +++++++++-
 .../org/apache/crunch/impl/mr/exec/CrunchJob.java  |   10 +++--
 .../crunch/impl/mr/plan/MSCROutputHandler.java     |   26 ++++++++-------
 3 files changed, 37 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1897ae82/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git 
a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java 
b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 050cff1..c659c86 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
 import org.apache.crunch.io.MapReduceTarget;
 import org.apache.crunch.io.OutputHandler;
@@ -29,9 +30,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class HBaseTarget implements MapReduceTarget {
 
@@ -75,15 +78,27 @@ public class HBaseTarget implements MapReduceTarget {
 
   @Override
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, 
String name) {
-    Configuration conf = job.getConfiguration();
+    final Configuration conf = job.getConfiguration();
     HBaseConfiguration.addHbaseResources(conf);
-    job.setOutputFormatClass(TableOutputFormat.class);
     conf.set(TableOutputFormat.OUTPUT_TABLE, table);
+
     try {
       TableMapReduceUtil.addDependencyJars(job);
+      FileOutputFormat.setOutputPath(job, outputPath);
     } catch (IOException e) {
       throw new CrunchRuntimeException(e);
     }
+
+    if (null == name) {
+      job.setOutputFormatClass(TableOutputFormat.class);
+      job.setOutputKeyClass(ImmutableBytesWritable.class);
+      job.setOutputValueClass(Put.class);
+    } else {
+      CrunchMultipleOutputs.addNamedOutput(job, name,
+          TableOutputFormat.class,
+          ImmutableBytesWritable.class,
+          Put.class);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1897ae82/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java 
b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
index 85fae63..74c6ff3 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java
@@ -18,7 +18,7 @@
 package org.apache.crunch.impl.mr.exec;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +38,7 @@ public class CrunchJob extends CrunchControlledJob {
   private final Log log = LogFactory.getLog(CrunchJob.class);
 
   private final Path workingPath;
-  private final List<Path> multiPaths;
+  private final Map<Integer, Path> multiPaths;
   private final boolean mapOnlyJob;
 
   public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) 
throws IOException {
@@ -53,10 +53,12 @@ public class CrunchJob extends CrunchControlledJob {
       // Need to handle moving the data from the output directory of the
       // job to the output locations specified in the paths.
       FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
-      for (int i = 0; i < multiPaths.size(); i++) {
+      for (Map.Entry<Integer, Path> entry : multiPaths.entrySet()) {
+        final int i = entry.getKey();
+        final Path dst = entry.getValue();
+
         Path src = new Path(workingPath, 
PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
         Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
-        Path dst = multiPaths.get(i);
         FileSystem dstFs = dst.getFileSystem(job.getConfiguration());
         if (!dstFs.exists(dst)) {
           dstFs.mkdirs(dst);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1897ae82/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
----------------------------------------------------------------------
diff --git 
a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java 
b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
index bfd3e26..b6a41da 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCROutputHandler.java
@@ -17,7 +17,7 @@
  */
 package org.apache.crunch.impl.mr.plan;
 
-import java.util.List;
+import java.util.Map;
 
 import org.apache.crunch.Target;
 import org.apache.crunch.io.MapReduceTarget;
@@ -27,7 +27,7 @@ import org.apache.crunch.types.PType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
-import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class MSCROutputHandler implements OutputHandler {
 
@@ -36,13 +36,14 @@ public class MSCROutputHandler implements OutputHandler {
   private final boolean mapOnlyJob;
 
   private DoNode workingNode;
-  private List<Path> multiPaths;
+  private Map<Integer, Path> multiPaths;
+  private int jobCount;
 
   public MSCROutputHandler(Job job, Path outputPath, boolean mapOnlyJob) {
     this.job = job;
     this.path = outputPath;
     this.mapOnlyJob = mapOnlyJob;
-    this.multiPaths = Lists.newArrayList();
+    this.multiPaths = Maps.newHashMap();
   }
 
   public void configureNode(DoNode node, Target target) {
@@ -51,17 +52,18 @@ public class MSCROutputHandler implements OutputHandler {
   }
 
   public boolean configure(Target target, PType<?> ptype) {
-    if (target instanceof MapReduceTarget && target instanceof PathTarget) {
-      String name = PlanningParameters.MULTI_OUTPUT_PREFIX + multiPaths.size();
-      multiPaths.add(((PathTarget) target).getPath());
+    if (target instanceof MapReduceTarget) {
+      if (target instanceof PathTarget) {
+        multiPaths.put(jobCount, ((PathTarget) target).getPath());
+      }
+
+      String name = PlanningParameters.MULTI_OUTPUT_PREFIX + jobCount;
+      jobCount++;
       workingNode.setOutputName(name);
       ((MapReduceTarget) target).configureForMapReduce(job, ptype, path, name);
       return true;
     }
-    if (target instanceof MapReduceTarget) {
-      ((MapReduceTarget) target).configureForMapReduce(job, ptype, null, null);
-      return true;
-    }
+
     return false;
   }
 
@@ -69,7 +71,7 @@ public class MSCROutputHandler implements OutputHandler {
     return mapOnlyJob;
   }
 
-  public List<Path> getMultiPaths() {
+  public Map<Integer, Path> getMultiPaths() {
     return multiPaths;
   }
 }

Reply via email to