Updated Branches: refs/heads/master a8847de58 -> a547e90dc
CRUNCH 105: Add writing of Graphviz dot files for job plan Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a547e90d Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a547e90d Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a547e90d Branch: refs/heads/master Commit: a547e90dce7f51295c95cc1b05d428670f621b43 Parents: a8847de Author: Gabriel Reid <[email protected]> Authored: Tue Oct 30 00:06:44 2012 +0100 Committer: Gabriel Reid <[email protected]> Committed: Wed Oct 31 19:40:16 2012 +0100 ---------------------------------------------------------------------- .../java/org/apache/crunch/impl/mr/MRPipeline.java | 16 +- .../apache/crunch/impl/mr/plan/DotfileWriter.java | 238 +++++++++++++++ .../apache/crunch/impl/mr/plan/JobPrototype.java | 16 + .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 8 +- .../crunch/impl/mr/plan/PlanningParameters.java | 10 + .../crunch/impl/mr/plan/DotfileWriterTest.java | 132 ++++++++ 6 files changed, 416 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 043f7b1..60950f3 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -40,7 +40,9 @@ import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import org.apache.crunch.impl.mr.collect.UnionCollection; import org.apache.crunch.impl.mr.collect.UnionTable; +import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.impl.mr.plan.MSCRPlanner; +import org.apache.crunch.impl.mr.run.CrunchRuntimeException; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.crunch.io.At; import org.apache.crunch.io.ReadableSourceTarget; @@ -136,13 +138,21 @@ public class MRPipeline implements Pipeline { this.tempDirectory = createTempDirectory(conf); } + public MRExecutor plan() { + MSCRPlanner planner = new MSCRPlanner(this, outputTargets); + try { + return planner.plan(jarClass, conf); + } catch (IOException e) { + throw new CrunchRuntimeException(e); + } + } + @Override public PipelineResult run() { - MSCRPlanner planner = new MSCRPlanner(this, outputTargets); PipelineResult res = null; try { - res = planner.plan(jarClass, conf).execute(); - } catch (IOException e) { + res = plan().execute(); + } catch (CrunchRuntimeException e) { LOG.error(e); return PipelineResult.EMPTY; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java new file mode 100644 index 0000000..46d8c53 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.plan; + +import java.util.List; +import java.util.Set; + +import org.apache.crunch.Pair; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mr.collect.InputCollection; +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; + +import com.google.common.base.Joiner; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +/** + * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate + * the topology of Crunch pipelines. + */ +public class DotfileWriter { + + /** The types of tasks within a MapReduce job. */ + enum MRTaskType { MAP, REDUCE }; + + private Set<JobPrototype> jobPrototypes = Sets.newHashSet(); + private HashMultimap<Pair<JobPrototype, MRTaskType>, String> jobNodeDeclarations = HashMultimap.create(); + private Set<String> globalNodeDeclarations = Sets.newHashSet(); + private Set<String> nodePathChains = Sets.newHashSet(); + + /** + * Format the declaration of a node based on a PCollection. + * + * @param pcollectionImpl PCollection for which a node will be declared + * @param jobPrototype The job containing the PCollection + * @return The node declaration + */ + String formatPCollectionNodeDeclaration(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype) { + String shape = "box"; + if (pcollectionImpl instanceof InputCollection) { + shape = "folder"; + } + return String.format("%s [label=\"%s\" shape=%s];", formatPCollection(pcollectionImpl, jobPrototype), pcollectionImpl.getName(), + shape); + } + + /** + * Format a Target as a node declaration. + * + * @param target A Target used within a MapReduce pipeline + * @return The global node declaration for the Target + */ + String formatTargetNodeDeclaration(Target target) { + return String.format("\"%s\" [label=\"%s\" shape=folder];", target.toString(), target.toString()); + } + + /** + * Format a PCollectionImpl into a format to be used for dot files. + * + * @param pcollectionImpl The PCollectionImpl to be formatted + * @param jobPrototype The job containing the PCollection + * @return The dot file formatted representation of the PCollectionImpl + */ + String formatPCollection(PCollectionImpl<?> pcollectionImpl, JobPrototype jobPrototype) { + if (pcollectionImpl instanceof InputCollection) { + InputCollection<?> inputCollection = (InputCollection<?>) pcollectionImpl; + return String.format("\"%s\"", inputCollection.getSource()); + } + return String.format("\"%s@%d@%d\"", pcollectionImpl.getName(), pcollectionImpl.hashCode(), jobPrototype.hashCode()); + } + + /** + * Format a collection of node strings into dot file syntax. + * + * @param nodeCollection Collection of chained node strings + * @return The dot-formatted chain of nodes + */ + String formatNodeCollection(List<String> nodeCollection) { + return String.format("%s;", Joiner.on(" -> ").join(nodeCollection)); + } + + /** + * Format a NodePath in dot file syntax. + * + * @param nodePath The node path to be formatted + * @param jobPrototype The job containing the NodePath + * @return The dot file representation of the node path + */ + List<String> formatNodePath(NodePath nodePath, JobPrototype jobPrototype) { + List<String> formattedNodePaths = Lists.newArrayList(); + + List<PCollectionImpl<?>> pcollections = Lists.newArrayList(nodePath); + for (int collectionIndex = 1; collectionIndex < pcollections.size(); collectionIndex++){ + String fromNode = formatPCollection(pcollections.get(collectionIndex - 1), jobPrototype); + String toNode = formatPCollection(pcollections.get(collectionIndex), jobPrototype); + formattedNodePaths.add(formatNodeCollection(Lists.newArrayList(fromNode, toNode))); + } + return formattedNodePaths; + } + + /** + * Add a NodePath to be formatted as a list of node declarations within a + * single job. + * + * @param jobPrototype The job containing the node path + * @param nodePath The node path to be formatted + */ + void addNodePathDeclarations(JobPrototype jobPrototype, NodePath nodePath) { + boolean groupingEncountered = false; + for (PCollectionImpl<?> pcollectionImpl : nodePath) { + if (pcollectionImpl instanceof InputCollection) { + globalNodeDeclarations.add(formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); + } else { + if (!groupingEncountered){ + groupingEncountered = (pcollectionImpl instanceof PGroupedTableImpl); + } + + MRTaskType taskType = groupingEncountered ? MRTaskType.REDUCE : MRTaskType.MAP; + jobNodeDeclarations.put(Pair.of(jobPrototype, taskType), formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); + } + } + } + + /** + * Add the chaining of a NodePath to the graph. + * + * @param nodePath The path to be formatted as a node chain in the dot file + * @param jobPrototype The job containing the NodePath + */ + void addNodePathChain(NodePath nodePath, JobPrototype jobPrototype) { + for (String nodePathChain : formatNodePath(nodePath, jobPrototype)){ + this.nodePathChains.add(nodePathChain); + } + } + + /** + * Get the graph attributes for a task-specific subgraph. + * + * @param taskType The type of task in the subgraph + * @return Graph attributes + */ + String getTaskGraphAttributes(MRTaskType taskType) { + if (taskType == MRTaskType.MAP) { + return "label = Map; color = blue;"; + } else { + return "label = Reduce; color = red;"; + } + } + + /** + * Add the contents of a {@link JobPrototype} to the graph describing a + * pipeline. + * + * @param jobPrototype A JobPrototype representing a portion of a MapReduce + * pipeline + */ + public void addJobPrototype(JobPrototype jobPrototype) { + jobPrototypes.add(jobPrototype); + if (!jobPrototype.isMapOnly()) { + for (NodePath nodePath : jobPrototype.getMapNodePaths()) { + addNodePathDeclarations(jobPrototype, nodePath); + addNodePathChain(nodePath, jobPrototype); + } + } + + HashMultimap<Target, NodePath> targetsToNodePaths = jobPrototype.getTargetsToNodePaths(); + for (Target target : targetsToNodePaths.keySet()) { + globalNodeDeclarations.add(formatTargetNodeDeclaration(target)); + for (NodePath nodePath : targetsToNodePaths.get(target)) { + addNodePathDeclarations(jobPrototype, nodePath); + addNodePathChain(nodePath, jobPrototype); + nodePathChains.add(formatNodeCollection(Lists.newArrayList(formatPCollection(nodePath.descendingIterator() + .next(), jobPrototype), String.format("\"%s\"", target.toString())))); + } + } + } + + /** + * Build up the full dot file containing the description of a MapReduce + * pipeline. + * + * @return Graphviz dot file contents + */ + public String buildDotfile() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("digraph G {\n"); + int clusterIndex = 0; + + for (String globalDeclaration : globalNodeDeclarations) { + stringBuilder.append(String.format(" %s\n", globalDeclaration)); + } + + for (JobPrototype jobPrototype : jobPrototypes){ + StringBuilder jobProtoStringBuilder = new StringBuilder(); + jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++)); + for (MRTaskType taskType : MRTaskType.values()){ + Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType); + if (jobNodeDeclarations.containsKey(jobTaskKey)){ + jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++)); + jobProtoStringBuilder.append(String.format(" %s\n", getTaskGraphAttributes(taskType))); + for (String declarationEntry : jobNodeDeclarations.get(jobTaskKey)){ + jobProtoStringBuilder.append(String.format(" %s\n", declarationEntry)); + } + jobProtoStringBuilder.append(" }\n"); + } + } + jobProtoStringBuilder.append(" }\n"); + stringBuilder.append(jobProtoStringBuilder.toString()); + } + + for (String nodePathChain : nodePathChains) { + stringBuilder.append(String.format(" %s\n", nodePathChain)); + } + + stringBuilder.append("}\n"); + return stringBuilder.toString(); + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index 0cae079..0ad1d00 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -82,6 +82,22 @@ public class JobPrototype { this.targetsToNodePaths = outputPaths; } + public boolean isMapOnly() { + return this.group == null; + } + + Set<NodePath> getMapNodePaths() { + return mapNodePaths; + } + + PGroupedTableImpl<?, ?> getGroupingTable() { + return group; + } + + HashMultimap<Target, NodePath> getTargetsToNodePaths() { + return targetsToNodePaths; + } + public void addReducePaths(HashMultimap<Target, NodePath> outputPaths) { if (group == null) { throw new IllegalStateException("Cannot add a reduce phase to a map-only job"); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 59b95e8..7fe2809 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -55,7 +55,7 @@ public class MSCRPlanner { } return cmp; } - }; + }; private final MRPipeline pipeline; private final Map<PCollectionImpl<?>, Set<Target>> outputs; @@ -91,6 +91,7 @@ public class MSCRPlanner { assignments.putAll(constructJobPrototypes(component)); } + // Add in the job dependency information here. for (Map.Entry<Vertex, JobPrototype> e : assignments.entries()) { JobPrototype current = e.getValue(); @@ -103,10 +104,15 @@ public class MSCRPlanner { } // Finally, construct the jobs from the prototypes and return. + DotfileWriter dotfileWriter = new DotfileWriter(); MRExecutor exec = new MRExecutor(jarClass); for (JobPrototype proto : Sets.newHashSet(assignments.values())) { + dotfileWriter.addJobPrototype(proto); exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline)); } + + conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, dotfileWriter.buildDotfile()); + return exec; } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java index 6ead212..b90a911 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java @@ -17,12 +17,22 @@ */ package org.apache.crunch.impl.mr.plan; +/** + * Collection of Configuration keys and various constants used when planning MapReduce jobs for a + * pipeline. + */ public class PlanningParameters { public static final String MULTI_OUTPUT_PREFIX = "out"; public static final String CRUNCH_WORKING_DIRECTORY = "crunch.work.dir"; + /** + * Configuration key under which a <a href="http://www.graphviz.org">DOT</a> file containing the + * pipeline job graph is stored by the planner. + */ + public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile"; + private PlanningParameters() { } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a547e90d/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java new file mode 100644 index 0000000..562238d --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.plan; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.List; + +import org.apache.crunch.Source; +import org.apache.crunch.Target; +import org.apache.crunch.impl.mr.collect.InputCollection; +import org.apache.crunch.impl.mr.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.collect.Lists; + +public class DotfileWriterTest { + + private DotfileWriter dotfileWriter; + + @Before + public void setUp() { + dotfileWriter = new DotfileWriter(); + } + + @Test + public void testFormatPCollectionNodeDeclaration() { + PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class); + JobPrototype jobPrototype = mock(JobPrototype.class); + when(pcollectionImpl.getName()).thenReturn("collection"); + + assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode() + + "\" [label=\"collection\" shape=box];", + dotfileWriter.formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); + } + + @Test + public void testFormatPCollectionNodeDeclaration_InputPCollection() { + InputCollection<?> inputCollection = mock(InputCollection.class, Mockito.RETURNS_DEEP_STUBS); + JobPrototype jobPrototype = mock(JobPrototype.class); + when(inputCollection.getName()).thenReturn("input"); + when(inputCollection.getSource().toString()).thenReturn("source"); + + assertEquals("\"source\" [label=\"input\" shape=folder];", + dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype)); + } + + @Test + public void testFormatTargetNodeDeclaration() { + Target target = mock(Target.class); + when(target.toString()).thenReturn("target/path"); + + assertEquals("\"target/path\" [label=\"target/path\" shape=folder];", + dotfileWriter.formatTargetNodeDeclaration(target)); + } + + @Test + public void testFormatPCollection() { + PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class); + JobPrototype jobPrototype = mock(JobPrototype.class); + when(pcollectionImpl.getName()).thenReturn("collection"); + + assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode() + "\"", + dotfileWriter.formatPCollection(pcollectionImpl, jobPrototype)); + } + + @Test + public void testFormatPCollection_InputCollection() { + InputCollection<Object> inputCollection = mock(InputCollection.class); + Source<Object> source = mock(Source.class); + JobPrototype jobPrototype = mock(JobPrototype.class); + when(source.toString()).thenReturn("mocksource"); + when(inputCollection.getSource()).thenReturn(source); + + assertEquals("\"mocksource\"", dotfileWriter.formatPCollection(inputCollection, jobPrototype)); + } + + @Test + public void testFormatNodeCollection() { + List<String> nodeCollection = Lists.newArrayList("one", "two", "three"); + assertEquals("one -> two -> three;", dotfileWriter.formatNodeCollection(nodeCollection)); + } + + @Test + public void testFormatNodePath() { + PCollectionImpl<?> tail = mock(PCollectionImpl.class); + PCollectionImpl<?> head = mock(PCollectionImpl.class); + JobPrototype jobPrototype = mock(JobPrototype.class); + + when(tail.getName()).thenReturn("tail"); + when(head.getName()).thenReturn("head"); + + NodePath nodePath = new NodePath(tail); + nodePath.close(head); + + assertEquals( + Lists.newArrayList("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() + "\" -> \"tail@" + + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";"), + dotfileWriter.formatNodePath(nodePath, jobPrototype)); + } + + @Test + public void testGetTaskGraphAttributes_Map() { + assertEquals("label = Map; color = blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP)); + } + + @Test + public void testGetTaskGraphAttributes_Reduce() { + assertEquals("label = Reduce; color = red;", dotfileWriter.getTaskGraphAttributes(MRTaskType.REDUCE)); + } + +}
