Repository: giraph Updated Branches: refs/heads/trunk d7e4bde11 -> 3b7c68e54
http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java new file mode 100644 index 0000000..0f5f1ac --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/TestMessageChain.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.combiner.MaxMessageCombiner; +import org.apache.giraph.combiner.SumMessageCombiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.function.primitive.PrimitiveRefs.LongRef; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +/** + * Tests and examples of using SendMessageChain + */ +public class TestMessageChain { + + private static GiraphConfiguration createConf() { + GiraphConfiguration conf = new GiraphConfiguration(); + GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class); + GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class); + GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class); + return conf; + } + + private static NumericTestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() { + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = + new NumericTestGraph<LongWritable, LongWritable, NullWritable>(createConf()); + graph.addVertex(1); + graph.addVertex(2); + graph.addVertex(3); + graph.addVertex(4); + + graph.addSymmetricEdge(1, 2); + graph.addSymmetricEdge(2, 3); + return graph; + } + + @Test + public void testReply() { + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); + + // calculates max ID of FOFs + Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable> + startSendToNeighbors( + "SendMyIdToAllNeighbors", + LongWritable.class, + VertexSuppliers.vertexIdSupplier() + ).thenSendToNeighbors( + "SendMaxIReceivedToAllNeighbors", + LongWritable.class, + (vertex, messages) -> new LongWritable(max(messages)) + ).endConsume( + (vertex, messages) -> vertex.getValue().set(max(messages)) + ); + + LocalBlockRunner.runBlock(graph.getTestGraph(), reply, new Object()); + + Assert.assertEquals(3, graph.getVertex(1).getValue().get()); + Assert.assertEquals(2, graph.getVertex(2).getValue().get()); + Assert.assertEquals(3, graph.getVertex(3).getValue().get()); + Assert.assertEquals(0, graph.getVertex(4).getValue().get()); + } + + @Test + public void testReplyCombiner() { + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); + + // calculates max ID of FOFs + Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable> + startSendToNeighbors( + "SendMyIdToAllNeighbors", + MaxMessageCombiner.LONG, + VertexSuppliers.vertexIdSupplier() + ).thenSendToNeighbors( + "SendMaxIReceivedToAllNeighbors", + MaxMessageCombiner.LONG, + (vertex, message) -> message + ).endConsume( + (vertex, message) -> vertex.getValue().set(message != null ? message.get() : 0) + ); + + LocalBlockRunner.runBlock(graph.getTestGraph(), reply, new Object()); + + Assert.assertEquals(3, graph.getVertex(1).getValue().get()); + Assert.assertEquals(2, graph.getVertex(2).getValue().get()); + Assert.assertEquals(3, graph.getVertex(3).getValue().get()); + Assert.assertEquals(0, graph.getVertex(4).getValue().get()); + } + + @Test + public void testReplyCombinerEndReduce() { + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); + + LongRef sumOfAll = new LongRef(0); + + // calculates max ID of FOFs + Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable> + startSendToNeighbors( + "SendMyIdToAllNeighbors", + MaxMessageCombiner.LONG, + VertexSuppliers.vertexIdSupplier() + ).thenSendToNeighbors( + "SendMaxIReceivedToAllNeighbors", + MaxMessageCombiner.LONG, + (vertex, message) -> message + ).endReduce( + "SumAllReceivedValues", + SumReduce.LONG, + (vertex, message) -> message != null ? message : new LongWritable(0), + (value) -> sumOfAll.value = value.get() + ); + + LocalBlockRunner.runBlock( + graph.getTestGraph(), + new SequenceBlock( + reply, + Pieces.forAllVertices( + "SetAllValuesToReduced", + (vertex) -> ((LongWritable) vertex.getValue()).set(sumOfAll.value))), + new Object()); + + Assert.assertEquals(8, graph.getVertex(1).getValue().get()); + Assert.assertEquals(8, graph.getVertex(2).getValue().get()); + Assert.assertEquals(8, graph.getVertex(3).getValue().get()); + Assert.assertEquals(8, graph.getVertex(4).getValue().get()); + + // Block execution is happening in the separate environment if SERIALIZE_MASTER is used, + // so our instance of sumOfAll will be unchanged + Assert.assertEquals(LocalBlockRunner.SERIALIZE_MASTER.getDefaultValue() ? 0 : 8, sumOfAll.value); + } + + + @Test + public void testStartCustom() { + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); + + Block reply = SendMessageChain.<LongWritable, LongWritable, NullWritable, LongWritable> + startCustom( + // Sends ID to it's first neighbor, passing max of received messages to next part of the chain + (consumer) -> new Piece<LongWritable, LongWritable, NullWritable, LongWritable, Object>() { + @Override + public VertexSender<LongWritable, LongWritable, NullWritable> getVertexSender( + BlockWorkerSendApi<LongWritable, LongWritable, NullWritable, LongWritable> workerApi, + Object executionStage) { + return (vertex) -> { + Edge<LongWritable, NullWritable> edge = + Iterators.getNext(vertex.getEdges().iterator(), null); + if (edge != null) { + workerApi.sendMessage(edge.getTargetVertexId(), vertex.getId()); + } + }; + } + + @Override + public VertexReceiver<LongWritable, LongWritable, NullWritable, LongWritable> + getVertexReceiver(BlockWorkerReceiveApi<LongWritable> workerApi, Object executionStage) { + return (vertex, messages) -> { + consumer.apply(vertex, new LongWritable(max(messages))); + }; + } + + @Override + protected Class<LongWritable> getMessageClass() { + return LongWritable.class; + } + } + ).thenSendToNeighbors( + "SendMaxIReceivedToAllNeighbors", + SumMessageCombiner.LONG, + (vertex, message) -> message + ).endConsume( + (vertex, message) -> vertex.getValue().set(message != null ? message.get() : 0) + ); + + LocalBlockRunner.runBlock(graph.getTestGraph(), reply, new Object()); + + Assert.assertEquals(3, graph.getVertex(1).getValue().get()); + Assert.assertEquals(2, graph.getVertex(2).getValue().get()); + Assert.assertEquals(3, graph.getVertex(3).getValue().get()); + Assert.assertEquals(0, graph.getVertex(4).getValue().get()); + } + + + + + private static long max(Iterable<LongWritable> messages) { + long result = 0; + for (LongWritable message : messages) { + result = Math.max(result, message.get()); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java new file mode 100644 index 0000000..e16313a --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchBlockFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; + +/** + * Example Application of BFS calculation + */ +public class BreadthFirstSearchBlockFactory extends AbstractBlockFactory<Object> { + @Override + public Block createBlock(GiraphConfiguration conf) { + SupplierFromVertex<LongWritable, BreadthFirstSearchVertexValue, Writable, Boolean> + isVertexInSeedSet = + (vertex) -> { + return vertex.getValue().isSeedVertex(); + }; + + SupplierFromVertex<LongWritable, BreadthFirstSearchVertexValue, Writable, IntWritable> + getDistance = + (vertex) -> { + return new IntWritable(vertex.getValue().getDistance()); + }; + + ConsumerWithVertex<LongWritable, BreadthFirstSearchVertexValue, Writable, IntWritable> + setDistance = + (vertex, value) -> { + vertex.getValue().setDistance(value.get()); + }; + + return BreadthFirstSearch.bfs( + isVertexInSeedSet, + getDistance, + setDistance); + } + + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<BreadthFirstSearchVertexValue> getVertexValueClass(GiraphConfiguration conf) { + return BreadthFirstSearchVertexValue.class; + } + + @Override + protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) { + return NullWritable.class; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java new file mode 100644 index 0000000..88b78e5 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/BreadthFirstSearchVertexValue.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +/** + * Vertex-value class for BreadthFirstSearchBlockFactory + */ +public class BreadthFirstSearchVertexValue implements Writable { + private boolean seedVertex; + private int distance; + + public boolean isSeedVertex() { + return seedVertex; + } + + public void setSeedVertex(boolean seedVertex) { + this.seedVertex = seedVertex; + } + + public int getDistance() { + return distance; + } + + public void setDistance(int distance) { + this.distance = distance; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeBoolean(seedVertex); + out.writeInt(distance); + } + + @Override + public void readFields(DataInput in) throws IOException { + seedVertex = in.readBoolean(); + distance = in.readInt(); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java new file mode 100644 index 0000000..76c9f89 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetBlockFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +/** + * Example Application of distributed independent set calculation + */ +public class DistributedIndependentSetBlockFactory extends AbstractBlockFactory<Object> { + @Override + public Block createBlock(GiraphConfiguration conf) { + return DistributedIndependentSet. + <LongWritable, DistributedIndependentSetVertexValue>independentSets( + getVertexIDClass(conf), + (vertex) -> vertex.getValue().getIndependentSetID(), + (vertex, value) -> vertex.getValue().setIndependentSetID(value) + ); + } + + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<DistributedIndependentSetVertexValue> getVertexValueClass( + GiraphConfiguration conf) { + return DistributedIndependentSetVertexValue.class; + } + + @Override + protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) { + return NullWritable.class; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java new file mode 100644 index 0000000..d29ee00 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/DistributedIndependentSetVertexValue.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; + +/** + * Vertex-value class for DistributedIndependentSetBlockFactory + */ +public class DistributedIndependentSetVertexValue implements Writable { + private IntWritable independentSetID; + + public IntWritable getIndependentSetID() { + return independentSetID; + } + + public void setIndependentSetID(IntWritable independentSetID) { + this.independentSetID = independentSetID; + } + + @Override + public void write(DataOutput out) throws IOException { + if (independentSetID == null) { + independentSetID = new IntWritable(); + } + out.writeInt(independentSetID.get()); + } + + @Override + public void readFields(DataInput in) throws IOException { + if (independentSetID == null) { + independentSetID = new IntWritable(); + } + independentSetID.set(in.readInt()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java new file mode 100644 index 0000000..c19fe27 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestBreadthFirstSearch.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.junit.Test; + +public class TestBreadthFirstSearch { + private void run( + TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, NullWritable> graphLoader, + int[] expectedDistances, + int[] seedVertices + ) throws Exception { + TestGraphModifier<LongWritable, BreadthFirstSearchVertexValue, NullWritable> valueLoader = + (graph) -> { + List<Integer> seeds = Arrays.asList(ArrayUtils.toObject(seedVertices)); + for (int i = 0; i < graph.getVertexCount(); i++) + graph.getVertex(i).getValue().setSeedVertex(seeds.contains(i)); + }; + + TestGraphUtils.runTest( + TestGraphUtils.chainModifiers(graphLoader, valueLoader), + (graph) -> { + for (int i = 0; i < expectedDistances.length; i++) { + assertEquals(expectedDistances[i], graph.getValue(i).getDistance()); + } + }, + (conf) -> { + BlockUtils.setBlockFactoryClass(conf, BreadthFirstSearchBlockFactory.class); + } + ); + } + + @Test + public void testSmall1SingleSeed() throws Exception { + int[] expected = {0, 1, 1, 2, 3, 3, -1}; + int[] seeds = {0}; + run(new Small1GraphInit<>(), expected, seeds); + } + + @Test + public void testSmall1TwoSeeds() throws Exception { + int[] expected = {0, 1, 1, 1, 0, 1, -1}; + int[] seeds = {0, 4}; + run(new Small1GraphInit<>(), expected, seeds); + } + + @Test + public void testSmall1IsolatedSeed() throws Exception { + int[] expected = {-1, -1, -1, -1, -1, -1, 0}; + int[] seeds = {6}; + run(new Small1GraphInit<>(), expected, seeds); + } + + @Test + public void testSmallGraphTwoSeeds() throws Exception { + int[] expected = {0, 1, 2, 2, 2, 2, 3, 4, 5, 5, 5, 1, 2, 2, 2, 0}; + int[] seeds = {0, 15}; + run(new Graph1Init<>(), expected, seeds); + } + + @Test + public void testSmallGraphTwoCloseSeeds() throws Exception { + int[] expected = {1, 0, 1, 0, 1, 1, 1, 2, 3, 3, 3, 2, 3, 3, 3, 3}; + int[] seeds = {1, 3}; + run(new Graph1Init<>(), expected, seeds); + } + + @Test + public void testMultipleComponentGraphCloseSeeds() throws Exception { + int[] expected = {2, 1, 0, 1, 2, 3, 3, 3, 2, 2, 2, 2, 1, 0, 2, -1, -1, -1, -1, -1, -1}; + int[] seeds = {13, 2}; + run(new Graph2Init(), expected, seeds); + } + + @Test + public void testMultipleComponentGraphFarSeeds() throws Exception { + int[] expected = {3, 2, 3, 2, 1, 0, 1, 2, 1, 2, 3, 3, 2, 3, 3, 3, 2, 1, 0, 1, -1}; + int[] seeds = {5, 18}; + run(new Graph2Init(), expected, seeds); + } + + + public class Graph1Init<I extends WritableComparable, V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + graph.addVertex(0, (Number) null, null, 1); + graph.addVertex(1, (Number) null, null, 0,2,3,4,5); + graph.addVertex(2, (Number) null, null, 1,3,4,5); + graph.addVertex(3, (Number) null, null, 1,2,4,5,6); + graph.addVertex(4, (Number) null, null, 1,2,3,5); + graph.addVertex(5, (Number) null, null, 1,2,3,4,11); + graph.addVertex(6, (Number) null, null, 3,7); + graph.addVertex(7, (Number) null, null, 6,8,9,10); + graph.addVertex(8, (Number) null, null, 7,9,10); + graph.addVertex(9, (Number) null, null, 7,8,10); + graph.addVertex(10, (Number) null, null, 7,8,9); + graph.addVertex(11, (Number) null, null, 5,12,13,14,15); + graph.addVertex(12, (Number) null, null, 11); + graph.addVertex(13, (Number) null, null, 11); + graph.addVertex(14, (Number) null, null, 11); + graph.addVertex(15, (Number) null, null, 11); + } + } + + public class Graph2Init<I extends WritableComparable, V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + graph.addVertex(0, (Number) null, null, 1); + graph.addVertex(1, (Number) null, null, 0,2,3,4); + graph.addVertex(2, (Number) null, null, 1,3); + graph.addVertex(3, (Number) null, null, 2,4,9,10,11); + graph.addVertex(4, (Number) null, null, 1,3,5,6,7); + graph.addVertex(5, (Number) null, null, 4,6,5,8); + graph.addVertex(6, (Number) null, null, 4,5,7); + graph.addVertex(7, (Number) null, null, 4,5,6); + graph.addVertex(8, (Number) null, null, 5,9,12); + graph.addVertex(9, (Number) null, null, 3,8,10,11,12); + graph.addVertex(10, (Number) null, null, 3,9,11); + graph.addVertex(11, (Number) null, null, 3,9,10); + graph.addVertex(12, (Number) null, null, 8,9,13,14); + graph.addVertex(13, (Number) null, null, 12); + graph.addVertex(14, (Number) null, null, 12); + graph.addVertex(15, (Number) null, null, 16); + graph.addVertex(16, (Number) null, null, 15,17,19); + graph.addVertex(17, (Number) null, null, 16,18); + graph.addVertex(18, (Number) null, null, 17,19); + graph.addVertex(19, (Number) null, null, 16,18); + graph.addVertex(20); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java new file mode 100644 index 0000000..fbd380b --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/algo/TestDistributedIndependentSet.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.algo; + +import java.util.ArrayList; +import java.util.HashMap; + +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.edge.Edge; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.Test; + +public class TestDistributedIndependentSet { + + private void runTest( + TestGraphModifier<LongWritable, DistributedIndependentSetVertexValue, NullWritable> + graphLoader) throws Exception { + TestGraphUtils.runTest(graphLoader, (graph) -> { + checkDecomposition(graph); + }, (conf) -> { + BlockUtils.setBlockFactoryClass(conf, DistributedIndependentSetBlockFactory.class); + }); + } + + private static void checkDecomposition( + NumericTestGraph<LongWritable, DistributedIndependentSetVertexValue, NullWritable> graph) { + final int UNASSIGNED = -1; + final int HAS_EDGE_TO_IND_SET = -2; + // Hold (id -> List of vertices) for each independent set id + HashMap<Integer, ArrayList<Integer>> indSets = new HashMap<>(); + int numVertices = graph.getVertexCount(); + + int numIndSets = 0; + for (int i = 0; i < numVertices; ++i) { + int indSetID = graph.getVertex(i).getValue().getIndependentSetID().get(); + // Number of independent sets are less than or equal to the number of vertices in the input + // graph. + Assert.assertTrue(indSetID >= 0 && indSetID < numVertices); + // All tests assign (0, 1, ...) as independent set ids. The vertex assigned to the max id + // is a specifier for the total number of independent sets the input graph is decomposed + // into. + if (indSetID > numIndSets) + numIndSets = indSetID; + ArrayList<Integer> mapValue = indSets.get(indSetID); + if (mapValue == null) { + mapValue = new ArrayList<>(); + indSets.put(indSetID, mapValue); + } + mapValue.add(i); + } + numIndSets++; + + int[] label = new int[numVertices]; + for (int i = 0; i < numVertices; ++i) + label[i] = UNASSIGNED; + + for (int i = 0; i < numIndSets; ++i) { + ArrayList<Integer> indSet = indSets.get(i); + + // All independent set ids are assigned consecutively starting from 0. There should be at + // least one vertex per assigned independent set. + Assert.assertTrue(indSet != null && indSet.size() > 0); + for (int v : indSet) { + // Check if vertices in this independent set is not assigned to any of the previous + // independent sets. + Assert.assertTrue(label[v] == UNASSIGNED); + label[v] = i; + } + + for (int v : indSet) { + for (Edge<LongWritable, NullWritable> edge : graph.getVertex(v).getEdges()) { + int u = (int) edge.getTargetVertexId().get(); + // Check all vertices in the current independent set do not have edge to each other. + Assert.assertTrue(label[u] != label[v]); + // Mark unassigned vertices neighboring this independent set. This is necessary to check + // if this independent set is 'maximal'. + if (label[u] == UNASSIGNED) + label[u] = HAS_EDGE_TO_IND_SET; + } + } + + // Check if the independent set is maximal. + for (int j = 0; j < numVertices; ++j) { + Assert.assertTrue(label[j] != UNASSIGNED); + // Reset marked vertices neighboring to this independent set. + if (label[j] == HAS_EDGE_TO_IND_SET) + label[j] = UNASSIGNED; + } + } + } + + private static void createVertices( + NumericTestGraph<LongWritable, DistributedIndependentSetVertexValue, NullWritable> graph, + int numVertices) { + for (int i = 0; i < numVertices; ++i) + graph.addVertex(i); + } + + @Test + public void testSmallGraph() throws Exception { + /* + * 1 5 + * / \ / \ 6 + * 0---2--3---4 + */ + final int NUM_VERTICES = 7; + runTest((graph) -> { + createVertices(graph, NUM_VERTICES); + graph.addSymmetricEdge(0, 1); + graph.addSymmetricEdge(0, 2); + graph.addSymmetricEdge(1, 2); + graph.addSymmetricEdge(2, 3); + graph.addSymmetricEdge(3, 4); + graph.addSymmetricEdge(3, 5); + graph.addSymmetricEdge(4, 5); + }); + } + + @Test + public void testSmallGraphOrderingEffect() throws Exception { + /* + * 4 5 + * / \ / \ 6 + * 0---2--1---3 + */ + final int NUM_VERTICES = 7; + runTest((graph) -> { + createVertices(graph, NUM_VERTICES); + graph.addSymmetricEdge(0, 4); + graph.addSymmetricEdge(0, 2); + graph.addSymmetricEdge(2, 4); + graph.addSymmetricEdge(1, 2); + graph.addSymmetricEdge(1, 5); + graph.addSymmetricEdge(1, 3); + graph.addSymmetricEdge(3, 5); + }); + } + + @Test + public void testRingOdd() throws Exception { + final int NUM_VERTICES = 13; + runTest((graph) -> { + createVertices(graph, NUM_VERTICES); + for (int i = 1; i < NUM_VERTICES; ++i) + graph.addSymmetricEdge(i - 1, i); + graph.addSymmetricEdge(0, NUM_VERTICES - 1); + }); + } + + @Test + public void testStarGraph() throws Exception { + final int NUM_VERTICES = 15; + runTest((graph) -> { + createVertices(graph, NUM_VERTICES); + for (int i = 1; i < NUM_VERTICES; ++i) + graph.addSymmetricEdge(0, i); + }); + } + + @Test + public void testMultipleStarGraphs() throws Exception { + final int NUM_VERTICES1 = 15; + final int NUM_VERTICES2 = 21; + runTest((graph) -> { + createVertices(graph, NUM_VERTICES1 + NUM_VERTICES2); + for (int i = 1; i < NUM_VERTICES1; ++i) + graph.addSymmetricEdge(0, i); + + for (int i = 1 + NUM_VERTICES1; i < NUM_VERTICES1 + NUM_VERTICES2; ++i) + graph.addSymmetricEdge(NUM_VERTICES1, i); + }); + } + + @Test + public void testMeshGraph() throws Exception { + final int M = 11; + final int N = 13; + runTest((graph) -> { + createVertices(graph, M * N); + for (int i = 0; i < M; ++i) + for (int j = 0; j < N; ++j) { + if (i != M - 1) + graph.addSymmetricEdge(i * N + j, (i + 1) * N + j); + if (j != N - 1) + graph.addSymmetricEdge(i * N + j, i * N + (j + 1)); + } + }); + } + + @Test + public void testCompleteGraph() throws Exception { + final int NUM_VERTICES = 17; + runTest((graph) -> { + createVertices(graph, NUM_VERTICES); + for (int i = 0; i < NUM_VERTICES; ++i) + for (int j = i + 1; j < NUM_VERTICES; ++j) + graph.addSymmetricEdge(i, j); + }); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java new file mode 100644 index 0000000..1f5ece6 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/coarsening/TestCoarseningUtils.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.coarsening; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.ReusableSuppliers; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit; +import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.hadoop.io.IntWritable; +import org.junit.Assert; +import org.junit.Test; + +public class TestCoarseningUtils { + + public static class TestCoarseningUtilsBlockFactory extends AbstractBlockFactory<Object> { + @Override + public Block createBlock(GiraphConfiguration conf) { + return CoarseningUtils.<IntWritable, IntWritable, IntWritable>createCoarseningBlock( + (ImmutableClassesGiraphConfiguration<IntWritable, IntWritable, IntWritable>) conf, + ReusableSuppliers.fromInt((vertex) -> { + int id = vertex.getId().get(); + if (id == 0 || id == 1) { + return -1; + } else if (id == 2 || id == 3) { + return -2; + } else if (id == 4 || id == 5) { + return -4; + } else return -id; + }), + id -> id.get() >= 0, + id -> id.get() < 0); + } + + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected Class<IntWritable> getVertexIDClass(GiraphConfiguration conf) { + return IntWritable.class; + } + + @Override + protected Class<IntWritable> getVertexValueClass(GiraphConfiguration conf) { + return IntWritable.class; + } + + @Override + protected Class<IntWritable> getEdgeValueClass(GiraphConfiguration conf) { + return IntWritable.class; + } + } + + @Test + public void testSmallGraph() throws Exception { + /* We take small graph: + * + * 1 5 + * / \ / \ 6 + * 0---2--3---4 + * + * And coarsen it into 4 groups: (0,1), (2,3), (4,5), (6) + */ + + TestGraphUtils.<IntWritable, IntWritable, IntWritable>runTest( + TestGraphUtils.chainModifiers( + new Small1GraphInit<>(() -> new IntWritable(1)), + new EachVertexInit<>((vertex) -> vertex.getValue().set(1))), + (graph) -> { + for (int i : new int[] {-1, -4}) { + Assert.assertEquals(2, graph.getValue(i).get()); + Assert.assertEquals(2, graph.getVertex(i).getNumEdges()); + + Map<Integer, Integer> edges = edgesToMap(graph.getVertex(i).getEdges()); + // self loop + Assert.assertEquals(2, edges.get(i).intValue()); + + Assert.assertEquals(2, edges.get(-2).intValue()); + } + for (int i : new int[] {-2}) { + Assert.assertEquals(2, graph.getValue(i).get()); + Assert.assertEquals(3, graph.getVertex(i).getNumEdges()); + + Map<Integer, Integer> edges = edgesToMap(graph.getVertex(i).getEdges()); + // self loop + Assert.assertEquals(2, edges.get(i).intValue()); + + Assert.assertEquals(2, edges.get(-1).intValue()); + Assert.assertEquals(2, edges.get(-4).intValue()); + } + for (int i : new int[] {-6}) { + Assert.assertEquals(1, graph.getValue(i).get()); + Assert.assertEquals(0, graph.getVertex(i).getNumEdges()); + } + }, + (conf) -> { + BlockUtils.setBlockFactoryClass(conf, TestCoarseningUtilsBlockFactory.class); + }); + } + + private static Map<Integer, Integer> edgesToMap(Iterable<Edge<IntWritable, IntWritable>> edges) { + Map<Integer, Integer> map = new HashMap<>(); + for(Edge<IntWritable, IntWritable> edge : edges) { + Assert.assertNull(map.put(edge.getTargetVertexId().get(), edge.getValue().get())); + } + return map; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java b/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java new file mode 100644 index 0000000..d8b4cc1 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperJava8Test.java @@ -0,0 +1,174 @@ +package org.apache.giraph.writable.kryo; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Iterator; +import java.util.Random; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.library.striping.StripingUtils; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.Predicate; +import org.apache.giraph.function.Supplier; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.primitive.Obj2IntFunction; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.LongWritable; +import org.junit.Assert; +import org.junit.Test; + + +public class KryoWritableWrapperJava8Test { + // Copy from KryoWritableWrapperTest, since we cannot extend it, since tests are not in jars + public static <T> T kryoSerDeser(T t) throws IOException { + KryoWritableWrapper<T> wrapped = new KryoWritableWrapper<>(t); + KryoWritableWrapper<T> deser = new KryoWritableWrapper<>(); + WritableUtils.copyInto(wrapped, deser, true); + return deser.get(); + } + + @Test(expected = RuntimeException.class) + public void testNonSerializableLambda() throws IOException { + Runnable nonCapturing = () -> System.out.println("works"); + kryoSerDeser(nonCapturing).run(); + } + + @Test + public void testLambda() throws IOException { + Runnable nonCapturing = (Runnable & Serializable) () -> System.out.println("works"); + kryoSerDeser(nonCapturing).run(); + + String works = "works"; + Runnable capturing = (Runnable & Serializable) () -> System.out.println(works); + kryoSerDeser(capturing).run(); + } + + @Test + public void testLambdaCapturingThisRef() throws IOException { + KryoWritableWrapperJava8Test o = this; + Runnable capturingThisRef = (Runnable & Serializable) () -> + System.out.println(o); + kryoSerDeser(capturingThisRef).run(); + } + + @Test + public void testLambdaCapturingThis() throws IOException { + Runnable capturingThis = (Runnable & Serializable) () -> + System.out.println(this); + kryoSerDeser(capturingThis).run(); + } + + @Test + public void testLambdaCapturingLambda() throws IOException { + Supplier<Boolean> nonCapturing = () -> true; + Runnable capturingLambda = (Runnable & Serializable) () -> + System.out.println(nonCapturing); + kryoSerDeser(capturingLambda).run(); + } + + @Test + public void testLambdaCapturingLambdaWithCapture() throws IOException { + boolean trueVar = new Random().nextDouble() < 1; + Supplier<Boolean> capturing = () -> trueVar; + Runnable capturingLambda = (Runnable & Serializable) () -> + System.out.println(capturing + " " + trueVar); + kryoSerDeser(capturingLambda).run(); + } + + + @Test + public void testLambdaFunctions() throws IOException { + Supplier<Boolean> nonCapturing = () -> true; + Assert.assertTrue(kryoSerDeser(nonCapturing).get()); + + boolean trueVar = new Random().nextDouble() < 1; + Supplier<Boolean> capturing = () -> trueVar; + Assert.assertTrue(kryoSerDeser(capturing).get()); + } + + @Test + public void testLambdasFromCode() throws IOException { + Assert.assertNotNull(kryoSerDeser(StripingUtils.fastHashStriping(3))); + Assert.assertNotNull(kryoSerDeser(StripingUtils.fastHashStripingPredicate(3))); + + Assert.assertNotNull(kryoSerDeser( + (Int2ObjFunction<Obj2IntFunction<LongWritable>>) StripingUtils::fastHashStriping)); + + Int2ObjFunction<Int2ObjFunction<Predicate<LongWritable>>> stripingPredicate = StripingUtils::fastHashStripingPredicate; + Assert.assertNotNull(kryoSerDeser(stripingPredicate)); + + Assert.assertNotNull(kryoSerDeser(stripingPredicate.apply(3))); + + Assert.assertNotNull(kryoSerDeser(StripingUtils.fastHashStripingPredicate(3).apply(2))); + + Assert.assertNotNull(kryoSerDeser(stripingPredicate.apply(3).apply(2))); + + Predicate<LongWritable> predicate = stripingPredicate.apply(3).apply(2); + Assert.assertNotNull(kryoSerDeser(predicate)); + + Runnable capturingLambda = (Runnable & Serializable) () -> + System.out.println(predicate.apply(new LongWritable())); + kryoSerDeser(capturingLambda).run(); + + Assert.assertNotNull(kryoSerDeser( + StripingUtils.generateStripedBlock( + 5, + (filter) -> new Block() { + private final Predicate<LongWritable> test = filter; + { + Assert.assertTrue(filter instanceof Serializable); + } + + @Override + public Iterator<AbstractPiece> iterator() { + return null; + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { } + }))); + } + + @Test + public void testLambdaCapturingSameReference() throws IOException { + ObjectTransfer<Integer> transfer = new ObjectTransfer<>(); + + Consumer<Integer> consumer = (t) -> transfer.apply(t); + Supplier<Integer> supplier = () -> transfer.get(); + + class TwoLambdaObject { + Consumer<Integer> consumer; + Supplier<Integer> supplier; + + public TwoLambdaObject(Consumer<Integer> consumer, Supplier<Integer> supplier) { + this.consumer = consumer; + this.supplier = supplier; + } + } + + TwoLambdaObject object = new TwoLambdaObject(consumer, supplier); + // test transfer before serialization + object.consumer.apply(5); + Assert.assertEquals(5, object.supplier.get().intValue()); + + // test transfer through serialization + object.consumer.apply(6); + TwoLambdaObject deser = kryoSerDeser(object); + Assert.assertEquals(6, deser.supplier.get().intValue()); + + // test that after serialization, both lambdas point to the same object + deser.consumer.apply(4); + Assert.assertEquals(4, deser.supplier.get().intValue()); + } + + // Bug in Java, have test to know when it becomes fixed + @Test //(expected=RuntimeException.class) + public void testNestedLambda() throws IOException { + Int2ObjFunction<Int2ObjFunction<Integer>> f = (x) -> (y) -> x+y; + Assert.assertNotNull(kryoSerDeser(f)); + Assert.assertNotNull(kryoSerDeser(f.apply(0))); + Assert.assertNotNull(kryoSerDeser(f.apply(0).apply(1))); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java index ad6ca15..d582cb2 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java @@ -144,6 +144,8 @@ public class LocalBlockRunner { Block block, Object executionStage, TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver ) { + Preconditions.checkNotNull(block); + Preconditions.checkNotNull(graph); ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf(); int numWorkers = NUM_THREADS.get(conf); boolean runAllChecks = RUN_ALL_CHECKS.get(conf); http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java index 88b78a3..d926cdd 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java @@ -32,6 +32,7 @@ import org.apache.giraph.block_app.library.internal.SendMessagePiece; import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.PairConsumer; import org.apache.giraph.function.vertex.ConsumerWithVertex; import org.apache.giraph.function.vertex.SupplierFromVertex; import org.apache.giraph.graph.Vertex; @@ -170,10 +171,38 @@ public class Pieces { <S, R extends Writable, I extends WritableComparable, V extends Writable, E extends Writable> Piece<I, V, E, NoMessage, Object> reduce( + String name, + ReduceOperation<S, R> reduceOp, + SupplierFromVertex<I, V, E, S> valueSupplier, + final Consumer<R> reducedValueConsumer) { + return reduceWithMaster( + name, reduceOp, valueSupplier, + new PairConsumer<R, BlockMasterApi>() { + @Override + public void apply(R input, BlockMasterApi master) { + reducedValueConsumer.apply(input); + } + }); + } + + /** + * Creates single reducer piece - given reduce class, supplier of values on + * worker, reduces and passes the result to given consumer on master. + * + * @param <S> Single value type, objects passed on workers + * @param <R> Reduced value type + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ + public static + <S, R extends Writable, I extends WritableComparable, V extends Writable, + E extends Writable> + Piece<I, V, E, NoMessage, Object> reduceWithMaster( final String name, final ReduceOperation<S, R> reduceOp, final SupplierFromVertex<I, V, E, S> valueSupplier, - final Consumer<R> reducedValueConsumer) { + final PairConsumer<R, BlockMasterApi> reducedValueConsumer) { return new Piece<I, V, E, NoMessage, Object>() { private ReducerHandle<S, R> handle; @@ -197,7 +226,7 @@ public class Pieces { @Override public void masterCompute(BlockMasterApi master, Object executionStage) { - reducedValueConsumer.apply(handle.getReducedValue(master)); + reducedValueConsumer.apply(handle.getReducedValue(master), master); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java index b606a34..d1efd5b 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/SendMessageChain.java @@ -19,12 +19,14 @@ package org.apache.giraph.block_app.library; import java.util.Iterator; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; import org.apache.giraph.block_app.framework.block.Block; import org.apache.giraph.block_app.framework.block.SequenceBlock; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.function.Consumer; import org.apache.giraph.function.Function; import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.PairConsumer; import org.apache.giraph.function.vertex.ConsumerWithVertex; import org.apache.giraph.function.vertex.FunctionWithVertex; import org.apache.giraph.function.vertex.SupplierFromVertex; @@ -145,6 +147,18 @@ public class SendMessageChain<I extends WritableComparable, V extends Writable, } /** + * Start chain by providing a function that will produce Block representing + * beginning of the chain, given a consumer of messages send + * by the last link in the created block. + */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable, P extends Writable> + SendMessageChain<I, V, E, P> startCustom( + Function<ConsumerWithVertex<I, V, E, P>, Block> createStartingBlock) { + return new SendMessageChain<>(createStartingBlock); + } + + /** * Give previously received message(s) to messageSupplier, and send message * it returns to all targets provided by targetsSupplier. */ @@ -245,23 +259,51 @@ public class SendMessageChain<I extends WritableComparable, V extends Writable, * by reducedValueConsumer. */ public <S, R extends Writable> - Block endReduce(String name, ReduceOperation<S, R> reduceOp, + Block endReduce(final String name, final ReduceOperation<S, R> reduceOp, final FunctionWithVertex<I, V, E, P, S> valueSupplier, - Consumer<R> reducedValueConsumer) { - final ObjectTransfer<P> prevMessagesTransfer = new ObjectTransfer<>(); + final Consumer<R> reducedValueConsumer) { + return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() { + @Override + public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) { + return Pieces.reduce( + name, + reduceOp, + new SupplierFromVertex<I, V, E, S>() { + @Override + public S get(Vertex<I, V, E> vertex) { + return valueSupplier.apply(vertex, prevMessages.get(vertex)); + } + }, + reducedValueConsumer); + } + }); + } - return new SequenceBlock( - blockCreator.apply(prevMessagesTransfer.<I, V, E>castToConsumer()), - Pieces.reduce( - name, - reduceOp, - new SupplierFromVertex<I, V, E, S>() { - @Override - public S get(Vertex<I, V, E> vertex) { - return valueSupplier.apply(vertex, prevMessagesTransfer.get()); - } - }, - reducedValueConsumer)); + /** + * End chain by giving received messages to valueSupplier, + * to produce value that should be reduced, and consumed on master + * by reducedValueConsumer. + */ + public <S, R extends Writable> + Block endReduceWithMaster( + final String name, final ReduceOperation<S, R> reduceOp, + final FunctionWithVertex<I, V, E, P, S> valueSupplier, + final PairConsumer<R, BlockMasterApi> reducedValueConsumer) { + return endCustom(new Function<SupplierFromVertex<I, V, E, P>, Block>() { + @Override + public Block apply(final SupplierFromVertex<I, V, E, P> prevMessages) { + return Pieces.reduceWithMaster( + name, + reduceOp, + new SupplierFromVertex<I, V, E, S>() { + @Override + public S get(Vertex<I, V, E> vertex) { + return valueSupplier.apply(vertex, prevMessages.get(vertex)); + } + }, + reducedValueConsumer); + } + }); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java new file mode 100644 index 0000000..83ef3fd --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/WorkerGCPiece.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.gc; + +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.piece.PieceWithWorkerContext; +import org.apache.giraph.types.NoMessage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * Dummy piece to hint System.gc() + */ +public class WorkerGCPiece extends PieceWithWorkerContext<WritableComparable, + Writable, Writable, NoMessage, Object, NoMessage, Object> { + + @Override + @SuppressFBWarnings(value = "DM_GC") + public void workerContextSend( + BlockWorkerContextSendApi<NoMessage> workerContextApi, + Object executionStage, + Object workerValue) { + System.gc(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java new file mode 100644 index 0000000..bdf5ded --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/gc/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * GC utility pieces. + */ +package org.apache.giraph.block_app.library.gc; http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java new file mode 100644 index 0000000..8b3c0a3 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/Double2ObjFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + +/** + * Primitive specialization of Function: + * (double) -> T + * + * @param <T> Result type + */ +public interface Double2ObjFunction<T> extends Serializable { + /** + * Returns the result of applying this function to given {@code input}. + * + * The returned object may or may not be a new instance, + * depending on the implementation. + */ + T apply(double input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java new file mode 100644 index 0000000..4725d4b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/DoubleConsumer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +import java.io.Serializable; + + +/** + * Primitive specialization of Function: + * (double) -> void + */ +public interface DoubleConsumer extends Serializable { + /** + * Applies this function to {@code input} + */ + void apply(double input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java new file mode 100644 index 0000000..5a7c101 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MaxMessageCombiner.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.combiner; + +import org.apache.giraph.types.ops.DoubleTypeOps; +import org.apache.giraph.types.ops.FloatTypeOps; +import org.apache.giraph.types.ops.IntTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Message combiner which calculates max of all messages. + * + * @param <M> Message type + */ +public class MaxMessageCombiner<M extends WritableComparable> + implements MessageCombiner<WritableComparable, M> { + /** DoubleWritable specialization */ + public static final MaxMessageCombiner<DoubleWritable> DOUBLE = + new MaxMessageCombiner<>(DoubleTypeOps.INSTANCE); + /** DoubleWritable specialization */ + public static final MaxMessageCombiner<FloatWritable> FLOAT = + new MaxMessageCombiner<>(FloatTypeOps.INSTANCE); + /** LongWritable specialization */ + public static final MaxMessageCombiner<LongWritable> LONG = + new MaxMessageCombiner<>(LongTypeOps.INSTANCE); + /** IntWritable specialization */ + public static final MaxMessageCombiner<IntWritable> INT = + new MaxMessageCombiner<>(IntTypeOps.INSTANCE); + + /** Value type operations */ + private final NumericTypeOps<M> typeOps; + + /** + * Constructor + * @param typeOps Value type operations + */ + public MaxMessageCombiner(NumericTypeOps<M> typeOps) { + this.typeOps = typeOps; + } + + @Override + public void combine( + WritableComparable vertexIndex, M originalMessage, M messageToCombine) { + if (originalMessage.compareTo(messageToCombine) < 0) { + typeOps.set(originalMessage, messageToCombine); + } + } + + @Override + public M createInitialMessage() { + return typeOps.createZero(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/3b7c68e5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java new file mode 100644 index 0000000..1aea293 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/PairReduce.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.writable.tuple.PairWritable; +import org.apache.hadoop.io.Writable; +import org.python.google.common.base.Preconditions; + +/** + * Combines two individual reducers, to create a single reducer of pairs that + * reduces each of them individually. + * + * @param <S1> First single value type + * @param <R1> First reduced value type + * @param <S2> Second single value type + * @param <R2> Second reduced value type + */ +public class PairReduce<S1, R1 extends Writable, S2, R2 extends Writable> + implements ReduceOperation<Pair<S1, S2>, PairWritable<R1, R2>> { + /** First reduceOp */ + private ReduceOperation<S1, R1> reduce1; + /** Second reduceOp */ + private ReduceOperation<S2, R2> reduce2; + + /** Constructor */ + public PairReduce() { + } + + /** + * Constructor + * @param reduce1 First reduceOp + * @param reduce2 Second reduceOp + */ + public PairReduce( + ReduceOperation<S1, R1> reduce1, ReduceOperation<S2, R2> reduce2) { + this.reduce1 = reduce1; + this.reduce2 = reduce2; + } + + + @Override + public PairWritable<R1, R2> createInitialValue() { + return new PairWritable<>( + reduce1.createInitialValue(), reduce2.createInitialValue()); + } + + @Override + public PairWritable<R1, R2> reduce( + PairWritable<R1, R2> curValue, Pair<S1, S2> valueToReduce) { + Preconditions.checkState( + curValue.getLeft() == + reduce1.reduce(curValue.getLeft(), valueToReduce.getLeft())); + Preconditions.checkState( + curValue.getRight() == + reduce2.reduce(curValue.getRight(), valueToReduce.getRight())); + return curValue; + } + + @Override + public PairWritable<R1, R2> reduceMerge( + PairWritable<R1, R2> curValue, PairWritable<R1, R2> valueToReduce) { + Preconditions.checkState( + curValue.getLeft() == + reduce1.reduceMerge(curValue.getLeft(), valueToReduce.getLeft())); + Preconditions.checkState( + curValue.getRight() == + reduce2.reduceMerge(curValue.getRight(), valueToReduce.getRight())); + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeWritableObject(reduce1, out); + WritableUtils.writeWritableObject(reduce2, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + reduce1 = WritableUtils.readWritableObject(in, null); + reduce2 = WritableUtils.readWritableObject(in, null); + } +}
