http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java new file mode 100644 index 0000000..9f821b4 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +/** + * Object responsible for performing reducing operation. + * Simple wrapper of ReduceOperation object and current value holding + * partially reduced result. 
+ * + * @param <S> Single value type, objects passed on workers + * @param <R> Reduced value type + */ +public class Reducer<S, R extends Writable> implements Writable { + /** Reduce operations */ + private ReduceOperation<S, R> reduceOp; + /** Current (partially) reduced value*/ + private R currentValue; + + /** + * Constructor + */ + public Reducer() { + } + /** + * Constructor + * @param reduceOp Reduce operations + */ + public Reducer(ReduceOperation<S, R> reduceOp) { + this.reduceOp = reduceOp; + this.currentValue = reduceOp.createInitialValue(); + } + /** + * Constructor + * @param reduceOp Reduce operations + * @param currentValue current reduced value + */ + public Reducer(ReduceOperation<S, R> reduceOp, R currentValue) { + this.reduceOp = reduceOp; + this.currentValue = currentValue; + } + + /** + * Reduce given value into current reduced value. + * @param valueToReduce Single value to reduce + */ + public void reduceSingle(S valueToReduce) { + reduceOp.reduceSingle(currentValue, valueToReduce); + } + /** + * Reduce given partially reduced value into current reduced value. + * @param valueToReduce Partial value to reduce + */ + public void reducePartial(R valueToReduce) { + reduceOp.reducePartial(currentValue, valueToReduce); + } + /** + * Return new initial reduced value. 
+ * @return New initial reduced value + */ + public R createInitialValue() { + return reduceOp.createInitialValue(); + } + + public ReduceOperation<S, R> getReduceOp() { + return reduceOp; + } + + public R getCurrentValue() { + return currentValue; + } + + public void setCurrentValue(R currentValue) { + this.currentValue = currentValue; + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeWritableObject(reduceOp, out); + currentValue.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + reduceOp = WritableUtils.readWritableObject(in, null); + currentValue = reduceOp.createInitialValue(); + currentValue.readFields(in); + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java new file mode 100644 index 0000000..eeefdeb --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of Giraph reducers. 
+ */ +package org.apache.giraph.reducers; http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java index 5e046cc..3d654b4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java @@ -17,8 +17,12 @@ */ package org.apache.giraph.utils; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; +import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS; +import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS; + +import java.io.IOException; +import java.util.List; + import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -29,6 +33,7 @@ import org.apache.giraph.Algorithm; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConfigurationSettable; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.GiraphTypes; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; @@ -57,11 +62,8 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.zookeeper.ZooKeeper; -import java.io.IOException; -import java.util.List; - -import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS; -import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; /** * Translate command line args into Configuration Key-Value pairs. 
@@ -147,6 +149,10 @@ public final class ConfigurationUtils { ImmutableClassesGiraphConfiguration configuration) { if (configuration != null) { configuration.configureIfPossible(object); + } else if (object instanceof GiraphConfigurationSettable) { + throw new IllegalArgumentException( + "Trying to configure configurable object without value, " + + object.getClass()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 3c5cbad..923d369 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -739,4 +739,38 @@ public class WritableUtils { } } + /** + * Create a copy of Writable object, by serializing and deserializing it. 
+ * + * @param reusableOut Reusable output stream to serialize into + * @param reusableIn Reusable input stream to deserialize out of + * @param original Original value of which to make a copy + * @param <T> Type of the object + * @return Copy of the original value + */ + public static <T extends Writable> T createCopy( + UnsafeByteArrayOutputStream reusableOut, + UnsafeReusableByteArrayInput reusableIn, T original) { + T copy = (T) createWritable(original.getClass(), null); + + try { + reusableOut.reset(); + original.write(reusableOut); + reusableIn.initialize( + reusableOut.getByteArray(), 0, reusableOut.getPos()); + copy.readFields(reusableIn); + + if (reusableIn.available() != 0) { + throw new RuntimeException("Serialization of " + + original.getClass() + " encountered issues, " + + reusableIn.available() + " bytes left to be read"); + } + } catch (IOException e) { + throw new IllegalStateException( + "IOException occurred while trying to create a copy " + + original.getClass(), e); + } + return copy; + } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index 120678f..f61e817 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -18,6 +18,27 @@ package org.apache.giraph.worker; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import 
java.util.Queue; +import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; + +import net.iharder.Base64; + import org.apache.giraph.bsp.ApplicationState; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceWorker; @@ -66,10 +87,10 @@ import org.apache.giraph.partition.PartitionStore; import org.apache.giraph.partition.WorkerGraphPartitioner; import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.JMapHistoDumper; -import org.apache.giraph.utils.ReactiveJMapHistoDumper; import org.apache.giraph.utils.LoggerUtils; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; +import org.apache.giraph.utils.ReactiveJMapHistoDumper; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; @@ -96,26 +117,6 @@ import org.json.JSONObject; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import net.iharder.Base64; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; /** * ZooKeeper-based implementation of {@link CentralizedServiceWorker}. 
@@ -162,10 +163,10 @@ public class BspServiceWorker<I extends WritableComparable, private final WorkerContext workerContext; /** Handler for aggregators */ - private final WorkerAggregatorHandler aggregatorHandler; + private final WorkerAggregatorHandler globalCommHandler; /** Superstep output */ - private SuperstepOutput<I, V, E> superstepOutput; + private final SuperstepOutput<I, V, E> superstepOutput; /** array of observers to call back to */ private final WorkerObserver[] observers; @@ -212,10 +213,10 @@ public class BspServiceWorker<I extends WritableComparable, workerAggregatorRequestProcessor = new NettyWorkerAggregatorRequestProcessor(getContext(), conf, this); - aggregatorHandler = new WorkerAggregatorHandler(this, conf, context); + globalCommHandler = new WorkerAggregatorHandler(this, conf, context); workerContext = conf.createWorkerContext(); - workerContext.setWorkerAggregatorUsage(aggregatorHandler); + workerContext.setWorkerGlobalCommUsage(globalCommHandler); superstepOutput = conf.createSuperstepOutput(context); @@ -584,7 +585,7 @@ public class BspServiceWorker<I extends WritableComparable, // Initialize aggregator at worker side during setup. // Do this just before vertex and edge loading. 
- aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); + globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor); VertexEdgeCount vertexEdgeCount; long entriesLoaded; @@ -895,7 +896,7 @@ public class BspServiceWorker<I extends WritableComparable, postSuperstepCallbacks(); } - aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor); + globalCommHandler.finishSuperstep(workerAggregatorRequestProcessor); MessageStore<I, Writable> incomingMessageStore = getServerData().getIncomingMessageStore(); @@ -1920,15 +1921,16 @@ else[HADOOP_NON_SECURE]*/ return workerServer.getServerData(); } + @Override public WorkerAggregatorHandler getAggregatorHandler() { - return aggregatorHandler; + return globalCommHandler; } @Override public void prepareSuperstep() { if (getSuperstep() != INPUT_SUPERSTEP) { - aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); + globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java index 35ad94b..89f74b3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/EdgeInputSplitsCallable.java @@ -18,6 +18,8 @@ package org.apache.giraph.worker; +import java.io.IOException; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.graph.VertexEdgeCount; @@ -37,8 +39,6 @@ import org.apache.log4j.Logger; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Meter; -import java.io.IOException; - /** * Load as many edge input 
splits as possible. * Every thread will has its own instance of WorkerClientRequestProcessor @@ -62,7 +62,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, EdgeInputSplitsCallable.class); /** Aggregator handler */ - private final WorkerThreadAggregatorUsage aggregatorUsage; + private final WorkerThreadGlobalCommUsage globalCommUsage; /** Bsp service worker (only use thread-safe methods) */ private final BspServiceWorker<I, V, E> bspServiceWorker; /** Edge input format */ @@ -105,7 +105,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, this.bspServiceWorker = bspServiceWorker; inputSplitMaxEdges = configuration.getInputSplitMaxEdges(); // Initialize aggregator usage. - this.aggregatorUsage = bspServiceWorker.getAggregatorHandler() + this.globalCommUsage = bspServiceWorker.getAggregatorHandler() .newThreadAggregatorUsage(); edgeInputFilter = configuration.getEdgeInputFilter(); canEmbedInIds = bspServiceWorker @@ -147,7 +147,7 @@ public class EdgeInputSplitsCallable<I extends WritableComparable, edgeReader.initialize(inputSplit, context); // Set aggregator usage to edge reader - edgeReader.setWorkerAggregatorUse(aggregatorUsage); + edgeReader.setWorkerGlobalCommUsage(globalCommUsage); long inputSplitEdgesLoaded = 0; long inputSplitEdgesFiltered = 0; http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java index a2279a9..f6dca25 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/MappingInputSplitsCallable.java @@ -18,21 +18,21 @@ package org.apache.giraph.worker; +import java.io.IOException; 
+import java.util.concurrent.atomic.AtomicInteger; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.io.GiraphInputFormat; import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.MappingReader; -import org.apache.giraph.mapping.MappingStore; import org.apache.giraph.mapping.MappingEntry; +import org.apache.giraph.mapping.MappingStore; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Mapper; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - /** * Load as many mapping input splits as possible. * Every thread will has its own instance of WorkerClientRequestProcessor @@ -89,11 +89,11 @@ public class MappingInputSplitsCallable<I extends WritableComparable, mappingInputFormat.createMappingReader(inputSplit, context); mappingReader.setConf(configuration); - WorkerThreadAggregatorUsage aggregatorUsage = this.bspServiceWorker + WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker .getAggregatorHandler().newThreadAggregatorUsage(); mappingReader.initialize(inputSplit, context); - mappingReader.setWorkerAggregatorUse(aggregatorUsage); + mappingReader.setWorkerGlobalCommUsage(globalCommUsage); int entriesLoaded = 0; MappingStore<I, B> mappingStore = http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java index 4c85765..00a2781 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java +++ 
b/giraph-core/src/main/java/org/apache/giraph/worker/VertexInputSplitsCallable.java @@ -18,6 +18,8 @@ package org.apache.giraph.worker; +import java.io.IOException; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; @@ -42,8 +44,6 @@ import org.apache.log4j.Logger; import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Meter; -import java.io.IOException; - /** * Load as many vertex input splits as possible. * Every thread will has its own instance of WorkerClientRequestProcessor @@ -79,7 +79,7 @@ public class VertexInputSplitsCallable<I extends WritableComparable, * Whether the chosen {@link OutEdges} implementation allows for Edge * reuse. */ - private boolean reuseEdgeObjects; + private final boolean reuseEdgeObjects; /** Used to translate Edges during vertex input phase based on localData */ private final TranslateEdge<I, E> translateEdge; @@ -152,13 +152,13 @@ public class VertexInputSplitsCallable<I extends WritableComparable, vertexInputFormat.createVertexReader(inputSplit, context); vertexReader.setConf(configuration); - WorkerThreadAggregatorUsage aggregatorUsage = + WorkerThreadGlobalCommUsage globalCommUsage = this.bspServiceWorker .getAggregatorHandler().newThreadAggregatorUsage(); vertexReader.initialize(inputSplit, context); // Set aggregator usage to vertex reader - vertexReader.setWorkerAggregatorUse(aggregatorUsage); + vertexReader.setWorkerGlobalCommUsage(globalCommUsage); long inputSplitVerticesLoaded = 0; long inputSplitVerticesFiltered = 0; http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java new file 
mode 100644 index 0000000..5238a07 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.worker; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Class for delegating WorkerAggregatorUsage and + * WorkerGlobalCommUsage methods to corresponding interface. 
+ * + * @param <I> Vertex id + * @param <V> Vertex data + * @param <E> Edge data + */ +public abstract class WorkerAggregatorDelegator<I extends WritableComparable, + V extends Writable, E extends Writable> + extends DefaultImmutableClassesGiraphConfigurable<I, V, E> + implements WorkerAggregatorUsage, WorkerGlobalCommUsage { + + /** Worker global communication usage */ + private WorkerGlobalCommUsage workerGlobalCommUsage; + + /** + * Set worker global communication usage + * + * @param workerGlobalCommUsage Worker global communication usage + */ + public void setWorkerGlobalCommUsage( + WorkerGlobalCommUsage workerGlobalCommUsage) { + this.workerGlobalCommUsage = workerGlobalCommUsage; + } + + @Override + public final void reduce(String name, Object value) { + workerGlobalCommUsage.reduce(name, value); + } + + @Override + public final <B extends Writable> B getBroadcast(String name) { + return workerGlobalCommUsage.getBroadcast(name); + } + + @Override + public final <A extends Writable> void aggregate(String name, A value) { + reduce(name, value); + } + + @Override + public final <A extends Writable> A getAggregatedValue(String name) { + return this.<A>getBroadcast(name); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index 45ca665..05a13a7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -15,22 +15,26 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.giraph.worker; import java.io.IOException; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream; +import org.apache.giraph.comm.GlobalCommType; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.AllAggregatorServerData; +import org.apache.giraph.comm.aggregators.GlobalCommValueOutputStream; import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData; import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.utils.Factory; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.reducers.Reducer; +import org.apache.giraph.utils.UnsafeByteArrayOutputStream; +import org.apache.giraph.utils.UnsafeReusableByteArrayInput; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -38,35 +42,18 @@ import org.apache.log4j.Logger; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -/** - * Handler for aggregators on worker. Provides the aggregated values and - * performs aggregations from user vertex code (thread-safe). Also has - * methods for all superstep coordination related to aggregators. - * - * At the beginning of any superstep any worker calls prepareSuperstep(), - * which blocks until the final aggregates from the previous superstep have - * been delivered to the worker. 
- * Next, during the superstep worker can call aggregate() and - * getAggregatedValue() (both methods are thread safe) the former - * computes partial aggregates for this superstep from the worker, - * the latter returns (read-only) final aggregates from the previous superstep. - * Finally, at the end of the superstep, the worker calls finishSuperstep(), - * which propagates non-owned partial aggregates to the owner workers, - * and sends the final aggregate from the owner worker to the master. - */ -public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { +/** Handler for reduce/broadcast on the workers */ +public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { /** Class logger */ private static final Logger LOG = Logger.getLogger(WorkerAggregatorHandler.class); - /** Map of values from previous superstep */ - private final Map<String, Writable> previousAggregatedValueMap = + /** Map of broadcasted values */ + private final Map<String, Writable> broadcastedMap = Maps.newHashMap(); - /** Map of aggregator factories for current superstep */ - private final Map<String, Factory<Aggregator<Writable>>> - currentAggregatorFactoryMap = Maps.newHashMap(); - /** Map of aggregators for current superstep */ - private final Map<String, Aggregator<Writable>> currentAggregatorMap = + /** Map of reducers currently being reduced */ + private final Map<String, Reducer<Object, Writable>> reducerMap = Maps.newHashMap(); + /** Service worker */ private final CentralizedServiceWorker<?, ?, ?> serviceWorker; /** Progressable for reporting progress */ @@ -96,29 +83,48 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } @Override - public <A extends Writable> void aggregate(String name, A value) { - Aggregator<Writable> aggregator = currentAggregatorMap.get(name); - if (aggregator != null) { + public <B extends Writable> B getBroadcast(String name) { + B value = (B) broadcastedMap.get(name); + if (value == null) { 
+ LOG.warn("getBroadcast: " + + AggregatorUtils.getUnregisteredAggregatorMessage(name, + broadcastedMap.size() != 0, conf)); + } + return value; + } + + @Override + public void reduce(String name, Object value) { + Reducer<Object, Writable> reducer = reducerMap.get(name); + if (reducer != null) { progressable.progress(); - synchronized (aggregator) { - aggregator.aggregate(value); + synchronized (reducer) { + reducer.reduceSingle(value); } } else { - throw new IllegalStateException("aggregate: " + + throw new IllegalStateException("reduce: " + AggregatorUtils.getUnregisteredAggregatorMessage(name, - currentAggregatorMap.size() != 0, conf)); + reducerMap.size() != 0, conf)); } } - @Override - public <A extends Writable> A getAggregatedValue(String name) { - A value = (A) previousAggregatedValueMap.get(name); - if (value == null) { - LOG.warn("getAggregatedValue: " + + /** + * Combine partially reduced value into currently reduced value. + * @param name Name of the reducer + * @param valueToReduce Partial value to reduce + */ + protected void reducePartial(String name, Writable valueToReduce) { + Reducer<Object, Writable> reducer = reducerMap.get(name); + if (reducer != null) { + progressable.progress(); + synchronized (reducer) { + reducer.reducePartial(valueToReduce); + } + } else { + throw new IllegalStateException("reduce: " + AggregatorUtils.getUnregisteredAggregatorMessage(name, - previousAggregatedValueMap.size() != 0, conf)); + reducerMap.size() != 0, conf)); } - return value; } /** @@ -128,53 +134,35 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { */ public void prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor) { + broadcastedMap.clear(); + reducerMap.clear(); + if (LOG.isDebugEnabled()) { LOG.debug("prepareSuperstep: Start preparing aggregators"); } - AllAggregatorServerData allAggregatorData = + AllAggregatorServerData allGlobalCommData = serviceWorker.getServerData().getAllAggregatorData(); // Wait for 
my aggregators Iterable<byte[]> dataToDistribute = - allAggregatorData.getDataFromMasterWhenReady( + allGlobalCommData.getDataFromMasterWhenReady( serviceWorker.getMasterInfo()); try { // Distribute my aggregators - requestProcessor.distributeAggregators(dataToDistribute); + requestProcessor.distributeReducedValues(dataToDistribute); } catch (IOException e) { throw new IllegalStateException("prepareSuperstep: " + "IOException occurred while trying to distribute aggregators", e); } // Wait for all other aggregators and store them - allAggregatorData.fillNextSuperstepMapsWhenReady( - getOtherWorkerIdsSet(), previousAggregatedValueMap, - currentAggregatorFactoryMap); - fillAndInitAggregatorsMap(currentAggregatorMap); - allAggregatorData.reset(); + allGlobalCommData.fillNextSuperstepMapsWhenReady( + getOtherWorkerIdsSet(), broadcastedMap, + reducerMap); if (LOG.isDebugEnabled()) { LOG.debug("prepareSuperstep: Aggregators prepared"); } } /** - * Fills aggregators map from currentAggregatorFactoryMap. - * All aggregators in this map will be set to initial value. - * @param aggregatorMap Map to fill. 
- */ - private void fillAndInitAggregatorsMap( - Map<String, Aggregator<Writable>> aggregatorMap) { - for (Map.Entry<String, Factory<Aggregator<Writable>>> entry : - currentAggregatorFactoryMap.entrySet()) { - Aggregator<Writable> aggregator = - aggregatorMap.get(entry.getKey()); - if (aggregator == null) { - aggregatorMap.put(entry.getKey(), entry.getValue().create()); - } else { - aggregator.reset(); - } - } - } - - /** * Send aggregators to their owners and in the end to the master * * @param requestProcessor Request processor for aggregators @@ -186,19 +174,19 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { "workers will send their aggregated values " + "once they are done with superstep computation"); } - OwnerAggregatorServerData ownerAggregatorData = + OwnerAggregatorServerData ownerGlobalCommData = serviceWorker.getServerData().getOwnerAggregatorData(); // First send partial aggregated values to their owners and determine // which aggregators belong to this worker - for (Map.Entry<String, Aggregator<Writable>> entry : - currentAggregatorMap.entrySet()) { + for (Map.Entry<String, Reducer<Object, Writable>> entry : + reducerMap.entrySet()) { try { - boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(), - entry.getValue().getAggregatedValue()); + boolean sent = requestProcessor.sendReducedValue(entry.getKey(), + entry.getValue().getCurrentValue()); if (!sent) { // If it's my aggregator, add it directly - ownerAggregatorData.aggregate(entry.getKey(), - entry.getValue().getAggregatedValue()); + ownerGlobalCommData.reduce(entry.getKey(), + entry.getValue().getCurrentValue()); } } catch (IOException e) { throw new IllegalStateException("finishSuperstep: " + @@ -216,20 +204,21 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } // Wait to receive partial aggregated values from all other workers - Iterable<Map.Entry<String, Writable>> myAggregators = - 
ownerAggregatorData.getMyAggregatorValuesWhenReady( + Iterable<Map.Entry<String, Writable>> myReducedValues = + ownerGlobalCommData.getMyReducedValuesWhenReady( getOtherWorkerIdsSet()); // Send final aggregated values to master - AggregatedValueOutputStream aggregatorOutput = - new AggregatedValueOutputStream(); - for (Map.Entry<String, Writable> entry : myAggregators) { + GlobalCommValueOutputStream globalOutput = + new GlobalCommValueOutputStream(false); + for (Map.Entry<String, Writable> entry : myReducedValues) { try { - int currentSize = aggregatorOutput.addAggregator(entry.getKey(), + int currentSize = globalOutput.addValue(entry.getKey(), + GlobalCommType.REDUCED_VALUE, entry.getValue()); if (currentSize > maxBytesPerAggregatorRequest) { - requestProcessor.sendAggregatedValuesToMaster( - aggregatorOutput.flush()); + requestProcessor.sendReducedValuesToMaster( + globalOutput.flush()); } progressable.progress(); } catch (IOException e) { @@ -239,7 +228,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } } try { - requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush()); + requestProcessor.sendReducedValuesToMaster(globalOutput.flush()); } catch (IOException e) { throw new IllegalStateException("finishSuperstep: " + "IOException occured while sending aggregators to master", e); @@ -247,7 +236,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { // Wait for master to receive aggregated values before proceeding serviceWorker.getWorkerClient().waitAllRequests(); - ownerAggregatorData.reset(); + ownerGlobalCommData.reset(); if (LOG.isDebugEnabled()) { LOG.debug("finishSuperstep: Aggregators finished"); } @@ -259,9 +248,9 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { * * @return New aggregator usage */ - public WorkerThreadAggregatorUsage newThreadAggregatorUsage() { + public WorkerThreadGlobalCommUsage newThreadAggregatorUsage() { if 
(AggregatorUtils.useThreadLocalAggregators(conf)) { - return new ThreadLocalWorkerAggregatorUsage(); + return new ThreadLocalWorkerGlobalCommUsage(); } else { return this; } @@ -290,56 +279,70 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } /** - * Not thread-safe implementation of {@link WorkerThreadAggregatorUsage}. - * We can use one instance of this object per thread to prevent - * synchronizing on each aggregate() call. In the end of superstep, - * values from each of these will be aggregated back to {@link - * WorkerAggregatorHandler} - */ - public class ThreadLocalWorkerAggregatorUsage - implements WorkerThreadAggregatorUsage { - /** Thread-local aggregator map */ - private final Map<String, Aggregator<Writable>> threadAggregatorMap; + * Not thread-safe implementation of {@link WorkerThreadGlobalCommUsage}. + * We can use one instance of this object per thread to prevent + * synchronizing on each reduce() call. In the end of superstep, + * values from each of these will be reduced back to {@link + * WorkerAggregatorHandler} + */ + public class ThreadLocalWorkerGlobalCommUsage + implements WorkerThreadGlobalCommUsage { + /** Thread-local reducer map */ + private final Map<String, Reducer<Object, Writable>> threadReducerMap; /** - * Constructor - * - * Creates new instances of all aggregators from - * {@link WorkerAggregatorHandler} - */ - public ThreadLocalWorkerAggregatorUsage() { - threadAggregatorMap = Maps.newHashMapWithExpectedSize( - WorkerAggregatorHandler.this.currentAggregatorMap.size()); - fillAndInitAggregatorsMap(threadAggregatorMap); + * Constructor + * + * Creates new instances of all reducers from + * {@link WorkerAggregatorHandler} + */ + public ThreadLocalWorkerGlobalCommUsage() { + threadReducerMap = Maps.newHashMapWithExpectedSize( + WorkerAggregatorHandler.this.reducerMap.size()); + + UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream(); + UnsafeReusableByteArrayInput in = new 
UnsafeReusableByteArrayInput(); + + for (Entry<String, Reducer<Object, Writable>> entry : + reducerMap.entrySet()) { + ReduceOperation<Object, Writable> globalReduceOp = + entry.getValue().getReduceOp(); + + ReduceOperation<Object, Writable> threadLocalCopy = + WritableUtils.createCopy(out, in, globalReduceOp); + + threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy)); + } } @Override - public <A extends Writable> void aggregate(String name, A value) { - Aggregator<Writable> aggregator = threadAggregatorMap.get(name); - if (aggregator != null) { + public void reduce(String name, Object value) { + Reducer<Object, Writable> reducer = threadReducerMap.get(name); + if (reducer != null) { progressable.progress(); - aggregator.aggregate(value); + reducer.reduceSingle(value); } else { - throw new IllegalStateException("aggregate: " + + throw new IllegalStateException("reduce: " + AggregatorUtils.getUnregisteredAggregatorMessage(name, - threadAggregatorMap.size() != 0, conf)); + threadReducerMap.size() != 0, conf)); } } @Override - public <A extends Writable> A getAggregatedValue(String name) { - return WorkerAggregatorHandler.this.<A>getAggregatedValue(name); + public <B extends Writable> B getBroadcast(String name) { + return WorkerAggregatorHandler.this.getBroadcast(name); } @Override public void finishThreadComputation() { // Aggregate the values this thread's vertices provided back to // WorkerAggregatorHandler - for (Map.Entry<String, Aggregator<Writable>> entry : - threadAggregatorMap.entrySet()) { - WorkerAggregatorHandler.this.aggregate(entry.getKey(), - entry.getValue().getAggregatedValue()); + for (Entry<String, Reducer<Object, Writable>> entry : + threadReducerMap.entrySet()) { + WorkerAggregatorHandler.this.reducePartial(entry.getKey(), + entry.getValue().getCurrentValue()); } } } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java index 7a55d56..b977ea1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerContext.java @@ -18,30 +18,28 @@ package org.apache.giraph.worker; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.requests.SendWorkerToWorkerMessageRequest; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.GraphState; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - /** * WorkerContext allows for the execution of user code * on a per-worker basis. There's one WorkerContext per worker. 
*/ @SuppressWarnings("rawtypes") public abstract class WorkerContext - extends DefaultImmutableClassesGiraphConfigurable - implements WorkerAggregatorUsage, Writable { + extends WorkerAggregatorDelegator<WritableComparable, Writable, Writable> + implements Writable { /** Global graph state */ private GraphState graphState; - /** Worker aggregator usage */ - private WorkerAggregatorUsage workerAggregatorUsage; /** Service worker */ private CentralizedServiceWorker serviceWorker; @@ -71,16 +69,6 @@ public abstract class WorkerContext } /** - * Set worker aggregator usage - * - * @param workerAggregatorUsage Worker aggregator usage - */ - public void setWorkerAggregatorUsage( - WorkerAggregatorUsage workerAggregatorUsage) { - this.workerAggregatorUsage = workerAggregatorUsage; - } - - /** * Initialize the WorkerContext. * This method is executed once on each Worker before the first * superstep starts. @@ -196,16 +184,6 @@ public abstract class WorkerContext return graphState.getContext(); } - @Override - public <A extends Writable> void aggregate(String name, A value) { - workerAggregatorUsage.aggregate(name, value); - } - - @Override - public <A extends Writable> A getAggregatedValue(String name) { - return workerAggregatorUsage.<A>getAggregatedValue(name); - } - /** * Call this to log a line to command line of the job. 
Use in moderation - * it's a synchronous call to Job client http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java new file mode 100644 index 0000000..39566f5 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.worker; + +import org.apache.hadoop.io.Writable; + +/** + * Methods on worker can access broadcasted values and provide + * values to reduce through this interface + */ +public interface WorkerGlobalCommUsage { + /** + * Reduce given value. 
+ * @param name Name of the reducer + * @param value Single value to reduce + */ + void reduce(String name, Object value); + /** + * Get value broadcasted from master + * @param name Name of the broadcasted value + * @return Broadcasted value + * @param <B> Broadcast value type + */ + <B extends Writable> B getBroadcast(String name); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java deleted file mode 100644 index 194127e..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadAggregatorUsage.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.worker; - -/** - * {@link WorkerAggregatorUsage} which can be used in each of the - * computation threads. 
- */ -public interface WorkerThreadAggregatorUsage extends WorkerAggregatorUsage { - /** - * Call this after thread's computation is finished, - * i.e. when all vertices have provided their values to aggregators - */ - void finishThreadComputation(); -} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java new file mode 100644 index 0000000..8edbdc7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerThreadGlobalCommUsage.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.worker; + + +/** + * {@link WorkerGlobalCommUsage} which can be used in each of the + * computation threads. + */ +public interface WorkerThreadGlobalCommUsage extends WorkerGlobalCommUsage { + /** + * Call this after thread's computation is finished, + * i.e. 
when all vertices have provided their values to aggregators + */ + void finishThreadComputation(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java index 488e1ea..26459c0 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java +++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java @@ -18,6 +18,24 @@ package org.apache.giraph; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.giraph.aggregators.TextAggregatorWriter; import org.apache.giraph.combiner.SimpleSumMessageCombiner; import org.apache.giraph.conf.GiraphConfiguration; @@ -62,24 +80,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Closeables; -import java.io.BufferedReader; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertSame; -import static 
org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - /** * Unit test for many simple BSP applications. */ @@ -456,8 +456,8 @@ public class assertEquals(maxSuperstep + 2, maxValues.size()); assertEquals(maxSuperstep + 2, vertexCounts.size()); - assertEquals(maxPageRank, (double) maxValues.get(maxSuperstep), 0d); - assertEquals(minPageRank, (double) minValues.get(maxSuperstep), 0d); + assertEquals(maxPageRank, maxValues.get(maxSuperstep), 0d); + assertEquals(minPageRank, minValues.get(maxSuperstep), 0d); assertEquals(numVertices, (long) vertexCounts.get(maxSuperstep)); } finally { http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java index eb3f686..602ab32 100644 --- a/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java +++ b/giraph-examples/src/test/java/org/apache/giraph/aggregators/TestAggregatorsHandling.java @@ -18,34 +18,19 @@ package org.apache.giraph.aggregators; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + import org.apache.giraph.BspCase; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConstants; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.examples.AggregatorsTestComputation; import org.apache.giraph.examples.SimpleCheckpoint; import org.apache.giraph.job.GiraphJob; -import org.apache.giraph.master.MasterAggregatorHandler; import org.apache.hadoop.fs.Path; -import 
org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.util.Progressable; import org.junit.Test; -import org.mockito.Mockito; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; /** Tests if aggregators are handled on a proper way */ public class TestAggregatorsHandling extends BspCase { @@ -54,21 +39,6 @@ public class TestAggregatorsHandling extends BspCase { super(TestAggregatorsHandling.class.getName()); } - private Map<String, AggregatorWrapper<Writable>> getAggregatorMap - (MasterAggregatorHandler aggregatorHandler) { - try { - Field aggregtorMapField = aggregatorHandler.getClass().getDeclaredField - ("aggregatorMap"); - aggregtorMapField.setAccessible(true); - return (Map<String, AggregatorWrapper<Writable>>) - aggregtorMapField.get(aggregatorHandler); - } catch (IllegalAccessException e) { - throw new IllegalStateException(e); - } catch (NoSuchFieldException e) { - throw new IllegalStateException(e); - } - } - /** Tests if aggregators are handled on a proper way during supersteps */ @Test public void testAggregatorsHandling() throws IOException, @@ -88,64 +58,6 @@ public class TestAggregatorsHandling extends BspCase { assertTrue(job.run(true)); } - /** Test if aggregators serialization captures everything */ - @Test - public void testMasterAggregatorsSerialization() throws - IllegalAccessException, InstantiationException, IOException { - ImmutableClassesGiraphConfiguration conf = - Mockito.mock(ImmutableClassesGiraphConfiguration.class); - Mockito.when(conf.getAggregatorWriterClass()).thenReturn( - TextAggregatorWriter.class); - Progressable progressable = 
Mockito.mock(Progressable.class); - MasterAggregatorHandler handler = - new MasterAggregatorHandler(conf, progressable); - - String regularAggName = "regular"; - LongWritable regularValue = new LongWritable(5); - handler.registerAggregator(regularAggName, LongSumAggregator.class); - handler.setAggregatedValue(regularAggName, regularValue); - - String persistentAggName = "persistent"; - DoubleWritable persistentValue = new DoubleWritable(10.5); - handler.registerPersistentAggregator(persistentAggName, - DoubleOverwriteAggregator.class); - handler.setAggregatedValue(persistentAggName, persistentValue); - - for (AggregatorWrapper<Writable> aggregator : - getAggregatorMap(handler).values()) { - aggregator.setPreviousAggregatedValue( - aggregator.getCurrentAggregatedValue()); - } - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - handler.write(new DataOutputStream(out)); - - MasterAggregatorHandler restartedHandler = - new MasterAggregatorHandler(conf, progressable); - restartedHandler.readFields( - new DataInputStream(new ByteArrayInputStream(out.toByteArray()))); - - assertEquals(2, getAggregatorMap(restartedHandler).size()); - - AggregatorWrapper<Writable> regularAgg = - getAggregatorMap(restartedHandler).get(regularAggName); - assertTrue(regularAgg.getAggregatorFactory().create().getClass().equals( - LongSumAggregator.class)); - assertEquals(regularValue, regularAgg.getPreviousAggregatedValue()); - assertEquals(regularValue, - restartedHandler.<LongWritable>getAggregatedValue(regularAggName)); - assertFalse(regularAgg.isPersistent()); - - AggregatorWrapper<Writable> persistentAgg = - getAggregatorMap(restartedHandler).get(persistentAggName); - assertTrue(persistentAgg.getAggregatorFactory().create().getClass().equals - (DoubleOverwriteAggregator.class)); - assertEquals(persistentValue, persistentAgg.getPreviousAggregatedValue()); - assertEquals(persistentValue, - restartedHandler.<LongWritable>getAggregatedValue(persistentAggName)); - 
assertTrue(persistentAgg.isPersistent()); - } - /** * Test if aggregators are handled properly when restarting from a * checkpoint
