Updated Branches: refs/heads/master 69105d07b -> 2af9cbae4
CRUNCH-102: Fix GBK and map-only unioned output Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2af9cbae Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2af9cbae Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2af9cbae Branch: refs/heads/master Commit: 2af9cbae45ac44fd0b8b325efe9382973bfe50f1 Parents: 69105d0 Author: Gabriel Reid <[email protected]> Authored: Sat Oct 27 10:13:12 2012 +0200 Committer: Gabriel Reid <[email protected]> Committed: Mon Oct 29 08:07:05 2012 +0100 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/UnionResultsIT.java | 80 +++++++++++++++ .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 27 ++++-- .../org/apache/crunch/impl/mr/plan/Vertex.java | 6 + 3 files changed, 105 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2af9cbae/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java b/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java new file mode 100644 index 0000000..df0511a --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/UnionResultsIT.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Set; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.test.CrunchTestSupport; +import org.apache.crunch.types.writable.Writables; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class UnionResultsIT extends CrunchTestSupport implements Serializable { + // Despite its name, this fn pairs every input line with the constant 10L (not its length), keeping its output distinct from the count() results below. + static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> { + + @Override + public Pair<String, Long> map(String input) { + return new Pair<String, Long>(input, 10L); + } + } + + + /** + * Tests unioning a grouped (GBK) output, here count() over set2, with the output + * of a map-only parallelDo over set1, and materializing the combined collection.
+ */ + @Test + public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException { + String inputPath = tempDir.copyResourceFileName("set1.txt"); + String inputPath2 = tempDir.copyResourceFileName("set2.txt"); + + Pipeline pipeline = new MRPipeline(UnionResultsIT.class); + // Map-only branch: set1 lines tagged with 10L. Grouped branch: occurrence counts of each distinct set2 line. + PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings())); + PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(), + Writables.pairs(Writables.strings(), Writables.longs())); + PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count(); + // Union a map-only output with a grouped output (the scenario covered by CRUNCH-102). + PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts); + + List<Pair<String, Long>> unionValues = Lists.newArrayList(union.materialize()); + assertEquals(7, unionValues.size()); + // Expected: the 4 set1 lines (each tagged 10L) plus the 3 distinct set2 lines (each counted once). + Set<Pair<String, Long>> expectedPairs = Sets.newHashSet(); + expectedPairs.add(Pair.of("b", 10L)); + expectedPairs.add(Pair.of("c", 10L)); + expectedPairs.add(Pair.of("a", 10L)); + expectedPairs.add(Pair.of("e", 10L)); + expectedPairs.add(Pair.of("a", 1L)); + expectedPairs.add(Pair.of("c", 1L)); + expectedPairs.add(Pair.of("d", 1L)); + + assertEquals(expectedPairs, Sets.newHashSet(unionValues)); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2af9cbae/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index bca0bea..59b95e8 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -19,29 +19,22 @@ package org.apache.crunch.impl.mr.plan; import java.io.IOException; import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; import java.util.List;
import java.util.Map; import java.util.Set; import java.util.TreeMap; -import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.mr.MRPipeline; -import org.apache.crunch.impl.mr.collect.DoCollectionImpl; -import org.apache.crunch.impl.mr.collect.DoTableImpl; import org.apache.crunch.impl.mr.collect.InputCollection; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import org.apache.crunch.impl.mr.collect.UnionCollection; import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -88,6 +81,7 @@ public class MSCRPlanner { // Break the graph up into connected components. List<List<Vertex>> components = graph.connectedComponents(); + // For each component, we will create one or more job prototypes, // depending on its profile. 
// For dependency handling, we only need to care about which @@ -250,9 +244,25 @@ public class MSCRPlanner { HashMultimap<Target, NodePath> outputPaths = HashMultimap.create(); Set<Vertex> orphans = Sets.newHashSet(); for (Vertex v : component) { - if (!assignment.containsKey(v) && v.isOutput()) { + + // Check if this vertex has multiple inputs but only a subset of + // them have already been assigned + boolean vertexHasUnassignedIncomingEdges = false; + if (v.isOutput()) { + for (Edge e : v.getIncomingEdges()) { + if (!assignment.containsKey(e.getHead())) { + vertexHasUnassignedIncomingEdges = true; + } + } + } + + if (v.isOutput() && (vertexHasUnassignedIncomingEdges || !assignment.containsKey(v))) { orphans.add(v); for (Edge e : v.getIncomingEdges()) { + if (vertexHasUnassignedIncomingEdges && assignment.containsKey(e.getHead())) { + // We've already dealt with this incoming edge + continue; + } orphans.add(e.getHead()); for (NodePath nodePath : e.getNodePaths()) { PCollectionImpl target = nodePath.tail(); @@ -262,6 +272,7 @@ public class MSCRPlanner { } } } + } if (!outputPaths.isEmpty()) { JobPrototype prototype = JobPrototype.createMapOnlyJob( http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2af9cbae/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java index 50efe6a..3404a03 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java @@ -20,6 +20,7 @@ package org.apache.crunch.impl.mr.plan; import java.util.List; import java.util.Set; +import org.apache.commons.lang.builder.ReflectionToStringBuilder; import org.apache.crunch.Source; import org.apache.crunch.impl.mr.collect.InputCollection; import 
org.apache.crunch.impl.mr.collect.PCollectionImpl; @@ -115,4 +116,9 @@ public class Vertex { public int hashCode() { return 17 + 37 * impl.hashCode(); } + + @Override + public String toString() { + return ReflectionToStringBuilder.toStringExclude(this, Lists.newArrayList("outgoing", "incoming")); + } }
