Repository: giraph Updated Branches: refs/heads/trunk ca36f1d49 -> 241505905
Add PrepareGraphPieces.isSymmetricBlock to check for a symmetric graph Summary: PrepareGraphPieces.isSymmetricBlock is a reusable factory function for creating blocks that check if a graph is symmetric by XOR-reducing a predictable hash of the edge pairs (V1, V2) Test Plan: Unit Tests for all changed files, testing on demo graphs. Will run a full test job. Reviewers: spupyrev, dionysis.logothetis, ikabiljo, maja.kabiljo Reviewed By: maja.kabiljo Subscribers: maja.kabiljo Differential Revision: https://reviews.facebook.net/D54411 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/24150590 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/24150590 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/24150590 Branch: refs/heads/trunk Commit: 24150590597c89aa7e322ed56f4c916b666b3b09 Parents: ca36f1d Author: Yuri Schimke <[email protected]> Authored: Thu Feb 25 10:42:21 2016 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Thu Feb 25 10:42:21 2016 -0800 ---------------------------------------------------------------------- .../prepare_graph/PrepareGraphPieces.java | 88 +++++++++++++- .../prepare_graph/TestSymmetryCheck.java | 117 +++++++++++++++++++ .../giraph/reducers/impl/LongXorReduce.java | 55 +++++++++ .../utils/hashing/LongWritableFunnel.java | 38 ++++++ .../giraph/utils/hashing/package-info.java | 22 ++++ .../giraph/reducers/impl/TestLongXorReduce.java | 41 +++++++ .../utils/hashing/TestLongWritableFunnel.java | 36 ++++++ 7 files changed, 396 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java ---------------------------------------------------------------------- diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java index b4d40dc..41c315a 100644 --- a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java @@ -17,8 +17,11 @@ */ package org.apache.giraph.block_app.library.prepare_graph; +import com.google.common.hash.Funnel; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; import java.util.Iterator; - import org.apache.giraph.block_app.framework.api.BlockMasterApi; import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; @@ -42,8 +45,11 @@ import org.apache.giraph.function.ObjectTransfer; import org.apache.giraph.function.primitive.Int2ObjFunction; import org.apache.giraph.function.primitive.Obj2DoubleFunction; import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; import org.apache.giraph.object.MultiSizedReusable; import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.reducers.impl.LongXorReduce; import org.apache.giraph.types.NoMessage; import org.apache.giraph.types.ops.NumericTypeOps; import org.apache.giraph.types.ops.PrimitiveIdTypeOps; @@ -397,4 +403,84 @@ public class PrepareGraphPieces { }), sumEdgeWeights); } + + /** + * isSymmetricBlock using a sensible default HashFunction + * + * @see Hashing#murmur3_128() + * @see #isSymmetricBlock(Funnel, Consumer, HashFunction) + */ + public static <I extends WritableComparable> Block isSymmetricBlock( + Funnel<I> idHasher, + Consumer<Boolean> consumer) { + return 
isSymmetricBlock(idHasher, consumer, Hashing.murmur3_128()); + } + + /** + * Checks whether a graph is symmetric and returns the result to a consumer. + * + * @param idHasher Allows Vertex ids to submit themselves to hashing + * without artificially converting to an intermediate + * type, e.g. Long or String. + * @param consumer receives whether the graph was found to be symmetric + * @param <I> the type of Vertex id + * @return block that checks for symmetric graphs + */ + public static <I extends WritableComparable> Block isSymmetricBlock( + Funnel<I> idHasher, + Consumer<Boolean> consumer, + HashFunction hashFunction) { + + SupplierFromVertex<I, Writable, Writable, LongWritable> + s = (vertex) -> new LongWritable( + xorEdges(vertex, idHasher, hashFunction)); + + Consumer<LongWritable> longConsumer = + (xorValue) -> consumer.apply(xorValue.get() == 0L); + + return Pieces.reduce( + "HashEdges", + LongXorReduce.INSTANCE, + s, + longConsumer + ); + } + + /** + * Deterministically XORs the hashes of all edges of a single vertex. Each + * edge is hashed as the ordered pair (min(v1, v2), max(v1, v2)); self-loops + * are skipped, since a self-loop is its own reverse yet would not cancel. + * + * Uses a HashFunction for strong collision resistance and bit dispersion.
+ * + * @see HashFunction + */ + private static <I extends WritableComparable> long xorEdges( + Vertex<I, Writable, Writable> vertex, + Funnel<I> idHasher, HashFunction hashFunction) { + long result = 0L; + + for (Edge<I, Writable> e : vertex.getEdges()) { + Hasher h = hashFunction.newHasher(); + + I thisVertexId = vertex.getId(); + I thatVertexId = e.getTargetVertexId(); + + int comparison = thisVertexId.compareTo(thatVertexId); + + if (comparison != 0) { + if (comparison < 0) { + idHasher.funnel(thisVertexId, h); + idHasher.funnel(thatVertexId, h); + } else { + idHasher.funnel(thatVertexId, h); + idHasher.funnel(thisVertexId, h); + } + + result ^= h.hash().asLong(); + } + } + + return result; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java new file mode 100644 index 0000000..ee2e983 --- /dev/null +++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/library/prepare_graph/TestSymmetryCheck.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.block_app.library.prepare_graph; + +import org.apache.giraph.block_app.framework.api.local.LocalBlockRunner; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.test_setup.NumericTestGraph; +import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit; +import org.apache.giraph.block_app.test_setup.graphs.SmallDirectedTreeGraphInit; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.function.ObjectHolder; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.utils.hashing.LongWritableFunnel; +import org.apache.hadoop.io.LongWritable; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestSymmetryCheck { + private long IGNORED = -1L; + + private NumericTestGraph<LongWritable, LongWritable, LongWritable> graph; + private ObjectHolder<Boolean> holder = new ObjectHolder<>(); + private Block isBlock = PrepareGraphPieces.isSymmetricBlock(LongWritableFunnel.INSTANCE, holder); + + @Before + public void initConf() { + GiraphConfiguration conf = new GiraphConfiguration(); + GiraphConstants.VERTEX_ID_CLASS.set(conf, LongWritable.class); + GiraphConstants.VERTEX_VALUE_CLASS.set(conf, LongWritable.class); + GiraphConstants.EDGE_VALUE_CLASS.set(conf, LongWritable.class); + + graph = new NumericTestGraph<>(conf); + } + 
+ @Test + public void testSimpleLoop() throws Exception { + graph.addVertex(0l, IGNORED, 0l); + + LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object()); + + assertTrue(holder.get()); + } + + @Test + public void testSimpleAsymmetric() throws Exception { + graph.addVertex(0l, IGNORED, IGNORED, 1l); + graph.addVertex(1l, IGNORED); + + LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object()); + + assertFalse(holder.get()); + } + + @Test + public void testSimpleSymmetric() throws Exception { + graph.addVertex(0l, IGNORED, IGNORED, 1l); + graph.addVertex(1l, IGNORED, IGNORED, 0l); + + LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object()); + + assertTrue(holder.get()); + } + + @Test + public void testSmall1Graph() throws Exception { + Small1GraphInit i = new Small1GraphInit<LongWritable, LongWritable, LongWritable>(); + i.modifyGraph(graph); + + LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object()); + + assertTrue(holder.get()); + } + + @Test + public void testSmallDirectedTreeAsymmetric() throws Exception { + SmallDirectedTreeGraphInit i = new SmallDirectedTreeGraphInit<LongWritable, LongWritable, LongWritable>(); + i.modifyGraph(graph); + + LocalBlockRunner.runBlock(graph.getTestGraph(), isBlock, new Object()); + + assertFalse(holder.get()); + } + + @Test + public void testSmallDirectedTreeSymmetric() throws Exception { + SmallDirectedTreeGraphInit i = new SmallDirectedTreeGraphInit<LongWritable, LongWritable, LongWritable>(); + i.modifyGraph(graph); + + Block block = + new SequenceBlock(PrepareGraphPieces.makeSymmetricWeighted(LongTypeOps.INSTANCE, LongTypeOps.INSTANCE), + isBlock); + LocalBlockRunner.runBlock(graph.getTestGraph(), block, new Object()); + + assertTrue(holder.get()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java new file mode 100644 index 0000000..741e278 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/LongXorReduce.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.giraph.reducers.ReduceSameTypeOperation; +import org.apache.hadoop.io.LongWritable; + +/** + * ReduceOperation that XORs (^) values together. 
+ */ +public class LongXorReduce extends ReduceSameTypeOperation<LongWritable> { + /** + * Long XOR, equivalent to l1 ^ l2 + */ + public static final LongXorReduce INSTANCE = new LongXorReduce(); + + /** Constructor used for deserialization only */ + public LongXorReduce() { + } + + @Override public LongWritable createInitialValue() { + return new LongWritable(0L); + } + + @Override public LongWritable reduce(LongWritable curValue, + LongWritable valueToReduce) { + curValue.set(curValue.get() ^ valueToReduce.get()); + return curValue; + } + + @Override public void write(DataOutput out) throws IOException { + } + + @Override public void readFields(DataInput in) throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java b/giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java new file mode 100644 index 0000000..59ffcaa --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/hashing/LongWritableFunnel.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils.hashing; + +import com.google.common.hash.Funnel; +import com.google.common.hash.PrimitiveSink; +import org.apache.hadoop.io.LongWritable; + +/** + * Hashing strategy for LongWritable. Implemented via an enum per advice + * in Funnel. + * + * @see Funnel + */ +public enum LongWritableFunnel implements Funnel<LongWritable> { + /** singleton */ + INSTANCE; + + @Override public void funnel(LongWritable w, PrimitiveSink into) { + into.putLong(w.get()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java b/giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java new file mode 100644 index 0000000..e2c35c0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/hashing/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Package of utility classes supporting strong hash functions + * (currently from Guava). + */ +package org.apache.giraph.utils.hashing; http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java b/giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java new file mode 100644 index 0000000..2e9e717 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/reducers/impl/TestLongXorReduce.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.reducers.impl; + +import org.apache.hadoop.io.LongWritable; +import org.junit.Assert; +import org.junit.Test; + +public class TestLongXorReduce { + private LongXorReduce xor = LongXorReduce.INSTANCE; + + @Test + public void testZero() { + Assert.assertEquals(0b0L, xor.createInitialValue().get()); + } + + @Test + public void testXor() { + LongWritable curWritable = new LongWritable(0b000L); + + Assert.assertEquals(0b001L, xor.reduce(curWritable, new LongWritable(0b001L)).get()); + Assert.assertEquals(0b011L, xor.reduce(curWritable, new LongWritable(0b010L)).get()); + Assert.assertEquals(0b100L, xor.reduce(curWritable, new LongWritable(0b111L)).get()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/24150590/giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java b/giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java new file mode 100644 index 0000000..f528bff --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/utils/hashing/TestLongWritableFunnel.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.utils.hashing; + +import com.google.common.hash.PrimitiveSink; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestLongWritableFunnel { + @Test + public void testWritesALong() { + PrimitiveSink sink = Mockito.mock(PrimitiveSink.class); + + LongWritableFunnel.INSTANCE.funnel(new LongWritable(10L), sink); + + Mockito.verify(sink).putLong(10L); + Mockito.verifyNoMoreInteractions(sink); + } +}
