Updated Branches:
  refs/heads/master 69105d07b -> 2af9cbae4

CRUNCH-102: Fix GBK and map-only unioned output


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2af9cbae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2af9cbae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2af9cbae

Branch: refs/heads/master
Commit: 2af9cbae45ac44fd0b8b325efe9382973bfe50f1
Parents: 69105d0
Author: Gabriel Reid <[email protected]>
Authored: Sat Oct 27 10:13:12 2012 +0200
Committer: Gabriel Reid <[email protected]>
Committed: Mon Oct 29 08:07:05 2012 +0100

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/UnionResultsIT.java  |   80 +++++++++++++++
 .../apache/crunch/impl/mr/plan/MSCRPlanner.java    |   27 ++++--
 .../org/apache/crunch/impl/mr/plan/Vertex.java     |    6 +
 3 files changed, 105 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2af9cbae/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java b/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
new file mode 100644
index 0000000..df0511a
--- /dev/null
+++ b/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.At;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class UnionResultsIT extends CrunchTestSupport implements Serializable {
+
+  static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
+
+    @Override
+    public Pair<String, Long> map(String input) {
+      return new Pair<String, Long>(input, 10L);
+    }
+  }
+
+
+  /**
+   * Tests combining a GBK output with a map-only job output into a single
+   * unioned collection.
+   */
+  @Test
+  public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
+    String inputPath = tempDir.copyResourceFileName("set1.txt");
+    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
+
+    Pipeline pipeline = new MRPipeline(UnionResultsIT.class);
+
+    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
+    PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
+        Writables.pairs(Writables.strings(), Writables.longs()));
+    PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
+
+    PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);
+
+    List<Pair<String, Long>> unionValues = Lists.newArrayList(union.materialize());
+    assertEquals(7, unionValues.size());
+
+    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
+    expectedPairs.add(Pair.of("b", 10L));
+    expectedPairs.add(Pair.of("c", 10L));
+    expectedPairs.add(Pair.of("a", 10L));
+    expectedPairs.add(Pair.of("e", 10L));
+    expectedPairs.add(Pair.of("a", 1L));
+    expectedPairs.add(Pair.of("c", 1L));
+    expectedPairs.add(Pair.of("d", 1L));
+
+    assertEquals(expectedPairs, Sets.newHashSet(unionValues));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2af9cbae/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index bca0bea..59b95e8 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -19,29 +19,22 @@ package org.apache.crunch.impl.mr.plan;
 
 import java.io.IOException;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 
-import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.impl.mr.collect.DoCollectionImpl;
-import org.apache.crunch.impl.mr.collect.DoTableImpl;
 import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
-import org.apache.crunch.impl.mr.collect.UnionCollection;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
@@ -88,6 +81,7 @@ public class MSCRPlanner {
     // Break the graph up into connected components.
     List<List<Vertex>> components = graph.connectedComponents();
     
+
     // For each component, we will create one or more job prototypes,
     // depending on its profile.
     // For dependency handling, we only need to care about which
@@ -250,9 +244,25 @@ public class MSCRPlanner {
       HashMultimap<Target, NodePath> outputPaths = HashMultimap.create();
       Set<Vertex> orphans = Sets.newHashSet();
       for (Vertex v : component) {
-        if (!assignment.containsKey(v) && v.isOutput()) {
+
+        // Check if this vertex has multiple inputs but only a subset of
+        // them have already been assigned
+        boolean vertexHasUnassignedIncomingEdges = false;
+        if (v.isOutput()) {
+          for (Edge e : v.getIncomingEdges()) {
+            if (!assignment.containsKey(e.getHead())) {
+              vertexHasUnassignedIncomingEdges = true;
+            }
+          }
+        }
+
+        if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) {
           orphans.add(v);
           for (Edge e : v.getIncomingEdges()) {
+            if (vertexHasUnassignedIncomingEdges && assignment.containsKey(e.getHead())) {
+              // We've already dealt with this incoming edge
+              continue;
+            }
             orphans.add(e.getHead());
             for (NodePath nodePath : e.getNodePaths()) {
               PCollectionImpl target = nodePath.tail();
@@ -262,6 +272,7 @@ public class MSCRPlanner {
             }
           }
         }
+
       }
       if (!outputPaths.isEmpty()) {
         JobPrototype prototype = JobPrototype.createMapOnlyJob(

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2af9cbae/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
index 50efe6a..3404a03 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.plan;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
 import org.apache.crunch.Source;
 import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
@@ -115,4 +116,9 @@ public class Vertex {
   public int hashCode() {
     return 17 + 37 * impl.hashCode();
   }
+
+  @Override
+  public String toString() {
+    return ReflectionToStringBuilder.toStringExclude(this, Lists.newArrayList("outgoing", "incoming"));
+  }
 }

Reply via email to