Repository: giraph
Updated Branches:
  refs/heads/trunk 1c7552b1a -> 0b1962253
GIRAPH-1060: Add combiner to connected components

Summary:
Connected components should use combiner to make it more efficient and require less memory.
A few additional cleanups while at it.

Test Plan: mvn clean verify

Differential Revision: https://reviews.facebook.net/D57879

Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/0b196225
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/0b196225
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/0b196225

Branch: refs/heads/trunk
Commit: 0b1962253c8cd3e2194b1b0015c4352277c1ce0a
Parents: 1c7552b
Author: Maja Kabiljo <[email protected]>
Authored: Mon May 9 11:07:48 2016 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Mon May 9 11:22:35 2016 -0700

----------------------------------------------------------------------
 .../UndirectedConnectedComponents.java | 60 ++++++++++----------
 .../giraph/combiner/MinMessageCombiner.java | 55 ++++++++++++++++++
 .../apache/giraph/types/ops/ByteTypeOps.java | 5 ++
 .../apache/giraph/types/ops/DoubleTypeOps.java | 5 ++
 .../apache/giraph/types/ops/FloatTypeOps.java | 5 ++
 .../org/apache/giraph/types/ops/IntTypeOps.java | 5 ++
 .../apache/giraph/types/ops/LongTypeOps.java | 5 ++
 .../apache/giraph/types/ops/NumericTypeOps.java | 10 ++++
 giraph-core/templates/TypeTypeOps.java | 5 ++
 9 files changed, 126 insertions(+), 29 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
index 
2610436..fb04fa8 100644 --- a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java @@ -34,6 +34,7 @@ import org.apache.giraph.block_app.library.SendMessageChain; import org.apache.giraph.block_app.library.VertexSuppliers; import org.apache.giraph.block_app.reducers.map.BasicMapReduce; import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.combiner.MinMessageCombiner; import org.apache.giraph.combiner.SumMessageCombiner; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; @@ -47,6 +48,7 @@ import org.apache.giraph.reducers.impl.MaxPairReducer; import org.apache.giraph.reducers.impl.SumReduce; import org.apache.giraph.types.NoMessage; import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.NumericTypeOps; import org.apache.giraph.types.ops.TypeOps; import org.apache.giraph.writable.tuple.LongLongWritable; import org.apache.giraph.writable.tuple.PairWritable; @@ -55,7 +57,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.log4j.Logger; -import com.google.common.base.Function; import com.google.common.collect.Iterators; /** @@ -98,6 +99,7 @@ public class UndirectedConnectedComponents { <I extends WritableComparable, V extends Writable> extends Piece<I, V, Writable, I, Object> { private final TypeOps<I> idTypeOps; + private final MinMessageCombiner<I, I> minMessageCombiner; private final Supplier<Boolean> vertexToPropagate; private final Consumer<Boolean> vertexUpdatedComponent; private final Consumer<Boolean> converged; @@ -118,6 +120,8 @@ public class UndirectedConnectedComponents { SupplierFromVertex<I, V, Writable, ? extends Iterable<? 
extends Edge<I, ?>>> edgeSupplier) { this.idTypeOps = idTypeOps; + this.minMessageCombiner = idTypeOps instanceof NumericTypeOps ? + new MinMessageCombiner<>((NumericTypeOps<I>) idTypeOps) : null; this.vertexToPropagate = vertexToPropagate; this.vertexUpdatedComponent = vertexUpdatedComponent; this.converged = converged; @@ -137,22 +141,14 @@ public class UndirectedConnectedComponents { final BlockWorkerSendApi<I, V, Writable, I> workerApi, Object executionStage) { final LongWritable one = new LongWritable(1); - return new InnerVertexSender() { - @Override - public void vertexSend(Vertex<I, V, Writable> vertex) { - if (vertexToPropagate.get()) { - workerApi.sendMessageToMultipleEdges( - Iterators.transform( - edgeSupplier.get(vertex).iterator(), - new Function<Edge<I, ?>, I>() { - @Override - public I apply(Edge<I, ?> edge) { - return edge.getTargetVertexId(); - } - }), - getComponent.get(vertex)); - propagatedAggregator.reduce(one); - } + return vertex -> { + if (vertexToPropagate.get()) { + workerApi.sendMessageToMultipleEdges( + Iterators.transform( + edgeSupplier.get(vertex).iterator(), + edge -> edge.getTargetVertexId()), + getComponent.get(vertex)); + propagatedAggregator.reduce(one); } }; } @@ -169,26 +165,21 @@ public class UndirectedConnectedComponents { public VertexReceiver<I, V, Writable, I> getVertexReceiver( BlockWorkerReceiveApi<I> workerApi, Object executionStage) { return new InnerVertexReceiver() { - private final I received = idTypeOps.create(); + private final I newComponent = idTypeOps.create(); @Override public void vertexReceive(Vertex<I, V, Writable> vertex, Iterable<I> messages) { - boolean first = true; + idTypeOps.set(newComponent, getComponent.get(vertex)); for (I value : messages) { - if (first) { - idTypeOps.set(received, value); - first = false; - } else { - if (received.compareTo(value) > 0) { - idTypeOps.set(received, value); - } + if (newComponent.compareTo(value) > 0) { + idTypeOps.set(newComponent, value); } } I cur = 
getComponent.get(vertex); - if (!first && cur.compareTo(received) > 0) { - setComponent.apply(vertex, received); + if (cur.compareTo(newComponent) > 0) { + setComponent.apply(vertex, newComponent); vertexUpdatedComponent.apply(true); } else { vertexUpdatedComponent.apply(false); @@ -198,8 +189,19 @@ public class UndirectedConnectedComponents { } @Override + public MessageCombiner<? super I, I> getMessageCombiner( + ImmutableClassesGiraphConfiguration conf) { + return minMessageCombiner; + } + + @Override public Class<I> getMessageClass() { - return idTypeOps.getTypeClass(); + return minMessageCombiner == null ? idTypeOps.getTypeClass() : null; + } + + @Override + protected boolean allowOneMessageToManyIdsEncoding() { + return true; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java b/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java new file mode 100644 index 0000000..3454366 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/combiner/MinMessageCombiner.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.combiner; + +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Keeps only the message with minimum value. + * + * @param <I> Vertex id + * @param <M> Message + */ +public class MinMessageCombiner<I extends WritableComparable, + M extends Writable> implements MessageCombiner<I, M> { + /** Numeric type ops for the value to combine */ + private final NumericTypeOps<M> numTypeOps; + + /** + * Combiner + * + * @param numTypeOps Type ops to use + */ + public MinMessageCombiner(NumericTypeOps<M> numTypeOps) { + this.numTypeOps = numTypeOps; + } + + @Override + public void combine(I vertexId, M originalMessage, M messageToCombine) { + if (numTypeOps.compare(originalMessage, messageToCombine) > 0) { + numTypeOps.set(originalMessage, messageToCombine); + } + } + + @Override + public M createInitialMessage() { + return this.numTypeOps.createMaxPositiveValue(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java index 6499c2b..71a30da 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/ByteTypeOps.java @@ -101,4 +101,9 @@ public enum ByteTypeOps implements public void negate(ByteWritable value) { value.set((byte) (-value.get())); } + + @Override + public int compare(ByteWritable value1, ByteWritable value2) { + return Byte.compare(value1.get(), value2.get()); + } } 
http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java index f549208..89e50b1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java @@ -101,4 +101,9 @@ public enum DoubleTypeOps implements public void negate(DoubleWritable value) { value.set(-value.get()); } + + @Override + public int compare(DoubleWritable value1, DoubleWritable value2) { + return Double.compare(value1.get(), value2.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java index cf970c0..64279ce 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java @@ -101,4 +101,9 @@ public enum FloatTypeOps implements public void negate(FloatWritable value) { value.set(-value.get()); } + + @Override + public int compare(FloatWritable value1, FloatWritable value2) { + return Float.compare(value1.get(), value2.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java index 
943637c..a03cf94 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java @@ -126,4 +126,9 @@ public enum IntTypeOps implements public void negate(IntWritable value) { value.set(-value.get()); } + + @Override + public int compare(IntWritable value1, IntWritable value2) { + return Integer.compare(value1.get(), value2.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java index 2e3c8e7..916d166 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java @@ -126,4 +126,9 @@ public enum LongTypeOps implements public void negate(LongWritable value) { value.set(-value.get()); } + + @Override + public int compare(LongWritable value1, LongWritable value2) { + return Long.compare(value1.get(), value2.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java index 6420a1b..a9786a9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java @@ -71,4 +71,14 @@ public interface NumericTypeOps<T> extends TypeOps<T> { * @param value Value to negate */ void negate(T value); + + /** + * Compare two values + * + * @param value1 First value + * 
@param value2 Second value + * @return 0 if values are equal, negative value if value1<value2 and + * positive value if value1>value2 + */ + int compare(T value1, T value2); } http://git-wip-us.apache.org/repos/asf/giraph/blob/0b196225/giraph-core/templates/TypeTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/templates/TypeTypeOps.java b/giraph-core/templates/TypeTypeOps.java index 8d92e8a..bba752d 100644 --- a/giraph-core/templates/TypeTypeOps.java +++ b/giraph-core/templates/TypeTypeOps.java @@ -143,5 +143,10 @@ public enum ${type.camel}TypeOps implements public void negate(${type.camel}Writable value) { value.set(<@cast_if_needed_e expr="-value.get()"/>); } + + @Override + public int compare(${type.camel}Writable value1, ${type.camel}Writable value2) { + return ${type.boxed}.compare(value1.get(), value2.get()); + } </#if> }
