GIRAPH-931: Provide a Strongly Connected Components algorithm (gianluca via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/de0efb07 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/de0efb07 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/de0efb07 Branch: refs/heads/release-1.1 Commit: de0efb07518082439b9a5cccd503270b09f40e84 Parents: 5adca63 Author: Maja Kabiljo <[email protected]> Authored: Tue Aug 26 12:09:54 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Tue Aug 26 12:11:00 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + giraph-examples/pom.xml | 4 + .../giraph/examples/scc/SccComputation.java | 213 +++++++++++++++++++ .../scc/SccLongLongNullTextInputFormat.java | 90 ++++++++ .../examples/scc/SccPhaseMasterCompute.java | 136 ++++++++++++ .../giraph/examples/scc/SccVertexValue.java | 157 ++++++++++++++ .../giraph/examples/scc/package-info.java | 21 ++ .../scc/SccComputationTestInMemory.java | 128 +++++++++++ 8 files changed, 751 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index b64ce2c..d5b284e 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-931: Provide a Strongly Connected Components algorithm (gianluca via majakabiljo) + GIRAPH-933: Checkpointing improvements (edunov via majakabiljo) GIRAPH-943: Perf regression due to netty 4.0.21 (pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-examples/pom.xml b/giraph-examples/pom.xml index 90f6889..f8304a1 100644 --- a/giraph-examples/pom.xml +++ b/giraph-examples/pom.xml @@ -415,6 +415,10 @@ under the License. 
<artifactId>commons-collections</artifactId> </dependency> <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + </dependency> + <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java new file mode 100644 index 0000000..ca194f6 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccComputation.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.examples.scc; + +import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.PHASE; +import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.NEW_MAXIMUM; +import static org.apache.giraph.examples.scc.SccPhaseMasterCompute.CONVERGED; + +import java.io.IOException; + +import org.apache.giraph.Algorithm; +import org.apache.giraph.examples.scc.SccPhaseMasterCompute.Phases; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +/** + * Finds strongly connected components of the graph. + */ +@Algorithm(name = "Strongly Connected Components", + description = "Finds strongly connected components of the graph") +public class SccComputation extends + BasicComputation<LongWritable, SccVertexValue, NullWritable, LongWritable> { + + /** + * Current phase of the computation as defined in SccPhaseMasterCompute + */ + private Phases currPhase; + + /** + * Reusable object to encapsulate message value, in order to avoid + * creating a new instance every time a message is sent. + */ + private LongWritable messageValue = new LongWritable(); + + /** + * Reusable object to encapsulate a parent vertex id. 
+ */ + private LongWritable parentId = new LongWritable(); + + @Override + public void preSuperstep() { + IntWritable phaseInt = getAggregatedValue(PHASE); + currPhase = SccPhaseMasterCompute.getPhase(phaseInt); + } + + @Override + public void compute( + Vertex<LongWritable, SccVertexValue, NullWritable> vertex, + Iterable<LongWritable> messages) throws IOException { + + SccVertexValue vertexValue = vertex.getValue(); + + if (!vertexValue.isActive()) { + vertex.voteToHalt(); + return; + } + + switch (currPhase) { + case TRANSPOSE : + vertexValue.clearParents(); + sendMessageToAllEdges(vertex, vertex.getId()); + break; + case TRIMMING : + trim(vertex, messages); + break; + case FORWARD_TRAVERSAL : + forwardTraversal(vertex, messages); + break; + case BACKWARD_TRAVERSAL_START : + backwardTraversalStart(vertex); + break; + case BACKWARD_TRAVERSAL_REST : + backwardTraversalRest(vertex, messages); + break; + default : + break; + } + + } + + /** + * Creates list of parents based on the received ids and halts the vertices + * that don't have any parent or outgoing edge, hence, they can't be + * part of an SCC. + * @param vertex Current vertex. + * @param messages Received ids from the Transpose phase. + */ + private void trim(Vertex<LongWritable, SccVertexValue, NullWritable> vertex, + Iterable<LongWritable> messages) { + SccVertexValue vertexValue = vertex.getValue(); + // Keep the ids of the parent nodes to allow for backwards traversal + for (LongWritable parent : messages) { + vertexValue.addParent(parent.get()); + } + // If this node doesn't have any parents or outgoing edges, + // it can't be part of an SCC + vertexValue.set(vertex.getId().get()); + if (vertex.getNumEdges() == 0 || vertexValue.getParents() == null) { + vertexValue.deactivate(); + } else { + messageValue.set(vertexValue.get()); + sendMessageToAllEdges(vertex, messageValue); + } + } + + /** + * Traverse the graph through outgoing edges and keep the maximum vertex + * value. 
+ * If a new maximum value is found, propagate it until convergence. + * @param vertex Current vertex. + * @param messages Received values from neighbor vertices. + */ + private void forwardTraversal( + Vertex<LongWritable, SccVertexValue, NullWritable> vertex, + Iterable<LongWritable> messages) { + SccVertexValue vertexValue = vertex.getValue(); + boolean changed = setMaxValue(vertexValue, messages); + if (changed) { + messageValue.set(vertexValue.get()); + sendMessageToAllEdges(vertex, messageValue); + aggregate(NEW_MAXIMUM, new BooleanWritable(true)); + } + } + + /** + * Traverse the transposed graph and keep the maximum vertex value. + * @param vertex Current vertex. + */ + private void backwardTraversalStart( + Vertex<LongWritable, SccVertexValue, NullWritable> vertex) { + SccVertexValue vertexValue = vertex.getValue(); + if (vertexValue.get() == vertex.getId().get()) { + messageValue.set(vertexValue.get()); + sendMessageToAllParents(vertex, messageValue); + } + } + + /** + * Traverse the transposed graph and keep the maximum vertex value. + * @param vertex Current vertex. + * @param messages Received values from children vertices. + */ + private void backwardTraversalRest( + Vertex<LongWritable, SccVertexValue, NullWritable> vertex, + Iterable<LongWritable> messages) { + SccVertexValue vertexValue = vertex.getValue(); + for (LongWritable m : messages) { + if (vertexValue.get() == m.get()) { + sendMessageToAllParents(vertex, m); + aggregate(CONVERGED, new BooleanWritable(true)); + vertexValue.deactivate(); + vertex.voteToHalt(); + break; + } + } + } + + /** + * Compares the messages values with the current vertex value and finds + * the maximum. + * If the maximum value is different from the vertex value, makes it the + * new vertex value and returns true, otherwise, returns false. + * @param vertexValue Current vertex value. + * @param messages Messages containing neighbors' vertex values. + * @return True if a new maximum was found, otherwise, returns false. 
+ */ + private boolean setMaxValue(SccVertexValue vertexValue, + Iterable<LongWritable> messages) { + boolean changed = false; + for (LongWritable m : messages) { + if (vertexValue.get() < m.get()) { + vertexValue.set(m.get()); + changed = true; + } + } + return changed; + } + + + /** + * Send message to all parents. + * @param vertex Current vertex. + * @param message Message to be sent. + */ + private void sendMessageToAllParents( + Vertex<LongWritable, SccVertexValue, NullWritable> vertex, + LongWritable message) { + for (Long id : vertex.getValue().getParents()) { + parentId.set(id); + sendMessage(parentId, message); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java new file mode 100644 index 0000000..e5a4c86 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccLongLongNullTextInputFormat.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.examples.scc; + +import java.io.IOException; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.formats.TextVertexInputFormat; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import com.google.common.collect.Lists; + +/** + * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for + * unweighted graphs with long ids. + * + * Each line consists of: vertex neighbor1 neighbor2 ... + */ +public class SccLongLongNullTextInputFormat extends + TextVertexInputFormat<LongWritable, SccVertexValue, NullWritable> { + /** Separator of the vertex and neighbors */ + private static final Pattern SEPARATOR = Pattern.compile("[\t ]"); + + @Override + public TextVertexReader createVertexReader(InputSplit split, + TaskAttemptContext context) + throws IOException { + return new LongLongNullVertexReader(); + } + + /** + * Vertex reader associated with {@link SccLongLongNullTextInputFormat}. 
+ */ + public class LongLongNullVertexReader extends + TextVertexReaderFromEachLineProcessed<String[]> { + /** Cached vertex id for the current line */ + private LongWritable id; + + @Override + protected String[] preprocessLine(Text line) throws IOException { + String[] tokens = SEPARATOR.split(line.toString()); + id = new LongWritable(Long.parseLong(tokens[0])); + return tokens; + } + + @Override + protected LongWritable getId(String[] tokens) throws IOException { + return id; + } + + @Override + protected SccVertexValue getValue(String[] tokens) throws IOException { + return new SccVertexValue(Long.parseLong(tokens[0])); + } + + @Override + protected Iterable<Edge<LongWritable, NullWritable>> getEdges( + String[] tokens) throws IOException { + List<Edge<LongWritable, NullWritable>> edges = + Lists.newArrayListWithCapacity(tokens.length - 1); + for (int n = 1; n < tokens.length; n++) { + edges.add(EdgeFactory.create( + new LongWritable(Long.parseLong(tokens[n])))); + } + return edges; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java new file mode 100644 index 0000000..f5fd82b --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccPhaseMasterCompute.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.examples.scc; + +import org.apache.giraph.aggregators.BooleanOverwriteAggregator; +import org.apache.giraph.aggregators.IntOverwriteAggregator; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; + +/** + * This master compute keeps track of what phase is being currently executed by + * the Strongly Connected Components computation. The phases comprehend the + * following: 1 - Transpose (comprehends 2 supersteps, one to propagate parent + * vertices ids and another one to store them by their respective children) 2 - + * Trimming (this phase can happen multiple times) 3 - Forward Traversal 4 - + * Backward Traversal + */ +public class SccPhaseMasterCompute extends DefaultMasterCompute { + + /** + * Aggregator that stores the current phase + */ + public static final String PHASE = "scccompute.phase"; + + /** + * Flags whether a new maximum was found in the Forward Traversal phase + */ + public static final String NEW_MAXIMUM = "scccompute.max"; + + /** + * Flags whether a vertex converged in the Backward Traversal phase + */ + public static final String CONVERGED = "scccompute.converged"; + + /** + * Enumerates the possible phases of the algorithm. 
+ */ + public enum Phases { + /** Transpose and Trimming phases **/ + TRANSPOSE, TRIMMING, + /** Maximum id propagation **/ + FORWARD_TRAVERSAL, + /** Vertex convergence in SCC **/ + BACKWARD_TRAVERSAL_START, BACKWARD_TRAVERSAL_REST + }; + + @Override + public void initialize() throws InstantiationException, + IllegalAccessException { + registerPersistentAggregator(PHASE, IntOverwriteAggregator.class); + registerAggregator(NEW_MAXIMUM, BooleanOverwriteAggregator.class); + registerAggregator(CONVERGED, BooleanOverwriteAggregator.class); + } + + @Override + public void compute() { + if (getSuperstep() == 0) { + setPhase(Phases.TRANSPOSE); + } else { + Phases currPhase = getPhase(); + switch (currPhase) { + case TRANSPOSE: + setPhase(Phases.TRIMMING); + break; + case TRIMMING : + setPhase(Phases.FORWARD_TRAVERSAL); + break; + case FORWARD_TRAVERSAL : + BooleanWritable newMaxFound = getAggregatedValue(NEW_MAXIMUM); + // If no new maximum value was found it means the propagation + // converged, so we can move to the next phase + if (!newMaxFound.get()) { + setPhase(Phases.BACKWARD_TRAVERSAL_START); + } + break; + case BACKWARD_TRAVERSAL_START : + setPhase(Phases.BACKWARD_TRAVERSAL_REST); + break; + case BACKWARD_TRAVERSAL_REST : + BooleanWritable converged = getAggregatedValue(CONVERGED); + if (!converged.get()) { + setPhase(Phases.TRANSPOSE); + } + break; + default : + break; + } + } + } + + /** + * Sets the next phase of the algorithm. + * @param phase + * Next phase. + */ + private void setPhase(Phases phase) { + setAggregatedValue(PHASE, new IntWritable(phase.ordinal())); + } + + /** + * Get current phase. + * @return Current phase as enumerator. + */ + private Phases getPhase() { + IntWritable phaseInt = getAggregatedValue(PHASE); + return getPhase(phaseInt); + } + + /** + * Helper function to convert from internal aggregated value to a Phases + * enumerator. + * @param phaseInt + * An integer that matches a position in the Phases enumerator. 
+ * @return A Phases' item for the given position. + */ + public static Phases getPhase(IntWritable phaseInt) { + return Phases.values()[phaseInt.get()]; + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java new file mode 100644 index 0000000..63c23c5 --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/SccVertexValue.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.examples.scc; + +import it.unimi.dsi.fastutil.longs.LongArrayList; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Vertex value for the Strongly Connected Components algorithm. It keeps track + * of the parents of the vertex in order to traverse the graph backwards. 
+ */ +public class SccVertexValue implements Writable { + + /** Vertex's parents **/ + private LongArrayList parents; + + /** Current vertex value **/ + private long value = Long.MIN_VALUE; + + /** Indicates whether the vertex is still part of the computation; + * it becomes false once the vertex is deactivated. + */ + private boolean active = true; + + /** + * Public constructor required for serialization. + */ + public SccVertexValue() { + } + + /** + * Constructor + * @param value Initial value for this vertex. + */ + public SccVertexValue(long value) { + this.value = value; + } + + @Override + public void readFields(DataInput in) throws IOException { + value = in.readLong(); + + int size = in.readInt(); + if (size != 0) { + for (int i = 0; i < size; i++) { + addParent(in.readLong()); + } + } + + active = in.readBoolean(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeLong(value); + + int size = parents == null ? 0 : parents.size(); + out.writeInt(size); + if (size != 0) { + for (long incomingId : parents) { + out.writeLong(incomingId); + } + } + + out.writeBoolean(active); + } + + /** + * Returns the list of parent vertices, i.e., vertices that are at the other + * end of incoming edges. If the vertex doesn't have any incoming edge, it + * returns null. + * @return List of the vertex's parents. + */ + public LongArrayList getParents() { + return parents; + } + + /** + * Adds a vertex id to the list of parent vertices. + * @param vertexId Id of the parent vertex. + */ + public void addParent(long vertexId) { + // Initialize the list of parent vertices only when one attempts to add + // the first item, so we save some memory on vertices that have no incoming + // edges + if (parents == null) { + parents = new LongArrayList(); + } + parents.add(vertexId); + } + + /** + * Clear parents list. + */ + public void clearParents() { + parents = null; + } + + /** + * Sets the vertex value. 
At the end of the SCC computation, vertices with the + * same vertex value are part of the same component. + * @param value Vertex value. + */ + public void set(long value) { + this.value = value; + } + + /** + * Returns the vertex value. At the end of the SCC computation, vertices with + * the same vertex value are part of the same component. + * @return Current vertex value. + */ + public long get() { + return value; + } + + /** + * Remove this vertex from the computation. + */ + public void deactivate() { + this.active = false; + } + + /** + * Indicates whether the vertex is still part of the computation. + * @return False if the vertex was deactivated, otherwise, return true. + */ + public boolean isActive() { + return active; + } + + @Override + public String toString() { + return String.valueOf(value); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java new file mode 100644 index 0000000..70e345a --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/scc/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Classes for Strongly Connected Components computation. + */ +package org.apache.giraph.examples.scc; http://git-wip-us.apache.org/repos/asf/giraph/blob/de0efb07/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java new file mode 100644 index 0000000..833c43e --- /dev/null +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/scc/SccComputationTestInMemory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.examples.scc; + +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.edge.ByteArrayEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.giraph.utils.TestGraph; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for {@link SccComputation} + */ +public class SccComputationTestInMemory { + @SuppressWarnings("unchecked") + public static Entry<LongWritable, NullWritable>[] makeEdges(long... args) { + Entry<LongWritable, NullWritable> result[] = new Entry[args.length]; + for (int i = 0; i < args.length; i++) { + result[i] = new SimpleEntry<LongWritable, NullWritable>(new LongWritable( + args[i]), NullWritable.get()); + } + return result; + } + + /** + * Adds a vertex with the given id and connects it to {@code outgoingVertices} + * with null-valued edges. + * + * @param graph Graph the vertex is added to. + * @param id Id of the vertex to add. + * @param outgoingVertices Ids of the vertices at the end of the outgoing edges. + */ + public static void addVertex( + TestGraph<LongWritable, SccVertexValue, NullWritable> graph, long id, + long... 
outgoingVertices) { + graph.addVertex(new LongWritable(id), new SccVertexValue(id), + makeEdges(outgoingVertices)); + } + + @Test + public void testToyData() throws Exception { + GiraphConfiguration conf = new GiraphConfiguration(); + conf.setComputationClass(SccComputation.class); + conf.setMasterComputeClass(SccPhaseMasterCompute.class); + conf.setOutEdgesClass(ByteArrayEdges.class); + + + TestGraph<LongWritable, SccVertexValue, NullWritable> graph = new TestGraph<LongWritable, SccVertexValue, NullWritable>( + conf); + + addVertex(graph, 0, 1, 2, 4); + addVertex(graph, 1, 3, 20); + addVertex(graph, 2, 3); + addVertex(graph, 3, 0); + addVertex(graph, 20, 21); + addVertex(graph, 21, 22); + addVertex(graph, 22, 23); + addVertex(graph, 23, 24); + addVertex(graph, 24, 25); + addVertex(graph, 25, 20); + addVertex(graph, 4, 5); + addVertex(graph, 5, 6); + + TestGraph<LongWritable, SccVertexValue, NullWritable> results = InternalVertexRunner.runWithInMemoryOutput(conf, graph); + + Map<Long, List<Long>> scc = parse(results); + + List<Long> components = scc.get(3l); + Collections.sort(components); + Assert.assertEquals(Arrays.asList(0l, 1l, 2l, 3l), components); + + Assert.assertEquals(Arrays.asList(4l), scc.get(4l)); + Assert.assertEquals(Arrays.asList(5l), scc.get(5l)); + Assert.assertEquals(Arrays.asList(6l), scc.get(6l)); + + components = scc.get(25l); + Collections.sort(components); + Assert.assertEquals(Arrays.asList(20l, 21l, 22l, 23l, 24l, 25l), components); + } + + private Map<Long, List<Long>> parse( + TestGraph<LongWritable, SccVertexValue, NullWritable> g) { + Map<Long, List<Long>> scc = new HashMap<Long, List<Long>>(); + for (LongWritable v : g.getVertices().keySet()) { + Vertex<LongWritable, SccVertexValue, NullWritable> vertex = g + .getVertex(v); + long sccId = vertex.getValue().get(); + List<Long> verticesIds = scc.get(sccId); + if (verticesIds == null) {// New SCC + List<Long> newScc = new ArrayList<Long>(); + newScc.add(vertex.getId().get()); + 
scc.put(sccId, newScc); + } else { + verticesIds.add(vertex.getId().get()); + } + } + return scc; + } +}
