Repository: giraph Updated Branches: refs/heads/trunk f9dc6b59e -> d7e4bde11
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java new file mode 100644 index 0000000..c5c91ce --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestConnectedComponents.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.library.prepare_graph; + +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.block_app.library.prepare_graph.vertex.ConnectedComponentVertexValue; +import org.apache.giraph.block_app.library.prepare_graph.vertex.WeaklyConnectedComponentVertexValue; +import org.apache.giraph.block_app.library.prepare_graph.vertex.WeaklyConnectedComponentVertexValueImpl; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit; +import org.apache.giraph.block_app.test_setup.graphs.Small2GraphInit; +import org.apache.giraph.block_app.test_setup.graphs.SmallDirectedForestGraphInit; +import org.apache.giraph.block_app.test_setup.graphs.SmallDirectedTreeGraphInit; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.junit.Assert; +import org.junit.Test; + /** Tests UndirectedConnectedComponents and WeaklyConnectedComponents on small fixture graphs; the assertions expect every component to be labeled with its smallest vertex id. */ +public class TestConnectedComponents { + /* The component id is stored directly in the LongWritable vertex value. */ + private static final + SupplierFromVertex<LongWritable, LongWritable, Writable, LongWritable> GET_COMPONENT = + VertexSuppliers.<LongWritable, LongWritable, Writable>vertexValueSupplier(); + + private static final + ConsumerWithVertex<LongWritable, LongWritable, Writable, LongWritable> SET_COMPONENT = + (vertex, value) -> vertex.getValue().set(value.get()); + + /* Builds an empty graph with LongWritable ids, NullWritable edges and the given vertex value class. 
*/ + private <V extends Writable> + NumericTestGraph<LongWritable, V, NullWritable> createEmptyGraph(Class<V> vertexValue) { + GiraphConfiguration conf = new GiraphConfiguration(); + + GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class); +
GiraphConstants.VERTEX_VALUE_CLASS.set(conf, vertexValue); + GiraphConstants.EDGE_VALUE_CLASS.set(conf, NullWritable.class); + + NumericTestGraph<LongWritable, V, NullWritable> graph = new NumericTestGraph<>(conf); + return graph; + } + + private NumericTestGraph<LongWritable, LongWritable, NullWritable> createEmptyGraph() { + return createEmptyGraph(LongWritable.class); + } + + @Test + public void testCC() throws Exception { /* Small1GraphInit: vertices 0-5 form one component and vertex 6 is isolated. */ + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createEmptyGraph(); + new Small1GraphInit<LongWritable, LongWritable, NullWritable>().modifyGraph(graph); + + Block ccBlock = UndirectedConnectedComponents.calculateConnectedComponents( + 100, GET_COMPONENT, SET_COMPONENT); + LocalBlockRunner.runBlock(graph.getTestGraph(), ccBlock, new Object()); + + for (int i : new int[] {0, 1, 2, 3, 4, 5}) { + Assert.assertEquals(0, graph.getValue(i).get()); + } + for (int i : new int[] {6}) { + Assert.assertEquals(6, graph.getValue(i).get()); + } + /* Only the largest component (0-5) should survive; vertex 6 is removed. */ + Block keepLargestBlock = UndirectedConnectedComponents.calculateAndKeepLargestComponent( + 100, GET_COMPONENT, SET_COMPONENT); + + LocalBlockRunner.runBlock(graph.getTestGraph(), keepLargestBlock, new Object()); + + for (int i : new int[] {0, 1, 2, 3, 4, 5}) { + Assert.assertEquals(0, graph.getValue(i).get()); + } + + for (int i : new int[] {6}) { + Assert.assertNull(graph.getVertex(i)); + } + } + + @Test + public void testMultipleComponentCC() throws Exception { /* Small2GraphInit: components {0,1,2}, {3,4,5} and the isolated vertex 6. 
*/ + NumericTestGraph<LongWritable, LongWritable, NullWritable> graph = createEmptyGraph(); + new Small2GraphInit<LongWritable, LongWritable, NullWritable>().modifyGraph(graph); + + Block ccBlock = UndirectedConnectedComponents.calculateConnectedComponents( + 100, GET_COMPONENT, SET_COMPONENT); + LocalBlockRunner.runBlock(graph.getTestGraph(), ccBlock, new Object()); + + for (int i : new int[] {0, 1, 2}) { + Assert.assertEquals(0, graph.getValue(i).get()); + } + for (int i : new int[] {3, 4, 5}) { + Assert.assertEquals(3,
graph.getValue(i).get()); + } + for (int i : new int[] {6}) { + Assert.assertEquals(6, graph.getValue(i).get()); + } + /* The threshold looks inclusive: size-3 components survive threshold 3 (only vertex 6 is removed) but not threshold 4. */ + Block keepAbove3 = UndirectedConnectedComponents.calculateAndKeepComponentAboveThreshold( + 100, 3, GET_COMPONENT, SET_COMPONENT); + + LocalBlockRunner.runBlock(graph.getTestGraph(), keepAbove3, new Object()); + + for (int i : new int[] {0, 1, 2}) { + Assert.assertEquals(0, graph.getValue(i).get()); + } + for (int i : new int[] {3, 4, 5}) { + Assert.assertEquals(3, graph.getValue(i).get()); + } + + for (int i : new int[] {6}) { + Assert.assertNull(graph.getVertex(i)); + } + + Block keepAbove4 = UndirectedConnectedComponents.calculateAndKeepComponentAboveThreshold( + 100, 4, GET_COMPONENT, SET_COMPONENT); + + LocalBlockRunner.runBlock(graph.getTestGraph(), keepAbove4, new Object()); + + for (int i : new int[] {0, 1, 2, 3, 4, 5}) { + Assert.assertNull(graph.getVertex(i)); + } + } + + @Test + public void testWeaklyCCOnTree() { /* Edges are directed, but the whole tree should form one weakly connected component labeled 0. */ + NumericTestGraph<LongWritable, WeaklyConnectedComponentVertexValueImpl, NullWritable> graph = + createEmptyGraph(WeaklyConnectedComponentVertexValueImpl.class); + new SmallDirectedTreeGraphInit<LongWritable, WeaklyConnectedComponentVertexValueImpl, NullWritable>().modifyGraph(graph); + + Block weaklyCC = WeaklyConnectedComponents.calculateConnectedComponents( + 200, + ConnectedComponentVertexValue.getComponentSupplier(), + ConnectedComponentVertexValue.setComponentConsumer(), + WeaklyConnectedComponentVertexValue.getEdgeIdsSupplier(), + WeaklyConnectedComponentVertexValue.setEdgeIdsConsumer(), + false); + + LocalBlockRunner.runBlock(graph.getTestGraph(), weaklyCC, new Object()); + + for (int i : new int[] {0, 1, 2, 3, 4, 5, 6}) { + Assert.assertEquals(0, graph.getValue(i).getComponent()); + } + } + + @Test + public void testWeaklyCCOnForest() 
{ /* Expected weak components: {0,1,2,3}, {4,5} and {6,7,8}. */ + NumericTestGraph<LongWritable, WeaklyConnectedComponentVertexValueImpl, NullWritable> graph = + createEmptyGraph(WeaklyConnectedComponentVertexValueImpl.class); + new
SmallDirectedForestGraphInit<LongWritable, WeaklyConnectedComponentVertexValueImpl, NullWritable>().modifyGraph(graph); + + Block weaklyCC = WeaklyConnectedComponents.calculateConnectedComponents( + 200, + ConnectedComponentVertexValue.getComponentSupplier(), + ConnectedComponentVertexValue.setComponentConsumer(), + WeaklyConnectedComponentVertexValue.getEdgeIdsSupplier(), + WeaklyConnectedComponentVertexValue.setEdgeIdsConsumer(), + false); + + LocalBlockRunner.runBlock(graph.getTestGraph(), weaklyCC, new Object()); + + for (int i : new int[] {0, 1, 2, 3}) { + Assert.assertEquals(0, graph.getValue(i).getComponent()); + } + + for (int i : new int[] {4, 5}) { + Assert.assertEquals(4, graph.getValue(i).getComponent()); + } + + for (int i : new int[] {6, 7, 8}) { + Assert.assertEquals(6, graph.getValue(i).getComponent()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java new file mode 100644 index 0000000..bd4957b --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestPrepareGraph.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.prepare_graph; + +import org.apache.giraph.block_app.framework.AbstractBlockFactory; +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.library.prepare_graph.TestPrepareGraph.TmpBlockFactory.TmpRunBlockFactory; +import org.apache.giraph.block_app.library.prepare_graph.TestPrepareGraph.TmpBlockFactory.TmpRunBlockFactory2; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.block_app.test_setup.TestGraphUtils; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.junit.Assert; +import org.junit.experimental.theories.DataPoints; +import org.junit.experimental.theories.Theories; +import org.junit.experimental.theories.Theory; +import org.junit.runner.RunWith; + /** Tests PrepareGraphPieces block sequences; each theory runs with fullGiraphEnv both false and true. */ +@RunWith(Theories.class) +public class TestPrepareGraph { + public static @DataPoints boolean[] booleanOptions = {false, true}; + + @Theory + public void test1(boolean fullGiraphEnv) throws Exception { /* removeAsymEdges + removeStandAloneVertices: 1 -> 2 has no reverse edge, so it is dropped and vertex 2 disappears. 
*/ + TestGraphUtils.runTest( + new CreateTestGraph(), + (graph) -> { + Assert.assertTrue(connected(graph, 0, 1)); + Assert.assertTrue(connected(graph, 1, 0)); + Assert.assertTrue(!connected(graph, 1, 2)); +
Assert.assertNull(graph.getVertex(2)); + Assert.assertNull(graph.getVertex(3)); + }, + (conf) -> { + BlockUtils.setBlockFactoryClass(conf, TmpRunBlockFactory.class); + TestGraphUtils.USE_FULL_GIRAPH_ENV_IN_TESTS.set(conf, fullGiraphEnv); + }); + } + + @Theory + public void test2(boolean fullGiraphEnv) throws Exception { /* makeSymmetricUnweighted adds the missing 2 -> 1 edge, so vertex 2 survives with an edge back to 1. */ + TestGraphUtils.runTest( + new CreateTestGraph(), + (graph) -> { + Assert.assertTrue(connected(graph, 0, 1)); + Assert.assertTrue(connected(graph, 1, 0)); + Assert.assertTrue(connected(graph, 1, 2)); + Assert.assertTrue(connected(graph, 2, 1)); + Assert.assertNull(graph.getVertex(3)); + }, + (conf) -> { + BlockUtils.setBlockFactoryClass(conf, TmpRunBlockFactory2.class); + TestGraphUtils.USE_FULL_GIRAPH_ENV_IN_TESTS.set(conf, fullGiraphEnv); + }); + } + /* True iff vertex {@code from} has an out-edge to {@code to}; would throw NullPointerException if vertex {@code from} is missing. */ + private static boolean connected( + NumericTestGraph<LongWritable, NullWritable, NullWritable> graph, int from, + int to) { + for (Edge<LongWritable, NullWritable> edge : graph.getVertex(from).getEdges()) { + if (edge.getTargetVertexId().get() == to) { + return true; + } + } + return false; + } + /** Builds the symmetric pair 0 <-> 1 plus the one-directional edge 1 -> 2. */ + private static class CreateTestGraph + implements TestGraphModifier<LongWritable, NullWritable, NullWritable> { + @Override + public void modifyGraph(NumericTestGraph<LongWritable, NullWritable, NullWritable> graph) { + graph.addEdge(0, 1); + graph.addEdge(1, 0); + graph.addEdge(1, 2); + } + } + + + /** + * Base test factory: empty Object execution stage, LongWritable vertex ids, NullWritable vertex and edge values; subclasses supply createBlock. 
+ */ + public static abstract class TmpBlockFactory extends AbstractBlockFactory<Object> { + @Override + public Object createExecutionStage(GiraphConfiguration conf) { + return new Object(); + } + + @Override + protected Class<LongWritable> getVertexIDClass(GiraphConfiguration conf) { + return LongWritable.class; + } + + @Override + protected Class<NullWritable> getVertexValueClass(GiraphConfiguration conf) { + return NullWritable.class; + } + + @Override + protected Class<NullWritable> getEdgeValueClass(GiraphConfiguration conf) { + return
NullWritable.class; + } + + /** + * Test factory that runs removeAsymEdges then removeStandAloneVertices: + * edges without a reverse edge are dropped, then vertices left standing alone. + */ + public static class TmpRunBlockFactory extends TmpBlockFactory { + @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock(PrepareGraphPieces.removeAsymEdges(LongTypeOps.INSTANCE), + PrepareGraphPieces.removeStandAloneVertices()); + } + } + + /** + * Test factory that runs makeSymmetricUnweighted then removeStandAloneVertices: + * missing reverse edges are added, then vertices left standing alone are dropped. + */ + public static class TmpRunBlockFactory2 extends TmpBlockFactory { + // relies on shouldCreateVertexOnMsgs=true + + @Override + public Block createBlock(GiraphConfiguration conf) { + return new SequenceBlock(PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE), + PrepareGraphPieces.removeStandAloneVertices()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml index a05c1c5..3524e19 100644 --- a/giraph-block-app/pom.xml +++ b/giraph-block-app/pom.xml @@ -110,6 +110,10 @@ under the License.
<groupId>log4j</groupId> <artifactId>log4j</artifactId> </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>annotations</artifactId> + </dependency> <!-- runtime dependency --> <dependency> http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java index be5d4fe..815cbfa 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/reducers/array/HugeArrayUtils.java @@ -48,24 +48,26 @@ import org.apache.hadoop.io.Writable; * In Giraph, for each reducer there is a worker machine which is it's owner, * which does partial aggregation for it. So if we have only single huge * reducer - other workers will have to wait, while that single worker is doing - * huge reducing operation. + * huge reducing operation. Additionally, a single reducer should be smaller than + * the max netty message size, which is 1MB. * On the other hand, each reducer has a meaningful overhead, so we should try - * to keep number of reducers as low as possible (in total less then 10k is a - * good number). - * What we want is to split such huge reducers into slightly more then number - * of worker reducers, and NUM_REDUCERS = 50000 is used here as a good middle - * ground. + * to keep number of reducers as low as possible. + * + * By default we are conservative and stripe into 500k reducers, to keep + * individual reducers small. If you know the exact sizes of your objects, + * you can set giraph.reducers.HugeArrayUtils.num_stripes to the exact number.
* * So when we have huge array, we don't want one reducer/broadcast for each * element, but we also don't want one reducer/broadcast for the whole array. * * This class allows transparent split into reasonable number of reducers - * (~50000), which solves both of the above issues. + * (~500k), which solves both of the above issues. */ public class HugeArrayUtils { - // Striping perfectly reducers of up to 25GB (i.e. 500KB * NUM_STRIPES). + // Even with a 100GB object, the average stripe is ~200KB (100GB / 500k), + // keeping outliers mostly under the 1MB limit. NOTE(review): the description string below has typos (distict, then, it's) and no space between "this" and "number" after concatenation. private static final IntConfOption NUM_STRIPES = new IntConfOption( - "giraph.reducers.HugeArrayUtils.num_stripes", 50000, + "giraph.reducers.HugeArrayUtils.num_stripes", 500000, "Number of distict reducers to create. If array is smaller then this" + "number, each element will be it's own reducer"); http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java index 4886c80..c5d2fb1 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/NumericTestGraph.java @@ -180,7 +180,9 @@ public class NumericTestGraph<I extends WritableComparable, */ public void addEdge(Number fromVertex, Number toVertex, E edgeValue) { testGraph.addEdge( - numberToVertexId(fromVertex), numberToVertexId(toVertex), edgeValue); + numberToVertexId(fromVertex), + numberToVertexId(toVertex), + edgeValueOrCreate(edgeValue)); } /** @@ -310,6 +312,11 @@ public class 
NumericTestGraph<I extends WritableComparable, numberToEdgeValue.apply(edgeValue) : getConf().createEdgeValue();
} + /** Returns {@code edgeValue} if non-null, otherwise a new edge value from getConf().createEdgeValue(). */ public E edgeValueOrCreate(E edgeValue) { + return edgeValue != null ? edgeValue : getConf().createEdgeValue(); + } + + public Vertex<I, V, E> createVertex() { return getConf().createVertex(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java index ecec024..b014791 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small1GraphInit.java @@ -40,23 +40,30 @@ public class Small1GraphInit<I extends WritableComparable, V extends Writable, E extends Writable> implements TestGraphModifier<I, V, E> { - private final Supplier<V> valueSupplier; private final Supplier<E> edgeSupplier; - public Small1GraphInit( - Supplier<V> valueSupplier, Supplier<E> edgeSupplier) { - this.valueSupplier = valueSupplier; + public Small1GraphInit() { + this(null); + } + + public Small1GraphInit(Supplier<E> edgeSupplier) { this.edgeSupplier = edgeSupplier; } @Override public void modifyGraph(NumericTestGraph<I, V, E> graph) { /* Triangles {0,1,2} and {3,4,5} joined by edge 2-3, plus isolated vertex 6. 
*/ - graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2); - graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2); - graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1, 3); - graph.addVertex(3, valueSupplier.get(), edgeSupplier, 2, 4, 5); - graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5); - graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4); - graph.addVertex(6, valueSupplier.get(), edgeSupplier); + graph.addSymmetricEdge(0, 1, createEdgeValue()); + graph.addSymmetricEdge(0, 2, createEdgeValue()); +
graph.addSymmetricEdge(1, 2, createEdgeValue()); + graph.addSymmetricEdge(2, 3, createEdgeValue()); + graph.addSymmetricEdge(3, 4, createEdgeValue()); + graph.addSymmetricEdge(3, 5, createEdgeValue()); + graph.addSymmetricEdge(4, 5, createEdgeValue()); + + graph.addVertex(6); + } + /* null without a supplier; NumericTestGraph.addEdge substitutes a conf-created value for null, and addSymmetricEdge presumably does too (TODO confirm). */ + private E createEdgeValue() { + return edgeSupplier != null ? edgeSupplier.get() : null; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java index eb38c45..bfa603c 100644 --- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/Small2GraphInit.java @@ -39,25 +39,30 @@ import org.apache.hadoop.io.WritableComparable; public class Small2GraphInit<I extends WritableComparable, V extends Writable, E extends Writable> implements TestGraphModifier<I, V, E> { - - private final Supplier<V> valueSupplier; private final Supplier<E> edgeSupplier; - public Small2GraphInit( - Supplier<V> valueSupplier, Supplier<E> edgeSupplier) { - this.valueSupplier = valueSupplier; + public Small2GraphInit() { + this(null); + } + + public Small2GraphInit(Supplier<E> edgeSupplier) { this.edgeSupplier = edgeSupplier; } @Override public void modifyGraph(NumericTestGraph<I, V, E> graph) { /* Disjoint triangles {0,1,2} and {3,4,5}, plus isolated vertex 6. 
*/ - graph.addVertex(0, valueSupplier.get(), edgeSupplier, 1, 2); - graph.addVertex(1, valueSupplier.get(), edgeSupplier, 0, 2); - graph.addVertex(2, valueSupplier.get(), edgeSupplier, 0, 1); - graph.addVertex(3, valueSupplier.get(), edgeSupplier, 4, 5); - graph.addVertex(4, valueSupplier.get(), edgeSupplier, 3, 5); -
graph.addVertex(5, valueSupplier.get(), edgeSupplier, 3, 4); - graph.addVertex(6, valueSupplier.get(), edgeSupplier); + graph.addSymmetricEdge(0, 1, createEdgeValue()); + graph.addSymmetricEdge(0, 2, createEdgeValue()); + graph.addSymmetricEdge(1, 2, createEdgeValue()); + graph.addSymmetricEdge(3, 4, createEdgeValue()); + graph.addSymmetricEdge(3, 5, createEdgeValue()); + graph.addSymmetricEdge(4, 5, createEdgeValue()); + + graph.addVertex(6); + } + /* null without a supplier; NumericTestGraph.addEdge substitutes a conf-created value for null, and addSymmetricEdge presumably does too (TODO confirm). */ + private E createEdgeValue() { + return edgeSupplier != null ? edgeSupplier.get() : null; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java new file mode 100644 index 0000000..a2194b9 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedForestGraphInit.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup.graphs; + +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.function.Supplier; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Create a directed forest that looks like: + * + * 0 4 6 + * / \ | / \ + * 1 2 5 7 8 + * | + * 3 + * + * Edges are directed from top to bottom. + * Leaf vertices (no outgoing edges) are added explicitly via addVertex. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class SmallDirectedForestGraphInit<I extends WritableComparable, + V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + + private final Supplier<E> edgeSupplier; + + public SmallDirectedForestGraphInit() { + this(null); + } + + public SmallDirectedForestGraphInit(Supplier<E> edgeSupplier) { + this.edgeSupplier = edgeSupplier; + } + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + graph.addEdge(0, 1, createEdgeValue()); + graph.addEdge(0, 2, createEdgeValue()); + graph.addEdge(2, 3, createEdgeValue()); + graph.addEdge(4, 5, createEdgeValue()); + graph.addEdge(6, 7, createEdgeValue()); + graph.addEdge(6, 8, createEdgeValue()); + + graph.addVertex(1); + graph.addVertex(3); + graph.addVertex(5); + graph.addVertex(7); + graph.addVertex(8); + } + /* A null value makes NumericTestGraph.addEdge fall back to a conf-created edge value. */ + private E createEdgeValue() { + return edgeSupplier != null ?
edgeSupplier.get() : null; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java new file mode 100644 index 0000000..6522e62 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SmallDirectedTreeGraphInit.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.test_setup.graphs; + +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.TestGraphModifier; +import org.apache.giraph.function.Supplier; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Create a directed tree that looks like: + * + * 0 __ + * / \ \ + * 1 2 6 + * | |\ + * 3 4 5 + * + * Edges are directed from top to bottom.
+ * Leaf vertices (no outgoing edges) are added explicitly via addVertex. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +public class SmallDirectedTreeGraphInit<I extends WritableComparable, + V extends Writable, E extends Writable> + implements TestGraphModifier<I, V, E> { + + private final Supplier<E> edgeSupplier; + + public SmallDirectedTreeGraphInit() { + this(null); + } + + public SmallDirectedTreeGraphInit(Supplier<E> edgeSupplier) { + this.edgeSupplier = edgeSupplier; + } + + @Override + public void modifyGraph(NumericTestGraph<I, V, E> graph) { + graph.addEdge(0, 1, createEdgeValue()); + graph.addEdge(0, 2, createEdgeValue()); + graph.addEdge(0, 6, createEdgeValue()); + graph.addEdge(1, 3, createEdgeValue()); + graph.addEdge(2, 4, createEdgeValue()); + graph.addEdge(2, 5, createEdgeValue()); + + graph.addVertex(3); + graph.addVertex(4); + graph.addVertex(5); + graph.addVertex(6); + } + /* A null value makes NumericTestGraph.addEdge fall back to a conf-created edge value. */ + private E createEdgeValue() { + return edgeSupplier != null ? edgeSupplier.get() : null; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java b/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java new file mode 100644 index 0000000..2a91e8f --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/object/MultiSizedReusable.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.object; + +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.BasicSet; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * Holds reusable objects of multiple sizes (capacities). + * Example use case: a hash map that we insert into and iterate over, where both + * clear() and iteration cost depend on its capacity. To reuse such objects we + * keep several of them with different capacities; apply(size) returns the same + * cached instance for every size in a bucket, after running init (if non-null). + * + * Sizes are rounded up to a capacity of the form 2^(2k+1) (2, 8, 32, 128, ...), + * so at most 16 objects are cached; sizes above 2^29 use Integer.MAX_VALUE.
+ * + * @param <T> Type of reusable object + */ +public class MultiSizedReusable<T> implements Int2ObjFunction<T> { + private final Int2ObjFunction<T> createSized; + private final Consumer<T> init; + /* holder[k] caches the object created with capacity 2^(2k+1); only indices 0..15 are ever used. It is transient, so (per the suppressed FindBugs warning) Java serialization does not restore it. */ @SuppressWarnings("unchecked") + @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") + private final transient T[] holder = (T[]) new Object[Integer.SIZE]; + + // No-arg constructor Kryo can call to initialize holder + MultiSizedReusable() { + this(null, null); + } + + public MultiSizedReusable(Int2ObjFunction<T> createSized, Consumer<T> init) { + this.createSized = createSized; + this.init = init; + } + + @Override + public T apply(int size) { + Preconditions.checkArgument(size >= 0); /* bucket k = floor(bitLength(max(0, size - 1)) / 2), in [0, 15]; bucket k fits sizes up to 2^(2k+1). */ + int shiftBits = (Integer.SIZE - + Integer.numberOfLeadingZeros(Math.max(0, size - 1))) / 2; + T result = holder[shiftBits]; + if (result == null) { + if (shiftBits >= 15) { /* 1 << 31 would overflow int, so cap the capacity. */ + result = createSized.apply(Integer.MAX_VALUE); + } else { + result = createSized.apply(1 << (shiftBits * 2 + 1)); + } + holder[shiftBits] = result; + } + if (init != null) { /* runs on every call, including cache hits */ + init.apply(result); + } + return result; + } + /** Reusable open hash sets created via idTypeOps.createOpenHashSet(capacity) and cleared on every apply(). 
*/ + public static <I> MultiSizedReusable<BasicSet<I>> createForBasicSet( + final PrimitiveIdTypeOps<I> idTypeOps) { + return new MultiSizedReusable<>( + new Int2ObjFunction<BasicSet<I>>() { + @Override + public BasicSet<I> apply(int value) { + return idTypeOps.createOpenHashSet(value); + } + }, + new Consumer<BasicSet<I>>() { + @Override + public void apply(BasicSet<I> t) { + t.clear(); + } + }); + } + /** Reusable open hash maps created via idTypeOps.create2ObjectOpenHashMap(capacity, null) and cleared on every apply(); NOTE(review): confirm what the null second argument configures. */ + public static <K, V> + MultiSizedReusable<Basic2ObjectMap<K, V>> createForBasic2ObjectMap( + final PrimitiveIdTypeOps<K> idTypeOps) { + return new MultiSizedReusable<>( + new Int2ObjFunction<Basic2ObjectMap<K, V>>() { + @Override + public Basic2ObjectMap<K, V> apply(int value) { + return idTypeOps.create2ObjectOpenHashMap(value, null); + } + }, + new Consumer<Basic2ObjectMap<K, V>>() { + @Override + public void apply(Basic2ObjectMap<K, V> t) { + t.clear(); + } + }); + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java new file mode 100644 index 0000000..bd578ef --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/object/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Object handling related utilities. 
+ */ +package org.apache.giraph.object; http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java index 170f307..021a24c 100644 --- a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/BlockExecutionTest.java @@ -58,10 +58,9 @@ public class BlockExecutionTest { return conf; } - private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph( - GiraphConfiguration conf) { + private static TestGraph<LongWritable, LongWritable, NullWritable> createTestGraph() { TestGraph<LongWritable, LongWritable, NullWritable> graph = - new TestGraph<LongWritable, LongWritable, NullWritable>(conf); + new TestGraph<LongWritable, LongWritable, NullWritable>(createConf()); graph.addVertex(new LongWritable(1), new LongWritable()); graph.addVertex(new LongWritable(2), new LongWritable()); graph.addVertex(new LongWritable(3), new LongWritable()); @@ -76,8 +75,7 @@ public class BlockExecutionTest { @Test public void testMessageSending() { - GiraphConfiguration conf = createConf(); - TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf); + TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); LocalBlockRunner.runBlock(graph, new Piece<WritableComparable, LongWritable, Writable, BooleanWritable, Object>() { @Override @@ -119,8 +117,7 @@ public class BlockExecutionTest { @Test public void testReducing() { - GiraphConfiguration conf = createConf(); - TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(conf); + 
TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); final LongWritable value = new LongWritable(); @@ -153,4 +150,30 @@ public class BlockExecutionTest { Assert.assertEquals(4, value.get()); } + + @Test + public void testVertexRemoval() { + TestGraph<LongWritable, LongWritable, NullWritable> graph = createTestGraph(); + LocalBlockRunner.runBlock(graph, new Piece<LongWritable, Writable, Writable, NoMessage, Object>() { + @Override + public VertexSender<LongWritable, Writable, Writable> getVertexSender( + final BlockWorkerSendApi<LongWritable, Writable, Writable, NoMessage> workerApi, + Object executionStage) { + return new InnerVertexSender() { + @Override + public void vertexSend(Vertex<LongWritable, Writable, Writable> vertex) { + long id = vertex.getId().get(); + if (id == 1 || id == 3) { + workerApi.removeVertexRequest(vertex.getId()); + } + } + }; + } + }, new Object()); + + Assert.assertNull(graph.getVertex(new LongWritable(1))); + Assert.assertNotNull(graph.getVertex(new LongWritable(2))); + Assert.assertNull(graph.getVertex(new LongWritable(3))); + Assert.assertNotNull(graph.getVertex(new LongWritable(4))); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java index 04f6bdf..c8dc288 100644 --- a/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/DoubleSumMessageCombiner.java @@ -23,10 +23,12 @@ import org.apache.hadoop.io.WritableComparable; /** * A combiner that sums double-valued messages + * + * Use SumMessageCombiner.DOUBLE instead. 
*/ +@Deprecated() public class DoubleSumMessageCombiner - extends - MessageCombiner<WritableComparable, DoubleWritable> { + implements MessageCombiner<WritableComparable, DoubleWritable> { @Override public void combine(WritableComparable vertexIndex, DoubleWritable originalMessage, DoubleWritable messageToCombine) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java index 3015e2b..4db0097 100644 --- a/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/FloatSumMessageCombiner.java @@ -23,10 +23,12 @@ import org.apache.hadoop.io.WritableComparable; /** * A combiner that sums float-valued messages + * + * Use SumMessageCombiner.FLOAT instead. 
*/ +@Deprecated public class FloatSumMessageCombiner - extends - MessageCombiner<WritableComparable, FloatWritable> { + implements MessageCombiner<WritableComparable, FloatWritable> { @Override public void combine(WritableComparable vertexIndex, FloatWritable originalMessage, FloatWritable messageToCombine) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java index e53ab3f..1a19eee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MessageCombiner.java @@ -29,7 +29,7 @@ import org.apache.hadoop.io.WritableComparable; * @param <I> Vertex id * @param <M> Message data */ -public abstract class MessageCombiner<I extends WritableComparable, +public interface MessageCombiner<I extends WritableComparable, M extends Writable> { /** * Combine messageToCombine with originalMessage, by modifying @@ -44,7 +44,7 @@ public abstract class MessageCombiner<I extends WritableComparable, * (object may be reused - do not reference it or its * member objects) */ - public abstract void combine(I vertexIndex, M originalMessage, + void combine(I vertexIndex, M originalMessage, M messageToCombine); /** @@ -53,5 +53,5 @@ public abstract class MessageCombiner<I extends WritableComparable, * * @return Initial message */ - public abstract M createInitialMessage(); + M createInitialMessage(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java index db43008..9bebf81 100644 --- a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumDoubleMessageCombiner.java @@ -25,8 +25,7 @@ import org.apache.hadoop.io.WritableComparable; * MessageCombiner which finds the minimum of {@link DoubleWritable}. */ public class MinimumDoubleMessageCombiner - extends - MessageCombiner<WritableComparable, DoubleWritable> { + implements MessageCombiner<WritableComparable, DoubleWritable> { @Override public void combine(WritableComparable vertexIndex, DoubleWritable originalMessage, DoubleWritable messageToCombine) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java index df80b8f..542f4be 100644 --- a/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinimumIntMessageCombiner.java @@ -25,7 +25,7 @@ import org.apache.hadoop.io.WritableComparable; * {@link MessageCombiner} that finds the minimum {@link IntWritable} */ public class MinimumIntMessageCombiner - extends MessageCombiner<WritableComparable, IntWritable> { + implements MessageCombiner<WritableComparable, IntWritable> { @Override public void combine(WritableComparable vertexIndex, IntWritable originalMessage, IntWritable messageToCombine) { 
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java index e3ae597..cd51bdd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/SimpleSumMessageCombiner.java @@ -23,9 +23,12 @@ import org.apache.hadoop.io.WritableComparable; /** * MessageCombiner which sums up {@link IntWritable} message values. + * + * Use SumMessageCombiner.INT instead. */ +@Deprecated public class SimpleSumMessageCombiner - extends MessageCombiner<WritableComparable, IntWritable> { + implements MessageCombiner<WritableComparable, IntWritable> { @Override public void combine(WritableComparable vertexIndex, http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java new file mode 100644 index 0000000..9395142 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/SumMessageCombiner.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.combiner; + +import org.apache.giraph.types.ops.DoubleTypeOps; +import org.apache.giraph.types.ops.FloatTypeOps; +import org.apache.giraph.types.ops.IntTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Message combiner which sums all messages. 
+ * + * @param <M> Message type + */ +public class SumMessageCombiner<M extends Writable> + implements MessageCombiner<WritableComparable, M> { + /** DoubleWritable specialization */ + public static final SumMessageCombiner<DoubleWritable> DOUBLE = + new SumMessageCombiner<>(DoubleTypeOps.INSTANCE); + /** FloatWritable specialization */ + public static final SumMessageCombiner<FloatWritable> FLOAT = + new SumMessageCombiner<>(FloatTypeOps.INSTANCE); + /** LongWritable specialization */ + public static final SumMessageCombiner<LongWritable> LONG = + new SumMessageCombiner<>(LongTypeOps.INSTANCE); + /** IntWritable specialization */ + public static final SumMessageCombiner<IntWritable> INT = + new SumMessageCombiner<>(IntTypeOps.INSTANCE); + + /** Value type operations */ + private final NumericTypeOps<M> typeOps; + + /** + * Constructor + * @param typeOps Value type operations + */ + public SumMessageCombiner(NumericTypeOps<M> typeOps) { + this.typeOps = typeOps; + } + + @Override + public void combine( + WritableComparable vertexIndex, M originalMessage, M messageToCombine) { + typeOps.plusInto(originalMessage, messageToCombine); + } + + @Override + public M createInitialMessage() { + return typeOps.createZero(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java new file mode 100644 index 0000000..8527004 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxPairReducer.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.reducers.ReduceSameTypeOperation; +import org.apache.giraph.types.ops.TypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.writable.tuple.PairWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Reduces {@code PairWritable<L, R>} to the pair with the largest right value. 
+ * + * @param <L> Type of the left value + * @param <R> Type of the right value + */ +public class MaxPairReducer<L extends Writable, R extends WritableComparable> + extends ReduceSameTypeOperation<PairWritable<L, R>> { + + /** Left value TypeOps */ + private TypeOps<L> leftTypeOps; + /** Right value TypeOps */ + private TypeOps<R> rightTypeOps; + + /** Constructor used for deserialization only */ + public MaxPairReducer() { + } + + /** + * Constructor + * @param leftTypeOps Left value TypeOps + * @param rightTypeOps Right value TypeOps + */ + public MaxPairReducer(TypeOps<L> leftTypeOps, TypeOps<R> rightTypeOps) { + this.leftTypeOps = leftTypeOps; + this.rightTypeOps = rightTypeOps; + } + + @Override + public PairWritable<L, R> reduce( + PairWritable<L, R> curValue, PairWritable<L, R> valueToReduce) { + if (valueToReduce.getRight().compareTo(curValue.getRight()) > 0) { + leftTypeOps.set(curValue.getLeft(), valueToReduce.getLeft()); + rightTypeOps.set(curValue.getRight(), valueToReduce.getRight()); + } + return curValue; + } + + @Override + public PairWritable<L, R> createInitialValue() { + return new PairWritable<L, R>( + leftTypeOps.create(), rightTypeOps.create()); + } + + @Override + public void write(DataOutput out) throws IOException { + TypeOpsUtils.writeTypeOps(leftTypeOps, out); + TypeOpsUtils.writeTypeOps(rightTypeOps, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + leftTypeOps = TypeOpsUtils.readTypeOps(in); + rightTypeOps = TypeOpsUtils.readTypeOps(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java index 194bb5e..996fee8 100644 --- 
a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java @@ -144,7 +144,7 @@ public class TestComputationCombinerTypes { IntWritable> { } private static class NoOpMessageCombiner<I extends WritableComparable, - M extends Writable> extends MessageCombiner<I, M> { + M extends Writable> implements MessageCombiner<I, M> { @Override public void combine(I vertexIndex, M originalMessage, M messageToCombine) { } http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java index 8a034c2..d56c0fb 100644 --- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java +++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java @@ -195,7 +195,7 @@ public class TestSwitchClasses { } public static class MinimumMessageCombiner - extends MessageCombiner<IntWritable, + implements MessageCombiner<IntWritable, IntWritable> { @Override public void combine(IntWritable vertexIndex, IntWritable originalMessage, @@ -211,7 +211,7 @@ public class TestSwitchClasses { } public static class SumMessageCombiner - extends MessageCombiner<IntWritable, IntWritable> { + implements MessageCombiner<IntWritable, IntWritable> { @Override public void combine(IntWritable vertexIndex, IntWritable originalMessage, IntWritable messageToCombine) { http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java 
b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java index 41726a9..ea1a74d 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java +++ b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java @@ -64,7 +64,7 @@ public class TestComputationTypes { * Matches the {@link GeneratedComputationMatch} */ public static class GeneratedVertexMatchMessageCombiner - extends + implements MessageCombiner<LongWritable, FloatWritable> { @Override public void combine(LongWritable vertexIndex, @@ -82,7 +82,7 @@ public class TestComputationTypes { * Mismatches the {@link GeneratedComputationMatch} */ public static class GeneratedVertexMismatchMessageCombiner - extends + implements MessageCombiner<LongWritable, DoubleWritable> { @Override public void combine(LongWritable vertexIndex, http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index ceca219..392b92c 100644 --- a/pom.xml +++ b/pom.xml @@ -270,6 +270,8 @@ under the License. <project.enforcer.skip>false</project.enforcer.skip> <project.enforcer.fail>true</project.enforcer.fail> <project.build.targetJdk>1.7</project.build.targetJdk> + <project.build.javaHome>${env.JAVA_HOME}</project.build.javaHome> + <project.enforcer.minJdk>1.7</project.enforcer.minJdk> <giraph.maven.dependency.plugin.skip>false</giraph.maven.dependency.plugin.skip> <giraph.maven.duplicate.finder.skip>false</giraph.maven.duplicate.finder.skip> <!-- This lets modules skip unit tests. More details: GIRAPH-957 --> @@ -567,7 +569,9 @@ under the License. 
<configuration> <source>${project.build.targetJdk}</source> <target>${project.build.targetJdk}</target> - </configuration> + <executable>${project.build.javaHome}/bin/javac</executable> + <fork>true</fork> + </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -626,7 +630,7 @@ under the License. <version>${project.maven.version}</version> </requireMavenVersion> <requireJavaVersion> - <version>${project.build.targetJdk}</version> + <version>${project.enforcer.minJdk}</version> </requireJavaVersion> </rules> </configuration> @@ -665,6 +669,9 @@ under the License. <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-javadoc-plugin</artifactId> <version>2.9</version> + <configuration> + <javadocExecutable>${project.build.javaHome}/bin/javadoc</javadocExecutable> + </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -774,6 +781,15 @@ under the License. </execution> </executions> </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <jvm>${project.build.javaHome}/bin/java</jvm> + <forkMode>once</forkMode> + </configuration> + </plugin> </plugins> </pluginManagement> <plugins> @@ -1061,6 +1077,9 @@ under the License. <profile> <id>hadoop_facebook</id> + <modules> + <module>giraph-block-app-8</module> + </modules> <properties> <hadoop.version>0.20.0</hadoop.version> <munge.symbols>HADOOP_NON_SECURE,HADOOP_NON_JOBCONTEXT_IS_INTERFACE,HADOOP_JOB_ID_AVAILABLE,STATIC_SASL_SYMBOL</munge.symbols> @@ -1719,6 +1738,11 @@ under the License. </dependency> <dependency> <groupId>org.apache.giraph</groupId> + <artifactId>giraph-block-app-8</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.giraph</groupId> <artifactId>giraph-examples</artifactId> <version>${project.version}</version> </dependency>
