http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java new file mode 100644 index 0000000..7171f04 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendReducedToMasterRequest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.comm.requests; + +import java.io.IOException; + +import org.apache.giraph.master.MasterAggregatorHandler; + +/** + * Request to send final aggregated values from worker which owns + * aggregators to the master + */ +public class SendReducedToMasterRequest extends ByteArrayRequest + implements MasterRequest { + + /** + * Constructor + * + * @param data Serialized aggregator data + */ + public SendReducedToMasterRequest(byte[] data) { + super(data); + } + + /** + * Constructor used for reflection only + */ + public SendReducedToMasterRequest() { + } + + @Override + public void doRequest(MasterAggregatorHandler aggregatorHandler) { + try { + aggregatorHandler.acceptReducedValues(getDataInput()); + } catch (IOException e) { + throw new IllegalStateException("doRequest: " + + "IOException occurred while processing request", e); + } + } + + @Override + public RequestType getType() { + return RequestType.SEND_AGGREGATORS_TO_MASTER_REQUEST; + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java index 00a0c26..2f76e6e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java @@ -18,15 +18,15 @@ package org.apache.giraph.comm.requests; +import java.io.DataInput; +import java.io.IOException; + +import org.apache.giraph.comm.GlobalCommType; import org.apache.giraph.comm.ServerData; -import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.io.DataInput; -import java.io.IOException; - /** * Request to send partial aggregated values for current superstep (values * which were computed by one worker's vertices) @@ -56,20 +56,23 @@ public class SendWorkerAggregatorsRequest extends OwnerAggregatorServerData aggregatorData = serverData.getOwnerAggregatorData(); try { - int numAggregators = input.readInt(); - for (int i = 0; i < numAggregators; i++) { - String aggregatorName = input.readUTF(); - if (aggregatorName.equals( - AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { - LongWritable count = new LongWritable(0); - count.readFields(input); - aggregatorData.receivedRequestCountFromWorker(count.get(), + int num = input.readInt(); + for (int i = 0; i < num; i++) { + String name = input.readUTF(); + GlobalCommType type = GlobalCommType.values()[input.readByte()]; + if (type == GlobalCommType.SPECIAL_COUNT) { + LongWritable value = new LongWritable(); + value.readFields(input); + aggregatorData.receivedRequestCountFromWorker( + value.get(), getSenderTaskId()); + } else if (type == GlobalCommType.REDUCED_VALUE) { + Writable value = aggregatorData.createInitialValue(name); + value.readFields(input); + aggregatorData.reduce(name, value); } else { - Writable aggregatedValue = - aggregatorData.createAggregatorInitialValue(aggregatorName); - aggregatedValue.readFields(input); - aggregatorData.aggregate(aggregatorName, aggregatedValue); + throw new IllegalStateException( + "SendWorkerAggregatorsRequest received " + type); } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java index e7c3084..1ea6603 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/AbstractComputation.java @@ -18,20 +18,20 @@ package org.apache.giraph.graph; +import java.io.IOException; +import java.util.Iterator; + import org.apache.giraph.comm.WorkerClientRequestProcessor; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerAggregatorDelegator; import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; import org.apache.log4j.Logger; -import java.io.IOException; -import java.util.Iterator; - /** * See {@link Computation} for explanation of the interface. * @@ -52,7 +52,7 @@ import java.util.Iterator; public abstract class AbstractComputation<I extends WritableComparable, V extends Writable, E extends Writable, M1 extends Writable, M2 extends Writable> - extends DefaultImmutableClassesGiraphConfigurable<I, V, E> + extends WorkerAggregatorDelegator<I, V, E> implements Computation<I, V, E, M1, M2> { /** Logger */ private static final Logger LOG = Logger.getLogger(AbstractComputation.class); @@ -63,8 +63,6 @@ public abstract class AbstractComputation<I extends WritableComparable, private WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor; /** Graph-wide BSP Mapper for this Computation */ private GraphTaskManager<I, V, E> graphTaskManager; - /** Worker aggregator usage */ - private WorkerAggregatorUsage workerAggregatorUsage; /** Worker context */ private WorkerContext workerContext; @@ -76,6 +74,7 @@ public abstract class AbstractComputation<I extends WritableComparable, * superstep. Each message is only guaranteed to have * a life expectancy as long as next() is not called. */ + @Override public abstract void compute(Vertex<I, V, E> vertex, Iterable<M1> messages) throws IOException; @@ -103,7 +102,7 @@ public abstract class AbstractComputation<I extends WritableComparable, * @param graphState Graph state * @param workerClientRequestProcessor Processor for handling requests * @param graphTaskManager Graph-wide BSP Mapper for this Vertex - * @param workerAggregatorUsage Worker aggregator usage + * @param workerGlobalCommUsage Worker global communication usage * @param workerContext Worker context */ @Override @@ -111,12 +110,12 @@ public abstract class AbstractComputation<I extends WritableComparable, GraphState graphState, WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor, GraphTaskManager<I, V, E> graphTaskManager, - WorkerAggregatorUsage workerAggregatorUsage, + WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext) { this.graphState = graphState; this.workerClientRequestProcessor = workerClientRequestProcessor; this.graphTaskManager = graphTaskManager; - this.workerAggregatorUsage = workerAggregatorUsage; + this.setWorkerGlobalCommUsage(workerGlobalCommUsage); this.workerContext = workerContext; } @@ -274,14 +273,4 @@ public abstract class AbstractComputation<I extends WritableComparable, public <W extends WorkerContext> W getWorkerContext() { return (W) workerContext; } - - @Override - public <A extends Writable> void aggregate(String name, A value) { - workerAggregatorUsage.aggregate(name, value); - } - - @Override - public <A extends Writable> A getAggregatedValue(String name) { - return workerAggregatorUsage.<A>getAggregatedValue(name); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java index 7a7b40f..d310da9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java @@ -17,6 +17,9 @@ */ package org.apache.giraph.graph; +import java.io.IOException; +import java.util.Iterator; + import org.apache.giraph.comm.WorkerClientRequestProcessor; import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable; import org.apache.giraph.conf.TypesHolder; @@ -24,13 +27,11 @@ import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; import org.apache.giraph.worker.WorkerAggregatorUsage; import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.worker.WorkerGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.io.IOException; -import java.util.Iterator; - /** * Interface for an application for computation. * @@ -55,7 +56,7 @@ public interface Computation<I extends WritableComparable, M2 extends Writable> extends TypesHolder<I, V, E, M1, M2>, ImmutableClassesGiraphConfigurable<I, V, E>, - WorkerAggregatorUsage { + WorkerGlobalCommUsage, WorkerAggregatorUsage { /** * Must be defined by user to do computation on a single Vertex. * @@ -87,13 +88,13 @@ public interface Computation<I extends WritableComparable, * @param graphState Graph state * @param workerClientRequestProcessor Processor for handling requests * @param graphTaskManager Graph-wide BSP Mapper for this Vertex - * @param workerAggregatorUsage Worker aggregator usage + * @param workerGlobalCommUsage Worker global communication usage * @param workerContext Worker context */ void initialize(GraphState graphState, WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor, GraphTaskManager<I, V, E> graphTaskManager, - WorkerAggregatorUsage workerAggregatorUsage, WorkerContext workerContext); + WorkerGlobalCommUsage workerGlobalCommUsage, WorkerContext workerContext); /** * Retrieves the current superstep. http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java index d9c4302..33f2255 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java @@ -36,7 +36,7 @@ import org.apache.giraph.utils.TimedLogger; import org.apache.giraph.utils.Trimmable; import org.apache.giraph.worker.WorkerContext; import org.apache.giraph.worker.WorkerProgress; -import org.apache.giraph.worker.WorkerThreadAggregatorUsage; +import org.apache.giraph.worker.WorkerThreadGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; @@ -135,7 +135,7 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable, WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor = new NettyWorkerClientRequestProcessor<I, V, E>( context, configuration, serviceWorker); - WorkerThreadAggregatorUsage aggregatorUsage = + WorkerThreadGlobalCommUsage aggregatorUsage = serviceWorker.getAggregatorHandler().newThreadAggregatorUsage(); WorkerContext workerContext = serviceWorker.getWorkerContext(); http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index ba5d2fa..eb9fad3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -18,6 +18,19 @@ package org.apache.giraph.graph; +import java.io.IOException; +import java.net.URL; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CentralizedServiceWorker; @@ -26,9 +39,7 @@ import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.job.JobProgressTracker; -import org.apache.giraph.scripting.ScriptLoader; import org.apache.giraph.master.BspServiceMaster; -import org.apache.giraph.master.MasterAggregatorUsage; import org.apache.giraph.master.MasterThread; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphMetricsRegistry; @@ -40,6 +51,7 @@ import org.apache.giraph.partition.Partition; import org.apache.giraph.partition.PartitionOwner; import org.apache.giraph.partition.PartitionStats; import org.apache.giraph.partition.PartitionStore; +import org.apache.giraph.scripting.ScriptLoader; import org.apache.giraph.utils.CallableFactory; import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.ProgressableUtils; @@ -60,19 +72,6 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; -import java.io.IOException; -import java.net.URL; -import java.net.URLDecoder; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Enumeration; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - /** * The Giraph-specific business logic for a single BSP * compute node in whatever underlying type of cluster @@ -149,7 +148,7 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, /** Timer for WorkerContext#preSuperstep() */ private GiraphTimer wcPreSuperstepTimer; /** The Hadoop Mapper#Context for this job */ - private Mapper<?, ?, ?, ?>.Context context; + private final Mapper<?, ?, ?, ?>.Context context; /** is this GraphTaskManager the master? */ private boolean isMaster; @@ -497,15 +496,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, return graphFunctions; } - /** - * Get master aggregator usage, a subset of the functionality - * - * @return Master aggregator usage interface - */ - public final MasterAggregatorUsage getMasterAggregatorUsage() { - return serviceMaster.getAggregatorHandler(); - } - public final WorkerContext getWorkerContext() { return serviceWorker.getWorkerContext(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java index 1bc48e3..83a0369 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/EdgeReader.java @@ -19,9 +19,9 @@ package org.apache.giraph.io; import java.io.IOException; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; + import org.apache.giraph.edge.Edge; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerAggregatorDelegator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; @@ -36,11 +36,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; */ @SuppressWarnings("rawtypes") public abstract class EdgeReader<I extends WritableComparable, - E extends Writable> extends DefaultImmutableClassesGiraphConfigurable< - I, Writable, E> implements WorkerAggregatorUsage { - - /** Aggregator usage for edge reader */ - private WorkerAggregatorUsage workerAggregatorUsage; + E extends Writable> extends WorkerAggregatorDelegator< + I, Writable, E> { /** * Use the input split and context to setup reading the edges. @@ -56,21 +53,6 @@ public abstract class EdgeReader<I extends WritableComparable, throws IOException, InterruptedException; /** - * Set aggregator usage. It provides the functionality - * of aggregation operation in reading an edge. - * It is invoked just after initialization. - * E.g., - * edgeReader.initialize(inputSplit, context); - * edgeReader.setAggregator(aggregatorUsage); - * This method is only for use by the infrastructure. - * - * @param agg aggregator usage for edge reader - */ - public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { - workerAggregatorUsage = agg; - } - - /** * Read the next edge. * * @return false iff there are no more edges @@ -117,14 +99,4 @@ public abstract class EdgeReader<I extends WritableComparable, * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; - - @Override - public <A extends Writable> void aggregate(String name, A value) { - workerAggregatorUsage.aggregate(name, value); - } - - @Override - public <A extends Writable> A getAggregatedValue(String name) { - return workerAggregatorUsage.<A>getAggregatedValue(name); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java index b7ce97c..7c71585 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/MappingReader.java @@ -18,16 +18,15 @@ package org.apache.giraph.io; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import java.io.IOException; + import org.apache.giraph.mapping.MappingEntry; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerAggregatorDelegator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - /** * Will read the mapping from an input split. * @@ -38,12 +37,7 @@ import java.io.IOException; */ public abstract class MappingReader<I extends WritableComparable, V extends Writable, E extends Writable, B extends Writable> - extends DefaultImmutableClassesGiraphConfigurable<I, V, E> - implements WorkerAggregatorUsage { - - /** Aggregator usage for vertex reader */ - private WorkerAggregatorUsage workerAggregatorUsage; - + extends WorkerAggregatorDelegator<I, V, E> { /** * Use the input split and context to setup reading the vertices. * Guaranteed to be called prior to any other function. @@ -57,22 +51,6 @@ public abstract class MappingReader<I extends WritableComparable, TaskAttemptContext context) throws IOException, InterruptedException; - - /** - * Set aggregator usage. It provides the functionality - * of aggregation operation in reading a vertex. - * It is invoked just after initialization. - * E.g., - * vertexReader.initialize(inputSplit, context); - * vertexReader.setAggregator(aggregatorUsage); - * This method is only for use by the infrastructure. - * - * @param agg aggregator usage for vertex reader - */ - public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { - workerAggregatorUsage = agg; - } - /** * * @return false iff there are no more vertices @@ -111,14 +89,4 @@ public abstract class MappingReader<I extends WritableComparable, * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; - - @Override - public <A extends Writable> void aggregate(String name, A value) { - workerAggregatorUsage.aggregate(name, value); - } - - @Override - public <A extends Writable> A getAggregatedValue(String name) { - return workerAggregatorUsage.getAggregatedValue(name); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java index 94a4083..64ec800 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/VertexReader.java @@ -18,16 +18,15 @@ package org.apache.giraph.io; -import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import java.io.IOException; + import org.apache.giraph.graph.Vertex; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerAggregatorDelegator; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - /** * Analogous to Hadoop's RecordReader for vertices. Will read the * vertices from an input split. @@ -39,11 +38,7 @@ import java.io.IOException; @SuppressWarnings("rawtypes") public abstract class VertexReader<I extends WritableComparable, V extends Writable, E extends Writable> extends - DefaultImmutableClassesGiraphConfigurable<I, V, E> - implements WorkerAggregatorUsage { - /** Aggregator usage for vertex reader */ - private WorkerAggregatorUsage workerAggregatorUsage; - + WorkerAggregatorDelegator<I, V, E> { /** * Use the input split and context to setup reading the vertices. * Guaranteed to be called prior to any other function. @@ -58,21 +53,6 @@ public abstract class VertexReader<I extends WritableComparable, throws IOException, InterruptedException; /** - * Set aggregator usage. It provides the functionality - * of aggregation operation in reading a vertex. - * It is invoked just after initialization. - * E.g., - * vertexReader.initialize(inputSplit, context); - * vertexReader.setAggregator(aggregatorUsage); - * This method is only for use by the infrastructure. - * - * @param agg aggregator usage for vertex reader - */ - public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { - workerAggregatorUsage = agg; - } - - /** * * @return false iff there are no more vertices * @throws IOException @@ -108,14 +88,4 @@ public abstract class VertexReader<I extends WritableComparable, * @throws InterruptedException */ public abstract float getProgress() throws IOException, InterruptedException; - - @Override - public <A extends Writable> void aggregate(String name, A value) { - workerAggregatorUsage.aggregate(name, value); - } - - @Override - public <A extends Writable> A getAggregatedValue(String name) { - return workerAggregatorUsage.<A>getAggregatedValue(name); - } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java index 9b5e8c6..05dd5bc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedEdgeReader.java @@ -18,18 +18,18 @@ package org.apache.giraph.io.internal; +import java.io.IOException; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.io.EdgeReader; import org.apache.giraph.job.HadoopUtils; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - /** * For internal use only. * @@ -72,9 +72,10 @@ public class WrappedEdgeReader<I extends WritableComparable, } @Override - public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { - // Set aggregator usage for edge reader - baseEdgeReader.setWorkerAggregatorUse(agg); + public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) { + super.setWorkerGlobalCommUsage(usage); + // Set global communication usage for edge reader + baseEdgeReader.setWorkerGlobalCommUsage(usage); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java index 7d1c4c9..659776b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedMappingReader.java @@ -18,18 +18,18 @@ package org.apache.giraph.io.internal; +import java.io.IOException; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.io.MappingReader; import org.apache.giraph.job.HadoopUtils; import org.apache.giraph.mapping.MappingEntry; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - /** * For internal use only. * @@ -74,11 +74,11 @@ public class WrappedMappingReader<I extends WritableComparable, HadoopUtils.makeTaskAttemptContext(getConf(), context)); } - @Override - public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { - // Set aggregator usage for vertex reader - baseMappingReader.setWorkerAggregatorUse(agg); + public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) { + super.setWorkerGlobalCommUsage(usage); + // Set global communication usage for edge reader + baseMappingReader.setWorkerGlobalCommUsage(usage); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java index 8e25602..8c23cba 100644 --- a/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java +++ b/giraph-core/src/main/java/org/apache/giraph/io/internal/WrappedVertexReader.java @@ -18,18 +18,18 @@ package org.apache.giraph.io.internal; +import java.io.IOException; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.VertexReader; import org.apache.giraph.job.HadoopUtils; -import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerGlobalCommUsage; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; -import java.io.IOException; - /** * For internal use only. * @@ -73,9 +73,10 @@ public class WrappedVertexReader<I extends WritableComparable, } @Override - public void setWorkerAggregatorUse(WorkerAggregatorUsage agg) { + public void setWorkerGlobalCommUsage(WorkerGlobalCommUsage usage) { + super.setWorkerGlobalCommUsage(usage); // Set aggregator usage for vertex reader - baseVertexReader.setWorkerAggregatorUse(agg); + baseVertexReader.setWorkerGlobalCommUsage(usage); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java new file mode 100644 index 0000000..1673f6d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.master; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +/** + * Translates aggregation operation to reduce operations. + * + * @param <A> Aggregation object type + */ +public class AggregatorReduceOperation<A extends Writable> + extends OnSameReduceOperation<A> { + /** Aggregator factory */ + private WritableFactory<? extends Aggregator<A>> aggregatorFactory; + /** Aggregator */ + private Aggregator<A> aggregator; + + /** Constructor */ + public AggregatorReduceOperation() { + } + + /** + * Constructor + * @param aggregatorFactory Aggregator factory + */ + public AggregatorReduceOperation( + WritableFactory<? extends Aggregator<A>> aggregatorFactory) { + this.aggregatorFactory = aggregatorFactory; + this.aggregator = aggregatorFactory.create(); + this.aggregator.setAggregatedValue(null); + } + + @Override + public A createInitialValue() { + return aggregator.createInitialValue(); + } + + /** + * Creates copy of this object + * @return copy + */ + public AggregatorReduceOperation<A> createCopy() { + return new AggregatorReduceOperation<>(aggregatorFactory); + } + + @Override + public synchronized void reduceSingle(A curValue, A valueToReduce) { + aggregator.setAggregatedValue(curValue); + aggregator.aggregate(valueToReduce); + if (curValue != aggregator.getAggregatedValue()) { + throw new IllegalStateException( + "Aggregator " + aggregator + " aggregates by creating new value"); + } + aggregator.setAggregatedValue(null); + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeWritableObject(aggregatorFactory, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + aggregatorFactory = WritableUtils.readWritableObject(in, null); + aggregator = aggregatorFactory.create(); + this.aggregator.setAggregatedValue(null); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java new file mode 100644 index 0000000..7492fc7 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.master; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map.Entry; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.aggregators.ClassAggregatorFactory; +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.utils.WritableFactory; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Preconditions; + +/** + * Class that translates aggregator handling on the master to + * reduce and broadcast operations supported by the MasterAggregatorHandler. + */ +public class AggregatorToGlobalCommTranslation + extends DefaultImmutableClassesGiraphConfigurable + implements MasterAggregatorUsage, Writable { + /** Class providing reduce and broadcast interface to use */ + private final MasterGlobalCommUsage globalComm; + /** List of registered aggregators */ + private final HashMap<String, AggregatorWrapper<Writable>> + registeredAggregators = new HashMap<>(); + + /** + * Constructor + * @param globalComm Global communication interface + */ + public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) { + this.globalComm = globalComm; + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return globalComm.getReduced(name); + } + + @Override + public <A extends Writable> void setAggregatedValue(String name, A value) { + AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name); + aggregator.setCurrentValue(value); + } + + /** + * Called after master compute, to do aggregator->reduce/broadcast + * translation + */ + public void postMasterCompute() { + // broadcast what master set, or if it didn't broadcast reduced value + // register reduce with the same value + for (Entry<String, AggregatorWrapper<Writable>> entry : + registeredAggregators.entrySet()) { + Writable value = entry.getValue().currentValue != null ? + entry.getValue().getCurrentValue() : + globalComm.getReduced(entry.getKey()); + if (value == null) { + value = entry.getValue().getReduceOp().createInitialValue(); + } + + globalComm.broadcast(entry.getKey(), value); + // Always register clean instance of reduceOp, not to conflict with + // reduceOp from previous superstep. + AggregatorReduceOperation<Writable> cleanReduceOp = + entry.getValue().createReduceOp(); + if (entry.getValue().isPersistent()) { + globalComm.registerReduce( + entry.getKey(), cleanReduceOp, value); + } else { + globalComm.registerReduce( + entry.getKey(), cleanReduceOp); + } + entry.getValue().setCurrentValue(null); + } + } + + @Override + public <A extends Writable> boolean registerAggregator(String name, + Class<? extends Aggregator<A>> aggregatorClass) throws + InstantiationException, IllegalAccessException { + ClassAggregatorFactory<A> aggregatorFactory = + new ClassAggregatorFactory<A>(aggregatorClass); + return registerAggregator(name, aggregatorFactory, false) != null; + } + + @Override + public <A extends Writable> boolean registerAggregator(String name, + WritableFactory<? extends Aggregator<A>> aggregator) throws + InstantiationException, IllegalAccessException { + return registerAggregator(name, aggregator, false) != null; + } + + @Override + public <A extends Writable> boolean registerPersistentAggregator(String name, + Class<? extends Aggregator<A>> aggregatorClass) throws + InstantiationException, IllegalAccessException { + ClassAggregatorFactory<A> aggregatorFactory = + new ClassAggregatorFactory<A>(aggregatorClass); + return registerAggregator(name, aggregatorFactory, true) != null; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(registeredAggregators.size()); + for (Entry<String, AggregatorWrapper<Writable>> entry : + registeredAggregators.entrySet()) { + out.writeUTF(entry.getKey()); + entry.getValue().write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + registeredAggregators.clear(); + int numAggregators = in.readInt(); + for (int i = 0; i < numAggregators; i++) { + String name = in.readUTF(); + AggregatorWrapper<Writable> agg = new AggregatorWrapper<>(); + agg.readFields(in); + registeredAggregators.put(name, agg); + } + } + + /** + * Helper function for registering aggregators. + * + * @param name Name of the aggregator + * @param aggregatorFactory Aggregator factory + * @param persistent Whether aggregator is persistent or not + * @param <A> Aggregated value type + * @return Newly registered aggregator or aggregator which was previously + * created with selected name, if any + */ + private <A extends Writable> AggregatorWrapper<A> registerAggregator + (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory, + boolean persistent) throws InstantiationException, + IllegalAccessException { + AggregatorWrapper<A> aggregatorWrapper = + (AggregatorWrapper<A>) registeredAggregators.get(name); + if (aggregatorWrapper == null) { + aggregatorWrapper = + new AggregatorWrapper<A>(aggregatorFactory, persistent); + registeredAggregators.put( + name, (AggregatorWrapper<Writable>) aggregatorWrapper); + } + return aggregatorWrapper; + } + + /** + * Object holding all needed data related to single Aggregator + * @param <A> Aggregated value type + */ + private static class AggregatorWrapper<A extends Writable> + implements Writable { + /** False iff aggregator should be reset at the end of each super step */ + private boolean persistent; + /** Translation of aggregator to reduce operations */ + private AggregatorReduceOperation<A> reduceOp; + /** Current value, set by master manually */ + private A currentValue; + + /** Constructor */ + public AggregatorWrapper() { + } + + /** + * Constructor + * @param aggregatorFactory Aggregator factory + * @param persistent Is persistent + */ + public AggregatorWrapper( + WritableFactory<? extends Aggregator<A>> aggregatorFactory, + boolean persistent) { + this.persistent = persistent; + this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory); + } + + public AggregatorReduceOperation<A> getReduceOp() { + return reduceOp; + } + + /** + * Create a fresh instance of AggregatorReduceOperation + * @return fresh instance of AggregatorReduceOperation + */ + public AggregatorReduceOperation<A> createReduceOp() { + return reduceOp.createCopy(); + } + + public A getCurrentValue() { + return currentValue; + } + + public void setCurrentValue(A currentValue) { + this.currentValue = currentValue; + } + + public boolean isPersistent() { + return persistent; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeBoolean(persistent); + reduceOp.write(out); + + Preconditions.checkState(currentValue == null, "AggregatorWrapper " + + "shouldn't have value at the end of the superstep"); + } + + @Override + public void readFields(DataInput in) throws IOException { + persistent = in.readBoolean(); + reduceOp = new AggregatorReduceOperation<>(); + reduceOp.readFields(in); + currentValue = null; + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index efa5b87..ab1289d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -18,11 +18,39 @@ package org.apache.giraph.master; +import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT; +import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA; +import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT; +import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import net.iharder.Base64; + import org.apache.commons.io.FilenameUtils; import org.apache.giraph.bsp.ApplicationState; import org.apache.giraph.bsp.BspInputFormat; +import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.bsp.CheckpointStatus; import org.apache.giraph.bsp.SuperstepState; @@ -33,23 +61,17 @@ import org.apache.giraph.comm.netty.NettyMasterServer; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.counters.GiraphStats; -import org.apache.giraph.graph.InputSplitPaths; -import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.graph.AddressesAndPartitionsWritable; +import org.apache.giraph.graph.GlobalStats; import org.apache.giraph.graph.GraphFunctions; -import org.apache.giraph.graph.InputSplitEvents; -import org.apache.giraph.bsp.BspService; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.graph.GraphTaskManager; +import org.apache.giraph.graph.InputSplitEvents; +import org.apache.giraph.graph.InputSplitPaths; import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.GiraphInputFormat; -import org.apache.giraph.graph.GraphTaskManager; import org.apache.giraph.io.MappingInputFormat; import org.apache.giraph.io.VertexInputFormat; -import org.apache.giraph.partition.BasicPartitionOwner; -import org.apache.giraph.partition.MasterGraphPartitioner; -import org.apache.giraph.partition.PartitionOwner; -import org.apache.giraph.partition.PartitionStats; -import org.apache.giraph.partition.PartitionUtils; import org.apache.giraph.metrics.AggregatedMetrics; import org.apache.giraph.metrics.GiraphMetrics; import org.apache.giraph.metrics.GiraphTimer; @@ -57,13 +79,18 @@ import org.apache.giraph.metrics.GiraphTimerContext; import org.apache.giraph.metrics.ResetSuperstepMetricsObserver; import org.apache.giraph.metrics.SuperstepMetricsRegistry; import org.apache.giraph.metrics.WorkerSuperstepMetrics; -import org.apache.giraph.utils.CheckpointingUtils; -import org.apache.giraph.utils.JMapHistoDumper; -import org.apache.giraph.utils.ReactiveJMapHistoDumper; -import org.apache.giraph.utils.ProgressableUtils; +import org.apache.giraph.partition.BasicPartitionOwner; +import org.apache.giraph.partition.MasterGraphPartitioner; +import org.apache.giraph.partition.PartitionOwner; +import org.apache.giraph.partition.PartitionStats; +import org.apache.giraph.partition.PartitionUtils; import org.apache.giraph.time.SystemTime; import org.apache.giraph.time.Time; +import org.apache.giraph.utils.CheckpointingUtils; +import org.apache.giraph.utils.JMapHistoDumper; import org.apache.giraph.utils.LogStacktraceCallable; +import org.apache.giraph.utils.ProgressableUtils; +import org.apache.giraph.utils.ReactiveJMapHistoDumper; import org.apache.giraph.utils.WritableUtils; import org.apache.giraph.worker.WorkerInfo; import org.apache.giraph.zk.BspEvent; @@ -89,32 +116,6 @@ import org.json.JSONObject; import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import net.iharder.Base64; - -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT; -import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA; -import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT; -import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY; /** * ZooKeeper-based implementation of {@link CentralizedServiceMaster}. @@ -167,8 +168,10 @@ public class BspServiceMaster<I extends WritableComparable, /** All the partition stats from the last superstep */ private final List<PartitionStats> allPartitionStatsList = new ArrayList<PartitionStats>(); - /** Handler for aggregators */ - private MasterAggregatorHandler aggregatorHandler; + /** Handler for global communication */ + private MasterAggregatorHandler globalCommHandler; + /** Handler for aggregators to reduce/broadcast translation */ + private AggregatorToGlobalCommTranslation aggregatorTranslation; /** Master class */ private MasterCompute masterCompute; /** IPC Client */ @@ -232,7 +235,7 @@ public class BspServiceMaster<I extends WritableComparable, this.checkpointStatus = CheckpointStatus.NONE; GiraphMetrics.get().addSuperstepResetObserver(this); - GiraphStats.init((Mapper.Context) context); + GiraphStats.init(context); } @Override @@ -738,8 +741,13 @@ public class BspServiceMaster<I extends WritableComparable, } @Override - public MasterAggregatorHandler getAggregatorHandler() { - return aggregatorHandler; + public MasterAggregatorHandler getGlobalCommHandler() { + return globalCommHandler; + } + + @Override + public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() { + return aggregatorTranslation; } @Override @@ -811,7 +819,8 @@ public class BspServiceMaster<I extends WritableComparable, }); - aggregatorHandler.readFields(finalizedStream); + globalCommHandler.readFields(finalizedStream); + aggregatorTranslation.readFields(finalizedStream); masterCompute.readFields(finalizedStream); finalizedStream.close(); @@ -883,9 +892,12 @@ public class BspServiceMaster<I extends WritableComparable, if (masterChildArr.get(0).equals(myBid)) { GiraphStats.getInstance().getCurrentMasterTaskPartition(). setValue(getTaskPartition()); - aggregatorHandler = new MasterAggregatorHandler(getConfiguration(), - getContext()); - aggregatorHandler.initialize(this); + globalCommHandler = new MasterAggregatorHandler( + getConfiguration(), getContext()); + aggregatorTranslation = new AggregatorToGlobalCommTranslation( + globalCommHandler); + + globalCommHandler.initialize(this); masterCompute = getConfiguration().createMasterCompute(); masterCompute.setMasterService(this); @@ -1097,7 +1109,8 @@ public class BspServiceMaster<I extends WritableComparable, for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) { finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId()); } - aggregatorHandler.write(finalizedOutputStream); + globalCommHandler.write(finalizedOutputStream); + aggregatorTranslation.write(finalizedOutputStream); masterCompute.write(finalizedOutputStream); finalizedOutputStream.close(); lastCheckpointedSuperstep = superstep; @@ -1502,7 +1515,8 @@ public class BspServiceMaster<I extends WritableComparable, */ private void initializeAggregatorInputSuperstep() throws InterruptedException { - aggregatorHandler.prepareSuperstep(masterClient); + globalCommHandler.prepareSuperstep(); + prepareMasterCompute(getSuperstep()); try { masterCompute.initialize(); @@ -1516,7 +1530,10 @@ public class BspServiceMaster<I extends WritableComparable, throw new RuntimeException( "initializeAggregatorInputSuperstep: Failed in access", e); } - aggregatorHandler.finishSuperstep(masterClient); + aggregatorTranslation.postMasterCompute(); + globalCommHandler.finishSuperstep(); + + globalCommHandler.sendDataToOwners(masterClient); } /** @@ -1579,18 +1596,18 @@ public class BspServiceMaster<I extends WritableComparable, } } + // We need to finalize aggregators from previous superstep + if (getSuperstep() >= 0) { + aggregatorTranslation.postMasterCompute(); + globalCommHandler.finishSuperstep(); + } + masterClient.openConnections(); GiraphStats.getInstance(). getCurrentWorkers().setValue(chosenWorkerInfoList.size()); assignPartitionOwners(); - // We need to finalize aggregators from previous superstep (send them to - // worker owners) after new worker assignments - if (getSuperstep() >= 0) { - aggregatorHandler.finishSuperstep(masterClient); - } - // Finalize the valid checkpoint file prefixes and possibly // the aggregators. if (checkpointStatus != CheckpointStatus.NONE) { @@ -1616,6 +1633,11 @@ public class BspServiceMaster<I extends WritableComparable, } } + // We need to send aggregators to worker owners after new worker assignments + if (getSuperstep() >= 0) { + globalCommHandler.sendDataToOwners(masterClient); + } + if (getSuperstep() == INPUT_SUPERSTEP) { // Initialize aggregators before coordinating initializeAggregatorInputSuperstep(); @@ -1645,7 +1667,7 @@ public class BspServiceMaster<I extends WritableComparable, // Collect aggregator values, then run the master.compute() and // finally save the aggregator values - aggregatorHandler.prepareSuperstep(masterClient); + globalCommHandler.prepareSuperstep(); SuperstepClasses superstepClasses = prepareMasterCompute(getSuperstep() + 1); doMasterCompute(); @@ -1710,7 +1732,7 @@ public class BspServiceMaster<I extends WritableComparable, } else { superstepState = SuperstepState.THIS_SUPERSTEP_DONE; } - aggregatorHandler.writeAggregators(getSuperstep(), superstepState); + globalCommHandler.writeAggregators(getSuperstep(), superstepState); return superstepState; } @@ -1935,7 +1957,7 @@ public class BspServiceMaster<I extends WritableComparable, failJob(new Exception("Checkpoint and halt requested. " + "Killing this job.")); } - aggregatorHandler.close(); + globalCommHandler.close(); masterClient.closeConnections(); masterServer.close(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java index 2b0cdd6..5f7bd73 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -15,263 +15,224 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.giraph.master; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.AbstractMap; import java.util.Map; +import java.util.Map.Entry; -import org.apache.giraph.aggregators.Aggregator; -import org.apache.giraph.aggregators.AggregatorWrapper; import org.apache.giraph.aggregators.AggregatorWriter; -import org.apache.giraph.aggregators.ClassAggregatorFactory; import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.SuperstepState; +import org.apache.giraph.comm.GlobalCommType; import org.apache.giraph.comm.MasterClient; -import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.utils.MasterLoggingAggregator; -import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.reducers.Reducer; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -/** Handler for aggregators on master */ -public class MasterAggregatorHandler implements MasterAggregatorUsage, - Writable { +/** Handler for reduce/broadcast on the master */ +public class MasterAggregatorHandler + implements MasterGlobalCommUsage, Writable { /** Class logger */ private static final Logger LOG = Logger.getLogger(MasterAggregatorHandler.class); - /** - * Map of aggregators. - * This map is used to store final aggregated values received from worker - * owners, and also to read and write values provided during master.compute. - */ - private final Map<String, AggregatorWrapper<Writable>> aggregatorMap = + + /** Map of reducers registered for the next worker computation */ + private final Map<String, Reducer<Object, Writable>> reducerMap = + Maps.newHashMap(); + /** Map of values to be sent to workers for next computation */ + private final Map<String, Writable> broadcastMap = Maps.newHashMap(); - /** Aggregator writer */ + /** Values reduced from previous computation */ + private final Map<String, Writable> reducedMap = + Maps.newHashMap(); + + /** Aggregator writer - for writing reduced values */ private final AggregatorWriter aggregatorWriter; /** Progressable used to report progress */ private final Progressable progressable; - /** Giraph configuration */ - private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf; /** * Constructor * - * @param conf Giraph configuration - * @param progressable Progressable used for reporting progress + * @param conf Configuration + * @param progressable Progress reporter */ public MasterAggregatorHandler( ImmutableClassesGiraphConfiguration<?, ?, ?> conf, Progressable progressable) { - this.conf = conf; this.progressable = progressable; aggregatorWriter = conf.createAggregatorWriter(); - MasterLoggingAggregator.registerAggregator(this, conf); } @Override - public <A extends Writable> A getAggregatedValue(String name) { - AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name); - if (aggregator == null) { - LOG.warn("getAggregatedValue: " + - AggregatorUtils.getUnregisteredAggregatorMessage(name, - aggregatorMap.size() != 0, conf)); - return null; - } else { - return (A) aggregator.getPreviousAggregatedValue(); - } + public final <S, R extends Writable> void registerReduce( + String name, ReduceOperation<S, R> reduceOp) { + registerReduce(name, reduceOp, reduceOp.createInitialValue()); } @Override - public <A extends Writable> void setAggregatedValue(String name, A value) { - AggregatorWrapper<? extends Writable> aggregator = aggregatorMap.get(name); - if (aggregator == null) { - throw new IllegalStateException( - "setAggregatedValue: " + - AggregatorUtils.getUnregisteredAggregatorMessage(name, - aggregatorMap.size() != 0, conf)); + public <S, R extends Writable> void registerReduce( + String name, ReduceOperation<S, R> reduceOp, + R globalInitialValue) { + if (reducerMap.containsKey(name)) { + throw new IllegalArgumentException( + "Reducer with name " + name + " was already registered"); + } + if (reduceOp == null) { + throw new IllegalArgumentException("null reduce cannot be registered"); } - ((AggregatorWrapper<A>) aggregator).setCurrentAggregatedValue(value); - } - @Override - public <A extends Writable> boolean registerAggregator(String name, - Class<? extends Aggregator<A>> aggregatorClass) throws - InstantiationException, IllegalAccessException { - checkAggregatorName(name); - ClassAggregatorFactory<A> aggregatorFactory = - new ClassAggregatorFactory<A>(aggregatorClass, conf); - return registerAggregator(name, aggregatorFactory, false) != null; + Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue); + reducerMap.put(name, (Reducer<Object, Writable>) reducer); } @Override - public <A extends Writable> boolean registerAggregator(String name, - WritableFactory<? extends Aggregator<A>> aggregator) throws - InstantiationException, IllegalAccessException { - checkAggregatorName(name); - return registerAggregator(name, aggregator, false) != null; + public <T extends Writable> T getReduced(String name) { + return (T) reducedMap.get(name); } @Override - public <A extends Writable> boolean registerPersistentAggregator(String name, - Class<? extends Aggregator<A>> aggregatorClass) throws - InstantiationException, IllegalAccessException { - checkAggregatorName(name); - ClassAggregatorFactory<A> aggregatorFactory = - new ClassAggregatorFactory<A>(aggregatorClass, conf); - return registerAggregator(name, aggregatorFactory, true) != null; - } - - /** - * Make sure user doesn't use AggregatorUtils.SPECIAL_COUNT_AGGREGATOR as - * the name of aggregator. Throw an exception if he tries to use it. - * - * @param name Name of the aggregator to check. - */ - private void checkAggregatorName(String name) { - if (name.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { - throw new IllegalStateException("checkAggregatorName: " + - AggregatorUtils.SPECIAL_COUNT_AGGREGATOR + - " is not allowed for the name of aggregator"); + public void broadcast(String name, Writable object) { + if (broadcastMap.containsKey(name)) { + throw new IllegalArgumentException( + "Value already broadcasted for name " + name); } - } - - /** - * Helper function for registering aggregators. - * - * @param name Name of the aggregator - * @param aggregatorFactory Aggregator factory - * @param persistent Whether aggregator is persistent or not - * @param <A> Aggregated value type - * @return Newly registered aggregator or aggregator which was previously - * created with selected name, if any - */ - private <A extends Writable> AggregatorWrapper<A> registerAggregator - (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory, - boolean persistent) throws InstantiationException, - IllegalAccessException { - AggregatorWrapper<A> aggregatorWrapper = - (AggregatorWrapper<A>) aggregatorMap.get(name); - if (aggregatorWrapper == null) { - aggregatorWrapper = - new AggregatorWrapper<A>(aggregatorFactory, persistent, conf); - aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper); + if (object == null) { + throw new IllegalArgumentException("null cannot be broadcasted"); } - return aggregatorWrapper; + + broadcastMap.put(name, object); } - /** - * Prepare aggregators for current superstep - * - * @param masterClient IPC client on master - */ - public void prepareSuperstep(MasterClient masterClient) { + /** Prepare reduced values for current superstep's master compute */ + public void prepareSuperstep() { if (LOG.isDebugEnabled()) { - LOG.debug("prepareSuperstep: Start preparing aggregators"); + LOG.debug("prepareSuperstep: Start preparing reducers"); } - // prepare aggregators for master compute - for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) { - if (aggregator.isPersistent()) { - aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue()); + + Preconditions.checkState(reducedMap.isEmpty(), + "reducedMap must be empty before start of the superstep"); + Preconditions.checkState(broadcastMap.isEmpty(), + "broadcastMap must be empty before start of the superstep"); + + for (Entry<String, Reducer<Object, Writable>> entry : + reducerMap.entrySet()) { + Writable value = entry.getValue().getCurrentValue(); + if (value == null) { + value = entry.getValue().createInitialValue(); } - aggregator.setPreviousAggregatedValue( - aggregator.getCurrentAggregatedValue()); - aggregator.resetCurrentAggregator(); - progressable.progress(); + + reducedMap.put(entry.getKey(), value); } - MasterLoggingAggregator.logAggregatedValue(this, conf); + + reducerMap.clear(); + if (LOG.isDebugEnabled()) { LOG.debug("prepareSuperstep: Aggregators prepared"); } } - /** - * Finalize aggregators for current superstep and share them with workers - * - * @param masterClient IPC client on master - */ - public void finishSuperstep(MasterClient masterClient) { + /** Finalize aggregators for current superstep */ + public void finishSuperstep() { if (LOG.isDebugEnabled()) { LOG.debug("finishSuperstep: Start finishing aggregators"); } - for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) { - if (aggregator.isChanged()) { - // if master compute changed the value, use the one he chose - aggregator.setPreviousAggregatedValue( - aggregator.getCurrentAggregatedValue()); - // reset aggregator for the next superstep - aggregator.resetCurrentAggregator(); - } - progressable.progress(); + + reducedMap.clear(); + + if (LOG.isDebugEnabled()) { + LOG.debug("finishSuperstep: Aggregators finished"); } + } - // send aggregators to their owners - // TODO: if aggregator owner and it's value didn't change, - // we don't need to resend it + /** + * Send data to workers (through owner workers) + * + * @param masterClient IPC client on master + */ + public void sendDataToOwners(MasterClient masterClient) { + // send broadcast values and reduceOperations to their owners try { - for (Map.Entry<String, AggregatorWrapper<Writable>> entry : - aggregatorMap.entrySet()) { - masterClient.sendAggregator(entry.getKey(), - entry.getValue().getAggregatorFactory(), - entry.getValue().getPreviousAggregatedValue()); + for (Entry<String, Reducer<Object, Writable>> entry : + reducerMap.entrySet()) { + masterClient.sendToOwner(entry.getKey(), + GlobalCommType.REDUCE_OPERATIONS, + entry.getValue().getReduceOp()); progressable.progress(); } - masterClient.finishSendingAggregatedValues(); + + for (Entry<String, Writable> entry : broadcastMap.entrySet()) { + masterClient.sendToOwner(entry.getKey(), + GlobalCommType.BROADCAST, + entry.getValue()); + progressable.progress(); + } + masterClient.finishSendingValues(); + + broadcastMap.clear(); } catch (IOException e) { throw new IllegalStateException("finishSuperstep: " + "IOException occurred while sending aggregators", e); } - if (LOG.isDebugEnabled()) { - LOG.debug("finishSuperstep: Aggregators finished"); - } } /** - * Accept aggregated values sent by worker. Every aggregator will be sent + * Accept reduced values sent by worker. Every value will be sent * only once, by its owner. * We don't need to count the number of these requests because global * superstep barrier will happen after workers ensure all requests of this * type have been received and processed by master. * - * @param aggregatedValuesInput Input in which aggregated values are + * @param reducedValuesInput Input in which aggregated values are * written in the following format: - * number_of_aggregators - * name_1 value_1 - * name_2 value_2 + * numReducers + * name_1 REDUCED_VALUE value_1 + * name_2 REDUCED_VALUE value_2 * ... * @throws IOException */ - public void acceptAggregatedValues( - DataInput aggregatedValuesInput) throws IOException { - int numAggregators = aggregatedValuesInput.readInt(); - for (int i = 0; i < numAggregators; i++) { - String aggregatorName = aggregatedValuesInput.readUTF(); - AggregatorWrapper<Writable> aggregator = - aggregatorMap.get(aggregatorName); - if (aggregator == null) { + public void acceptReducedValues( + DataInput reducedValuesInput) throws IOException { + int numReducers = reducedValuesInput.readInt(); + for (int i = 0; i < numReducers; i++) { + String name = reducedValuesInput.readUTF(); + GlobalCommType type = + GlobalCommType.values()[reducedValuesInput.readByte()]; + if (type != GlobalCommType.REDUCED_VALUE) { throw new IllegalStateException( - "acceptAggregatedValues: " + - "Master received aggregator which isn't registered: " + - aggregatorName); + "SendReducedToMasterRequest received " + type); + } + Reducer<Object, Writable> reducer = reducerMap.get(name); + if (reducer == null) { + throw new IllegalStateException( + "acceptReducedValues: " + + "Master received reduced value which isn't registered: " + + name); + } + + Writable valueToReduce = reducer.createInitialValue(); + valueToReduce.readFields(reducedValuesInput); + + if (reducer.getCurrentValue() != null) { + reducer.reducePartial(valueToReduce); + } else { + reducer.setCurrentValue(valueToReduce); } - Writable aggregatorValue = aggregator.createInitialValue(); - aggregatorValue.readFields(aggregatedValuesInput); - aggregator.setCurrentAggregatedValue(aggregatorValue); progressable.progress(); } if (LOG.isDebugEnabled()) { - LOG.debug("acceptAggregatedValues: Accepted one set with " + - numAggregators + " aggregated values"); + LOG.debug("acceptReducedValues: Accepted one set with " + + numReducers + " aggregated values"); } } @@ -281,23 +242,10 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, * @param superstep Superstep which just finished * @param superstepState State of the superstep which just finished */ - public void writeAggregators(long superstep, SuperstepState superstepState) { + public void writeAggregators( + long superstep, SuperstepState superstepState) { try { - Iterable<Map.Entry<String, Writable>> iter = - Iterables.transform( - aggregatorMap.entrySet(), - new Function<Map.Entry<String, AggregatorWrapper<Writable>>, - Map.Entry<String, Writable>>() { - @Override - public Map.Entry<String, Writable> apply( - Map.Entry<String, AggregatorWrapper<Writable>> entry) { - progressable.progress(); - return new AbstractMap.SimpleEntry<String, - Writable>(entry.getKey(), - entry.getValue().getPreviousAggregatedValue()); - } - }); - aggregatorWriter.writeAggregator(iter, + aggregatorWriter.writeAggregator(reducedMap.entrySet(), (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE) ? AggregatorWriter.LAST_SUPERSTEP : superstep); } catch (IOException e) { @@ -333,43 +281,44 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, @Override public void write(DataOutput out) throws IOException { - out.writeInt(aggregatorMap.size()); - for (Map.Entry<String, AggregatorWrapper<Writable>> entry : - aggregatorMap.entrySet()) { + // At the end of superstep, only reduceOpMap can be non-empty + Preconditions.checkState(reducedMap.isEmpty(), + "reducedMap must be empty at the end of the superstep"); + + out.writeInt(reducerMap.size()); + for (Entry<String, Reducer<Object, Writable>> entry : + reducerMap.entrySet()) { out.writeUTF(entry.getKey()); - WritableUtils.writeWritableObject( - entry.getValue().getAggregatorFactory(), out); - out.writeBoolean(entry.getValue().isPersistent()); - entry.getValue().getPreviousAggregatedValue().write(out); + entry.getValue().write(out); progressable.progress(); } + + out.writeInt(broadcastMap.size()); + for (Entry<String, Writable> entry : broadcastMap.entrySet()) { + out.writeUTF(entry.getKey()); + WritableUtils.writeWritableObject(entry.getValue(), out); + } } @Override public void readFields(DataInput in) throws IOException { - aggregatorMap.clear(); - int numAggregators = in.readInt(); - try { - for (int i = 0; i < numAggregators; i++) { - String aggregatorName = in.readUTF(); - WritableFactory<Aggregator<Writable>> aggregatorFactory = - WritableUtils.readWritableObject(in, conf); - boolean isPersistent = in.readBoolean(); - AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator( - aggregatorName, - aggregatorFactory, - isPersistent); - Writable value = aggregatorWrapper.createInitialValue(); - value.readFields(in); - aggregatorWrapper.setPreviousAggregatedValue(value); - progressable.progress(); - } - } catch (InstantiationException e) { - throw new IllegalStateException("readFields: " + - "InstantiationException occurred", e); - } catch (IllegalAccessException e) { - throw new IllegalStateException("readFields: " + - "IllegalAccessException occurred", e); + reducedMap.clear(); + broadcastMap.clear(); + reducerMap.clear(); + + int numReducers = in.readInt(); + for (int i = 0; i < numReducers; i++) { + String name = in.readUTF(); + Reducer<Object, Writable> reducer = new Reducer<>(); + reducer.readFields(in); + reducerMap.put(name, reducer); + } + + int numBroadcast = in.readInt(); + for (int i = 0; i < numBroadcast; i++) { + String name = in.readUTF(); + Writable value = WritableUtils.readWritableObject(in, null); + broadcastMap.put(name, value); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index 552cca9..72e4d0a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -24,6 +24,7 @@ import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.reducers.ReduceOperation; import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; @@ -43,7 +44,7 @@ import org.apache.hadoop.mapreduce.Mapper; */ public abstract class MasterCompute extends DefaultImmutableClassesGiraphConfigurable - implements MasterAggregatorUsage, Writable { + implements MasterAggregatorUsage, MasterGlobalCommUsage, Writable { /** If true, do not do anymore computation on this vertex. */ private boolean halt = false; /** Master aggregator usage */ @@ -190,10 +191,33 @@ public abstract class MasterCompute } @Override + public final <S, R extends Writable> void registerReduce( + String name, ReduceOperation<S, R> reduceOp) { + serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp); + } + + @Override + public final <S, R extends Writable> void registerReduce( + String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) { + serviceMaster.getGlobalCommHandler().registerReduce( + name, reduceOp, globalInitialValue); + } + + @Override + public final <T extends Writable> T getReduced(String name) { + return serviceMaster.getGlobalCommHandler().getReduced(name); + } + + @Override + public final void broadcast(String name, Writable object) { + serviceMaster.getGlobalCommHandler().broadcast(name, object); + } + + @Override public final <A extends Writable> boolean registerAggregator( String name, Class<? extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { - return serviceMaster.getAggregatorHandler().registerAggregator( + return serviceMaster.getAggregatorTranslationHandler().registerAggregator( name, aggregatorClass); } @@ -201,7 +225,7 @@ public abstract class MasterCompute public final <A extends Writable> boolean registerAggregator( String name, WritableFactory<? extends Aggregator<A>> aggregator) throws InstantiationException, IllegalAccessException { - return serviceMaster.getAggregatorHandler().registerAggregator( + return serviceMaster.getAggregatorTranslationHandler().registerAggregator( name, aggregator); } @@ -210,19 +234,21 @@ public abstract class MasterCompute String name, Class<? extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { - return serviceMaster.getAggregatorHandler().registerPersistentAggregator( - name, aggregatorClass); + return serviceMaster.getAggregatorTranslationHandler() + .registerPersistentAggregator(name, aggregatorClass); } @Override public final <A extends Writable> A getAggregatedValue(String name) { - return serviceMaster.getAggregatorHandler().<A>getAggregatedValue(name); + return serviceMaster.getAggregatorTranslationHandler() + .<A>getAggregatedValue(name); } @Override public final <A extends Writable> void setAggregatedValue( String name, A value) { - serviceMaster.getAggregatorHandler().setAggregatedValue(name, value); + serviceMaster.getAggregatorTranslationHandler() + .setAggregatedValue(name, value); } /** http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java new file mode 100644 index 0000000..c3ce0ea --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.master; + +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.hadoop.io.Writable; + +/** + * Master compute can access reduce and broadcast methods + * through this interface, from masterCompute method. + */ +public interface MasterGlobalCommUsage { + /** + * Register reducer to be reduced in the next worker computation, + * using given name and operations. + * @param name Name of the reducer + * @param reduceOp Reduce operations + * @param <S> Single value type + * @param <R> Reduced value type + */ + <S, R extends Writable> void registerReduce( + String name, ReduceOperation<S, R> reduceOp); + + /** + * Register reducer to be reduced in the next worker computation, using + * given name and operations, starting globally from globalInitialValue. + * (globalInitialValue is reduced only once, each worker will still start + * from neutral initial value) + * + * @param name Name of the reducer + * @param reduceOp Reduce operations + * @param globalInitialValue Global initial value + * @param <S> Single value type + * @param <R> Reduced value type + */ + <S, R extends Writable> void registerReduce( + String name, ReduceOperation<S, R> reduceOp, R globalInitialValue); + + /** + * Get reduced value from previous worker computation. + * @param name Name of the reducer + * @return Reduced value + * @param <R> Reduced value type + */ + <R extends Writable> R getReduced(String name); + + /** + * Broadcast given value to all workers for next computation. + * @param name Name of the broadcast object + * @param value Value + */ + void broadcast(String name, Writable value); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java new file mode 100644 index 0000000..a675f4d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers; + +import org.apache.hadoop.io.Writable; + +/** + * ReduceOperation object when single object being reduced is of + * same type as reduced value. + * + * @param <R> Reduced object type. + */ +public abstract class OnSameReduceOperation<R extends Writable> + implements ReduceOperation<R, R> { + @Override + public final void reducePartial(R curValue, R valueToReduce) { + reduceSingle(curValue, valueToReduce); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/f43f4500/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java new file mode 100644 index 0000000..434e21a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers; + +import org.apache.hadoop.io.Writable; + +/** + * Reduce operations defining how to reduce single values + * passed on workers, into partial values on workers, and then + * into a single global reduced value. + * + * Object should be thread safe. Most frequently it should be + * immutable object, so that functions can execute concurrently. + * Rarely when object is mutable ({@link AggregatorReduceOperation}), + * i.e. stores reusable object inside, accesses should be synchronized. + * + * @param <S> Single value type, objects passed on workers + * @param <R> Reduced value type + */ +public interface ReduceOperation<S, R extends Writable> extends Writable { + /** + * Return new reduced value which is neutral to reduce operation. + * + * @return Neutral value + */ + R createInitialValue(); + /** + * Add a new value. + * Needs to be commutative and associative + * + * @param curValue Partial value into which to reduce and store the result + * @param valueToReduce Single value to be reduced + */ + void reduceSingle(R curValue, S valueToReduce); + /** + * Add partially reduced value to current partially reduced value. + * + * @param curValue Partial value into which to reduce and store the result + * @param valueToReduce Partial value to be reduced + */ + void reducePartial(R curValue, R valueToReduce); +}
