CRUNCH-67: Fix planning for jobs that have map-side outputs
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/28e51b6a Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/28e51b6a Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/28e51b6a Branch: refs/heads/master Commit: 28e51b6a4505ff406c0d9472303c28cd2e2d6aaa Parents: 7cc16e3 Author: Josh Wills <[email protected]> Authored: Tue Sep 18 20:32:38 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Sep 18 20:32:38 2012 -0700 ---------------------------------------------------------------------- .../src/it/java/org/apache/crunch/CleanTextIT.java | 79 +++++++++++++++ .../java/org/apache/crunch/impl/mr/plan/Graph.java | 15 ++- .../apache/crunch/impl/mr/plan/GraphBuilder.java | 6 +- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 71 +++++++++++-- .../org/apache/crunch/impl/mr/plan/Vertex.java | 10 ++ 5 files changed, 161 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CleanTextIT.java b/crunch/src/it/java/org/apache/crunch/CleanTextIT.java new file mode 100644 index 0000000..86a1a8d --- /dev/null +++ b/crunch/src/it/java/org/apache/crunch/CleanTextIT.java @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved. + * + * Cloudera, Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"). You may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. 
See the License for + * the specific language governing permissions and limitations under the + * License. + */ +package org.apache.crunch; + +import static org.junit.Assert.assertEquals; + +import java.io.File; +import java.nio.charset.Charset; +import java.util.List; + +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.To; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.avro.Avros; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.io.Files; + +/** + * Integration test (CRUNCH-67): a collection written as a map-side output is also fed into a downstream count(). + */ +public class CleanTextIT { + + private static final int LINES_IN_SHAKES = 3667; + + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + static final DoFn<String, String> CLEANER = new DoFn<String, String>() { + @Override + public void process(String input, Emitter<String> emitter) { + emitter.emit(input.toLowerCase(java.util.Locale.ROOT)); + } + }; + + static final DoFn<String, String> SPLIT = new DoFn<String, String>() { + @Override + public void process(String input, Emitter<String> emitter) { + for (String word : input.split("\\s+")) { + if (!word.isEmpty()) { + emitter.emit(word); + } + } + } + }; + + @Test + public void testMapSideOutputs() throws Exception { + Pipeline pipeline = new MRPipeline(CleanTextIT.class, tmpDir.getDefaultConfiguration()); + String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt"); + PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath); + + PCollection<String> cleanShakes = shakespeare.parallelDo(CLEANER, Avros.strings()); + File cso = tmpDir.getFile("cleanShakes"); + cleanShakes.write(To.textFile(cso.getAbsolutePath())); + + File wc = tmpDir.getFile("wordCounts"); + cleanShakes.parallelDo(SPLIT, Avros.strings()).count().write(To.textFile(wc.getAbsolutePath())); + pipeline.done(); + + File cleanFile = new File(cso, "part-m-00000"); + List<String> lines = Files.readLines(cleanFile, Charset.forName("UTF-8")); + assertEquals(LINES_IN_SHAKES, 
lines.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java index 93ba2bf..d634c7e 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Graph.java @@ -14,9 +14,7 @@ */ package org.apache.crunch.impl.mr.plan; -import java.util.Collection; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,13 +45,20 @@ public class Graph implements Iterable<Vertex> { public Vertex getVertexAt(PCollectionImpl impl) { return vertices.get(impl); } - - public Vertex addVertex(PCollectionImpl impl) { + + public Vertex addVertex(PCollectionImpl impl, boolean output) { if (vertices.containsKey(impl)) { - return vertices.get(impl); + Vertex v = vertices.get(impl); + if (output) { + v.setOutput(); + } + return v; } Vertex v = new Vertex(impl); vertices.put(impl, v); + if (output) { + v.setOutput(); + } return v; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java index 7705896..7fb942f 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/GraphBuilder.java @@ -38,14 +38,14 @@ public class GraphBuilder implements PCollectionImpl.Visitor { } public void visitOutput(PCollectionImpl<?> output) { - workingVertex = graph.addVertex(output); + 
workingVertex = graph.addVertex(output, true); workingPath = new NodePath(); output.accept(this); } @Override public void visitInputCollection(InputCollection<?> collection) { - Vertex v = graph.addVertex(collection); + Vertex v = graph.addVertex(collection, false); graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection)); } @@ -74,7 +74,7 @@ public class GraphBuilder implements PCollectionImpl.Visitor { @Override public void visitGroupedTable(PGroupedTableImpl<?, ?> collection) { - Vertex v = graph.addVertex(collection); + Vertex v = graph.addVertex(collection, false); graph.getEdge(v, workingVertex).addNodePath(workingPath.close(collection)); workingVertex = v; workingPath = new NodePath(collection); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index f959f14..bca0bea 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration; import com.google.common.collect.HashMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.common.collect.Sets; public class MSCRPlanner { @@ -91,17 +92,19 @@ public class MSCRPlanner { // depending on its profile. // For dependency handling, we only need to care about which // job prototype a particular GBK is assigned to. 
- Map<Vertex, JobPrototype> assignments = Maps.newHashMap(); + Multimap<Vertex, JobPrototype> assignments = HashMultimap.create(); for (List<Vertex> component : components) { assignments.putAll(constructJobPrototypes(component)); } // Add in the job dependency information here. - for (Map.Entry<Vertex, JobPrototype> e : assignments.entrySet()) { + for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) { JobPrototype current = e.getValue(); List<Vertex> parents = graph.getParents(e.getKey()); for (Vertex parent : parents) { - current.addDependency(assignments.get(parent)); + for (JobPrototype parentJobProto : assignments.get(parent)) { + current.addDependency(parentJobProto); + } } } @@ -118,12 +121,14 @@ public class MSCRPlanner { for (Vertex baseVertex : baseGraph) { // Add all of the vertices in the base graph, but no edges (yet). - graph.addVertex(baseVertex.getPCollection()); + graph.addVertex(baseVertex.getPCollection(), baseVertex.isOutput()); } for (Edge e : baseGraph.getAllEdges()) { - // Add back all of the edges where neither vertex is a GBK. - if (!e.getHead().isGBK() && !e.getTail().isGBK()) { + // Add back only the edges where neither vertex is a GBK; edges touching a GBK + // (including an output feeding into a GBK) are re-added or split below. + if (!e.getHead().isGBK() && + !e.getTail().isGBK()) { Vertex head = graph.getVertexAt(e.getHead().getPCollection()); Vertex tail = graph.getVertexAt(e.getTail().getPCollection()); graph.getEdge(head, tail).addAllNodePaths(e.getNodePaths()); @@ -134,7 +139,24 @@ public class MSCRPlanner { if (baseVertex.isGBK()) { Vertex vertex = graph.getVertexAt(baseVertex.getPCollection()); for (Edge e : baseVertex.getIncomingEdges()) { - if (!e.getHead().isGBK()) { + if (e.getHead().isOutput()) { + // Execute an edge split.
+ Vertex splitTail = graph.getVertexAt(e.getHead().getPCollection()); + PCollectionImpl<?> split = splitTail.getPCollection(); + InputCollection<?> inputNode = handleSplitTarget(split); + Vertex splitHead = graph.addVertex(inputNode, false); + + // Divide up the node paths on the output -> GBK edge so that the GBK reads + // from the new input node (splitHead) instead of the output collection. + for (NodePath path : e.getNodePaths()) { + NodePath headPath = path.splitAt(split, splitHead.getPCollection()); + graph.getEdge(vertex, splitTail).addNodePath(headPath); + graph.getEdge(splitHead, vertex).addNodePath(path); + } + + // Note the dependency between the vertices in the graph. + graph.markDependency(splitHead, splitTail); + } else if (!e.getHead().isGBK()) { Vertex newHead = graph.getVertexAt(e.getHead().getPCollection()); graph.getEdge(newHead, vertex).addAllNodePaths(e.getNodePaths()); } @@ -144,13 +166,12 @@ public class MSCRPlanner { Vertex newTail = graph.getVertexAt(e.getTail().getPCollection()); graph.getEdge(vertex, newTail).addAllNodePaths(e.getNodePaths()); } else { - // Execute an Edge split Vertex newGraphTail = graph.getVertexAt(e.getTail().getPCollection()); PCollectionImpl split = e.getSplit(); InputCollection<?> inputNode = handleSplitTarget(split); - Vertex splitTail = graph.addVertex(split); - Vertex splitHead = graph.addVertex(inputNode); + Vertex splitTail = graph.addVertex(split, true); + Vertex splitHead = graph.addVertex(inputNode, false); // Divide up the node paths in the edge between the two GBK nodes so // that each node is either owned by GBK1 -> newTail or newHead -> GBK2. 
@@ -170,8 +191,8 @@ public class MSCRPlanner { return graph; } - private Map<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) { - Map<Vertex, JobPrototype> assignment = Maps.newHashMap(); + private Multimap<Vertex, JobPrototype> constructJobPrototypes(List<Vertex> component) { + Multimap<Vertex, JobPrototype> assignment = HashMultimap.create(); List<Vertex> gbks = Lists.newArrayList(); for (Vertex v : component) { if (v.isGBK()) { @@ -223,6 +244,32 @@ public class MSCRPlanner { } prototype.addReducePaths(outputPaths); } + + // Check for any un-assigned vertices, which should be map-side outputs + // that we will need to run in a map-only job. + HashMultimap<Target, NodePath> outputPaths = HashMultimap.create(); + Set<Vertex> orphans = Sets.newHashSet(); + for (Vertex v : component) { + if (!assignment.containsKey(v) && v.isOutput()) { + orphans.add(v); + for (Edge e : v.getIncomingEdges()) { + orphans.add(e.getHead()); + for (NodePath nodePath : e.getNodePaths()) { + PCollectionImpl target = nodePath.tail(); + for (Target t : outputs.get(target)) { + outputPaths.put(t, nodePath); + } + } + } + } + } + if (!outputPaths.isEmpty()) { + JobPrototype prototype = JobPrototype.createMapOnlyJob( + outputPaths, pipeline.createTempPath()); + for (Vertex orphan : orphans) { + assignment.put(orphan, prototype); + } + } } return assignment; http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/28e51b6a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java index db49e83..50efe6a 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/Vertex.java @@ -33,6 +33,8 @@ import com.google.common.collect.Sets; */ public class Vertex { private final PCollectionImpl 
impl; + + private boolean output; private Set<Edge> incoming; private Set<Edge> outgoing; @@ -54,6 +56,14 @@ public class Vertex { return impl instanceof PGroupedTableImpl; } + public void setOutput() { + this.output = true; + } + + public boolean isOutput() { + return output; + } + public Source getSource() { if (isInput()) { return ((InputCollection) impl).getSource();
