GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ce97134d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ce97134d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ce97134d Branch: refs/heads/release-1.1 Commit: ce97134d253c4e9fca48b7cede2048e60f36ff79 Parents: 9303522 Author: Maja Kabiljo <[email protected]> Authored: Fri Aug 1 08:20:10 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Aug 1 08:20:10 2014 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/aggregators/AggregatorWrapper.java | 22 ++-- .../aggregators/ArrayAggregatorFactory.java | 128 +++++++++++++++++++ .../giraph/aggregators/BasicAggregator.java | 9 ++ .../aggregators/ClassAggregatorFactory.java | 87 +++++++++++++ .../org/apache/giraph/comm/MasterClient.java | 9 +- .../java/org/apache/giraph/comm/ServerData.java | 14 +- .../aggregators/AggregatorOutputStream.java | 12 +- .../comm/aggregators/AggregatorUtils.java | 39 +----- .../aggregators/AllAggregatorServerData.java | 69 ++++------ .../aggregators/OwnerAggregatorServerData.java | 26 ++-- .../comm/aggregators/SendAggregatorCache.java | 15 ++- .../giraph/comm/netty/NettyMasterClient.java | 17 +-- .../requests/SendAggregatorsToOwnerRequest.java | 19 +-- .../SendAggregatorsToWorkerRequest.java | 17 +-- .../giraph/comm/requests/WritableRequest.java | 2 +- .../giraph/master/MasterAggregatorHandler.java | 66 ++++++---- .../giraph/master/MasterAggregatorUsage.java | 16 +++ .../org/apache/giraph/master/MasterCompute.java | 8 ++ .../org/apache/giraph/utils/ArrayWritable.java | 100 +++++++++++++++ .../apache/giraph/utils/WritableFactory.java | 28 ++++ .../org/apache/giraph/utils/WritableUtils.java | 84 ++++++++++-- .../giraph/worker/WorkerAggregatorHandler.java | 52 +++++--- .../giraph/aggregators/TestArrayAggregator.java | 50 ++++++++ 24 files changed, 686 insertions(+), 205 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 54ed3a3..dbb134a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-934: Allow having state in aggregators (ikabiljo via majakabiljo) + GIRAPH-932: Adding .arcconfig to GIRAPH for Arcanist support (aching) GIRAPH-927: Decouple netty server threads from message processing (edunov via pavanka) http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java index 7150402..fa18a64 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/AggregatorWrapper.java @@ -19,7 +19,7 @@ package org.apache.giraph.aggregators; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; /** @@ -34,21 +34,25 @@ public class AggregatorWrapper<A extends Writable> { private final boolean persistent; /** Value aggregated in previous super step */ private A previousAggregatedValue; + /** Aggregator factory */ + private final WritableFactory<? 
extends Aggregator<A>> aggregatorFactory; /** Aggregator for next super step */ private final Aggregator<A> currentAggregator; /** Whether anyone changed current value since the moment it was reset */ private boolean changed; /** - * @param aggregatorClass Class type of the aggregator - * @param persistent False iff aggregator should be reset at the end of - * each super step - * @param conf Configuration + * @param aggregatorFactory Aggregator Factory + * @param persistent False iff aggregator should be reset at the end + * of each super step + * @param conf Configuration */ - public AggregatorWrapper(Class<? extends Aggregator<A>> aggregatorClass, + public AggregatorWrapper( + WritableFactory<? extends Aggregator<A>> aggregatorFactory, boolean persistent, ImmutableClassesGiraphConfiguration conf) { this.persistent = persistent; - currentAggregator = ReflectionUtils.newInstance(aggregatorClass, conf); + this.aggregatorFactory = aggregatorFactory; + currentAggregator = aggregatorFactory.create(); changed = false; previousAggregatedValue = currentAggregator.createInitialValue(); } @@ -140,7 +144,7 @@ public class AggregatorWrapper<A extends Writable> { * * @return Aggregator class */ - public Class<? extends Aggregator> getAggregatorClass() { - return currentAggregator.getClass(); + public WritableFactory<? 
extends Aggregator<A>> getAggregatorFactory() { + return aggregatorFactory; } } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java new file mode 100644 index 0000000..c977c57 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.aggregators; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Array; + +import org.apache.giraph.utils.ArrayWritable; +import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +/** + * Generic array aggregator factory, used to aggregate elements + * of ArrayWritable via passed element aggregator. 
+ * + * @param <A> Type of individual element + */ +public class ArrayAggregatorFactory<A extends Writable> + implements WritableFactory<Aggregator<ArrayWritable<A>>> { + /** number of elements in array */ + private int n; + /** element aggregator class */ + private WritableFactory<? extends Aggregator<A>> elementAggregatorFactory; + + /** + * Constructor + * @param n Number of elements in array + * @param elementAggregatorClass Type of element aggregator + */ + public ArrayAggregatorFactory( + int n, Class<? extends Aggregator<A>> elementAggregatorClass) { + this(n, new ClassAggregatorFactory<>(elementAggregatorClass)); + } + + /** + * Constructor + * @param n Number of elements in array + * @param elementAggregatorFactory Element aggregator factory + */ + public ArrayAggregatorFactory(int n, + WritableFactory<? extends Aggregator<A>> elementAggregatorFactory) { + this.n = n; + this.elementAggregatorFactory = elementAggregatorFactory; + } + + @Override + public void readFields(DataInput in) throws IOException { + n = in.readInt(); + elementAggregatorFactory = WritableUtils.readWritableObject(in, null); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(n); + WritableUtils.writeWritableObject(elementAggregatorFactory, out); + } + + @Override + public Aggregator<ArrayWritable<A>> create() { + return new ArrayAggregator<>( + n, elementAggregatorFactory.create()); + } + + /** + * Stateful aggregator that aggregates ArrayWritable by + * aggregating individual elements + * + * @param <A> Type of individual element + */ + public static class ArrayAggregator<A extends Writable> + extends BasicAggregator<ArrayWritable<A>> { + /** number of elements in array */ + private final int n; + /** element aggregator */ + private final Aggregator<A> elementAggregator; + + /** + * Constructor + * @param n Number of elements in array + * @param elementAggregator Element aggregator + */ + public ArrayAggregator(int n, Aggregator<A> 
elementAggregator) { + super(null); + this.n = n; + this.elementAggregator = elementAggregator; + reset(); + } + + @Override + public void aggregate(ArrayWritable<A> other) { + A[] array = getAggregatedValue().get(); + for (int i = 0; i < n; i++) { + elementAggregator.setAggregatedValue(array[i]); + elementAggregator.aggregate(other.get()[i]); + array[i] = elementAggregator.getAggregatedValue(); + } + } + + @Override + public ArrayWritable<A> createInitialValue() { + Class<A> elementClass = + (Class) elementAggregator.createInitialValue().getClass(); + A[] array = (A[]) Array.newInstance(elementClass, n); + for (int i = 0; i < n; i++) { + array[i] = elementAggregator.createInitialValue(); + } + return new ArrayWritable<>(elementClass, array); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java index 07a4100..c351846 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/BasicAggregator.java @@ -40,6 +40,15 @@ public abstract class BasicAggregator<A extends Writable> implements value = createInitialValue(); } + /** + * Constructor + * @param initialValue initial value + */ + public BasicAggregator(A initialValue) { + value = initialValue; + } + + @Override public A getAggregatedValue() { return value; http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java 
b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java new file mode 100644 index 0000000..944656e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.aggregators; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +import com.google.common.base.Preconditions; + +/** + * Aggregator factory based on aggregatorClass. + * + * @param <T> Aggregated value type + */ +public class ClassAggregatorFactory<T extends Writable> + extends DefaultImmutableClassesGiraphConfigurable + implements WritableFactory<Aggregator<T>> { + /** Aggregator class */ + private Class<? 
extends Aggregator<T>> aggregatorClass; + + /** Constructor */ + public ClassAggregatorFactory() { + } + + /** + * Constructor + * @param aggregatorClass Aggregator class + */ + public ClassAggregatorFactory( + Class<? extends Aggregator<T>> aggregatorClass) { + this(aggregatorClass, null); + + } + + /** + * Constructor + * @param aggregatorClass Aggregator class + * @param conf Configuration + */ + public ClassAggregatorFactory(Class<? extends Aggregator<T>> aggregatorClass, + ImmutableClassesGiraphConfiguration conf) { + Preconditions.checkNotNull(aggregatorClass, + "aggregatorClass cannot be null in ClassAggregatorFactory"); + this.aggregatorClass = aggregatorClass; + setConf(conf); + } + + @Override + public Aggregator<T> create() { + return ReflectionUtils.newInstance(aggregatorClass, getConf()); + } + + @Override + public void readFields(DataInput in) throws IOException { + aggregatorClass = WritableUtils.readClass(in); + } + + @Override + public void write(DataOutput out) throws IOException { + Preconditions.checkNotNull(aggregatorClass, + "aggregatorClass cannot be null in ClassAggregatorFactory"); + WritableUtils.writeClass(aggregatorClass, out); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java index 793d059..b7718a7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/MasterClient.java @@ -18,11 +18,12 @@ package org.apache.giraph.comm; +import java.io.IOException; + import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; -import java.io.IOException; - /** * Interface for master to send messages 
to workers */ @@ -36,12 +37,12 @@ public interface MasterClient { * Sends aggregator to its owner * * @param aggregatorName Name of the aggregator - * @param aggregatorClass Class of the aggregator + * @param aggregatorFactory Aggregator factory * @param aggregatedValue Value of the aggregator * @throws IOException */ void sendAggregator(String aggregatorName, - Class<? extends Aggregator> aggregatorClass, + WritableFactory<? extends Aggregator> aggregatorFactory, Writable aggregatedValue) throws IOException; /** http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java index 29488fc..a92cd1c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java @@ -18,6 +18,12 @@ package org.apache.giraph.comm; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.aggregators.AllAggregatorServerData; import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData; @@ -36,12 +42,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Mapper; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.ConcurrentHashMap; - /** * Anything that the server stores * @@ -123,7 +123,7 @@ public class ServerData<I extends WritableComparable, EdgeStoreFactory<I, V, E> edgeStoreFactory = conf.createEdgeStoreFactory(); edgeStoreFactory.initialize(service, conf, context); 
edgeStore = edgeStoreFactory.newStore(); - ownerAggregatorData = new OwnerAggregatorServerData(context, conf); + ownerAggregatorData = new OwnerAggregatorServerData(context); allAggregatorData = new AllAggregatorServerData(context, conf); } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java index 627b4cc..79bc08a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java @@ -18,11 +18,13 @@ package org.apache.giraph.comm.aggregators; +import java.io.IOException; + import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; -import java.io.IOException; - /** * Implementation of {@link CountingOutputStream} which allows writing of * aggregators in the form of triple (name, classname, value) @@ -32,17 +34,17 @@ public class AggregatorOutputStream extends CountingOutputStream { * Write aggregator to the stream and increment internal counter * * @param aggregatorName Name of the aggregator - * @param aggregatorClass Class of aggregator + * @param aggregatorFactory Aggregator factory * @param aggregatedValue Value of aggregator * @return Number of bytes occupied by the stream * @throws IOException */ public int addAggregator(String aggregatorName, - Class<? extends Aggregator> aggregatorClass, + WritableFactory<? 
extends Aggregator> aggregatorFactory, Writable aggregatedValue) throws IOException { incrementCounter(); dataOutput.writeUTF(aggregatorName); - dataOutput.writeUTF(aggregatorClass.getName()); + WritableUtils.writeWritableObject(aggregatorFactory, dataOutput); aggregatedValue.write(dataOutput); return getSize(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java index ceb30a8..a94ab38 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java @@ -18,13 +18,10 @@ package org.apache.giraph.comm.aggregators; +import java.util.List; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.aggregators.Aggregator; -import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.io.Writable; - -import java.util.List; /** * Class for aggregator constants and utility methods @@ -36,6 +33,7 @@ public class AggregatorUtils { */ public static final String SPECIAL_COUNT_AGGREGATOR = "__aggregatorRequestCount"; + /** How big a single aggregator request can be (in bytes) */ public static final String MAX_BYTES_PER_AGGREGATOR_REQUEST = "giraph.maxBytesPerAggregatorRequest"; @@ -58,37 +56,6 @@ public class AggregatorUtils { private AggregatorUtils() { } /** - * Get aggregator class from class name, catch all exceptions. 
- * - * @param aggregatorClassName Class nam of aggregator class - * @return Aggregator class - */ - public static Class<Aggregator<Writable>> getAggregatorClass(String - aggregatorClassName) { - try { - return (Class<Aggregator<Writable>>) Class.forName(aggregatorClassName); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("getAggregatorClass: " + - "ClassNotFoundException for aggregator class " + aggregatorClassName, - e); - } - } - - /** - * Create new aggregator instance from aggregator class, - * catch all exceptions. - * - * @param aggregatorClass Class of aggregator - * @param conf Configuration - * @return New aggregator - */ - public static Aggregator<Writable> newAggregatorInstance( - Class<Aggregator<Writable>> aggregatorClass, - ImmutableClassesGiraphConfiguration conf) { - return ReflectionUtils.newInstance(aggregatorClass, conf); - } - - /** * Get owner of aggregator with selected name from the list of workers * * @param aggregatorName Name of the aggregators http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java index 177e738..effc9bf 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java @@ -18,10 +18,18 @@ package org.apache.giraph.comm.aggregators; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import 
org.apache.giraph.master.MasterInfo; +import org.apache.giraph.utils.Factory; import org.apache.giraph.utils.TaskIdsPermitsBarrier; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -29,12 +37,6 @@ import org.apache.log4j.Logger; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - /** * Accepts aggregators and their values from previous superstep from master * and workers which own aggregators. Keeps data received from master so it @@ -49,16 +51,9 @@ public class AllAggregatorServerData { /** Class logger */ private static final Logger LOG = Logger.getLogger(AllAggregatorServerData.class); - /** - * Map from aggregator class to aggregator object which we need in order - * to create initial aggregated values - */ - private final - ConcurrentMap<Class<Aggregator<Writable>>, Aggregator<Writable>> - aggregatorTypesMap = Maps.newConcurrentMap(); - /** Map of aggregator classes */ - private final ConcurrentMap<String, Class<Aggregator<Writable>>> - aggregatorClassMap = Maps.newConcurrentMap(); + /** Map of aggregator factories */ + private final ConcurrentMap<String, WritableFactory<Aggregator<Writable>>> + aggregatorFactoriesMap = Maps.newConcurrentMap(); /** Map of values of aggregators from previous superstep */ private final ConcurrentMap<String, Writable> aggregatedValuesMap = Maps.newConcurrentMap(); @@ -104,16 +99,12 @@ public class AllAggregatorServerData { /** * Register the class of the aggregator, received by master or worker. 
* - * @param name Aggregator name - * @param aggregatorClass Class of the aggregator + * @param name Aggregator name + * @param aggregatorFactory Aggregator factory */ public void registerAggregatorClass(String name, - Class<Aggregator<Writable>> aggregatorClass) { - aggregatorClassMap.put(name, aggregatorClass); - if (!aggregatorTypesMap.containsKey(aggregatorClass)) { - aggregatorTypesMap.putIfAbsent(aggregatorClass, - AggregatorUtils.newAggregatorInstance(aggregatorClass, conf)); - } + WritableFactory<Aggregator<Writable>> aggregatorFactory) { + aggregatorFactoriesMap.put(name, aggregatorFactory); progressable.progress(); } @@ -139,10 +130,10 @@ public class AllAggregatorServerData { * @return Empty aggregated value for this aggregator */ public Writable createAggregatorInitialValue(String name) { - Class<Aggregator<Writable>> aggregatorClass = aggregatorClassMap.get(name); - Aggregator<Writable> aggregator = aggregatorTypesMap.get(aggregatorClass); - synchronized (aggregator) { - return aggregator.createInitialValue(); + WritableFactory<Aggregator<Writable>> aggregatorFactory = + aggregatorFactoriesMap.get(name); + synchronized (aggregatorFactory) { + return aggregatorFactory.create().createInitialValue(); } } @@ -211,29 +202,25 @@ public class AllAggregatorServerData { * @param workerIds All workers in the job apart from the current one * @param previousAggregatedValuesMap Map of values from previous * superstep to fill out - * @param currentAggregatorMap Map of aggregators for current superstep to - * fill out. All aggregators in this map will - * be set to initial value. + * @param currentAggregatorFactoryMap Map of aggregators factories for + * current superstep to fill out. 
*/ public void fillNextSuperstepMapsWhenReady( Set<Integer> workerIds, Map<String, Writable> previousAggregatedValuesMap, - Map<String, Aggregator<Writable>> currentAggregatorMap) { + Map<String, Factory<Aggregator<Writable>>> currentAggregatorFactoryMap) { workersBarrier.waitForRequiredPermits(workerIds); if (LOG.isDebugEnabled()) { LOG.debug("fillNextSuperstepMapsWhenReady: Aggregators ready"); } previousAggregatedValuesMap.clear(); previousAggregatedValuesMap.putAll(aggregatedValuesMap); - for (Map.Entry<String, Class<Aggregator<Writable>>> entry : - aggregatorClassMap.entrySet()) { - Aggregator<Writable> aggregator = - currentAggregatorMap.get(entry.getKey()); - if (aggregator == null) { - currentAggregatorMap.put(entry.getKey(), - AggregatorUtils.newAggregatorInstance(entry.getValue(), conf)); - } else { - aggregator.reset(); + for (Map.Entry<String, WritableFactory<Aggregator<Writable>>> entry : + aggregatorFactoriesMap.entrySet()) { + Factory<Aggregator<Writable>> aggregatorFactory = + currentAggregatorFactoryMap.get(entry.getKey()); + if (aggregatorFactory == null) { + currentAggregatorFactoryMap.put(entry.getKey(), entry.getValue()); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java index eb25a2e..2f3d5e5 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java @@ -18,9 +18,14 @@ package org.apache.giraph.comm.aggregators; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; 
+ import org.apache.giraph.aggregators.Aggregator; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.TaskIdsPermitsBarrier; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -29,11 +34,6 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import java.util.AbstractMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; - /** * Class for holding aggregators which current worker owns, * and aggregating partial aggregator values from workers. @@ -73,19 +73,14 @@ public class OwnerAggregatorServerData { private final TaskIdsPermitsBarrier workersBarrier; /** Progressable used to report progress */ private final Progressable progressable; - /** Configuration */ - private final ImmutableClassesGiraphConfiguration conf; /** * Constructor * * @param progressable Progressable used to report progress - * @param conf Configuration */ - public OwnerAggregatorServerData(Progressable progressable, - ImmutableClassesGiraphConfiguration conf) { + public OwnerAggregatorServerData(Progressable progressable) { this.progressable = progressable; - this.conf = conf; workersBarrier = new TaskIdsPermitsBarrier(progressable); } @@ -93,15 +88,14 @@ public class OwnerAggregatorServerData { * Register an aggregator which current worker owns. Thread-safe. 
* * @param name Name of aggregator - * @param aggregatorClass Aggregator class + * @param aggregatorFactory Aggregator factory */ public void registerAggregator(String name, - Class<Aggregator<Writable>> aggregatorClass) { + WritableFactory<Aggregator<Writable>> aggregatorFactory) { if (LOG.isDebugEnabled() && myAggregatorMap.isEmpty()) { LOG.debug("registerAggregator: The first registration after a reset()"); } - myAggregatorMap.putIfAbsent(name, - AggregatorUtils.newAggregatorInstance(aggregatorClass, conf)); + myAggregatorMap.putIfAbsent(name, aggregatorFactory.create()); progressable.progress(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java index adc2aa8..8f880b4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java @@ -18,15 +18,16 @@ package org.apache.giraph.comm.aggregators; +import java.io.IOException; +import java.util.Map; + import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; import com.google.common.collect.Maps; -import java.io.IOException; -import java.util.Map; - /** * Takes and serializes aggregators and keeps them grouped by owner * partition id, to be sent in bulk. 
@@ -41,20 +42,20 @@ public class SendAggregatorCache extends CountingCache { * * @param taskId Task id of worker which owns the aggregator * @param aggregatorName Name of the aggregator - * @param aggregatorClass Class of the aggregator + * @param aggregatorFactory Aggregator factory * @param aggregatedValue Value of the aggregator * @return Number of bytes in serialized data for this worker * @throws IOException */ public int addAggregator(Integer taskId, String aggregatorName, - Class<? extends Aggregator> aggregatorClass, + WritableFactory<? extends Aggregator> aggregatorFactory, Writable aggregatedValue) throws IOException { AggregatorOutputStream out = aggregatorMap.get(taskId); if (out == null) { out = new AggregatorOutputStream(); aggregatorMap.put(taskId, out); } - return out.addAggregator(aggregatorName, aggregatorClass, + return out.addAggregator(aggregatorName, aggregatorFactory, aggregatedValue); } @@ -86,6 +87,6 @@ public class SendAggregatorCache extends CountingCache { // current number of requests, plus one for the last flush long totalCount = getCount(taskId) + 1; addAggregator(taskId, AggregatorUtils.SPECIAL_COUNT_AGGREGATOR, - Aggregator.class, new LongWritable(totalCount)); + null, new LongWritable(totalCount)); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java index 1218d29..51277c9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java @@ -18,20 +18,21 @@ package org.apache.giraph.comm.netty; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import java.io.IOException; + +import 
org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.bsp.CentralizedServiceMaster; import org.apache.giraph.comm.MasterClient; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.SendAggregatorCache; import org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest; -import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.WritableFactory; import org.apache.giraph.worker.WorkerInfo; -import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.Progressable; -import java.io.IOException; - /** * Netty implementation of {@link MasterClient} */ @@ -39,7 +40,7 @@ public class NettyMasterClient implements MasterClient { /** Netty client that does the actual I/O */ private final NettyClient nettyClient; /** Worker information for current superstep */ - private CentralizedServiceMaster<?, ?, ?> service; + private final CentralizedServiceMaster<?, ?, ?> service; /** Cached map of partition ids to serialized aggregator data */ private final SendAggregatorCache sendAggregatorCache = new SendAggregatorCache(); @@ -78,12 +79,12 @@ public class NettyMasterClient implements MasterClient { @Override public void sendAggregator(String aggregatorName, - Class<? extends Aggregator> aggregatorClass, + WritableFactory<? 
extends Aggregator> aggregatorFactory, Writable aggregatedValue) throws IOException { WorkerInfo owner = AggregatorUtils.getOwner(aggregatorName, service.getWorkerInfoList()); int currentSize = sendAggregatorCache.addAggregator(owner.getTaskId(), - aggregatorName, aggregatorClass, aggregatedValue); + aggregatorName, aggregatorFactory, aggregatedValue); if (currentSize >= maxBytesPerAggregatorRequest) { flushAggregatorsToWorker(owner); } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java index e2681ee..10d8d02 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java @@ -18,16 +18,18 @@ package org.apache.giraph.comm.requests; +import java.io.DataInput; +import java.io.IOException; + +import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.AllAggregatorServerData; -import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.io.DataInput; -import java.io.IOException; - /** * Request to send final aggregatd values from master to worker which owns * the aggregators @@ -59,23 +61,22 @@ public class SendAggregatorsToOwnerRequest int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); - 
String aggregatorClassName = input.readUTF(); + WritableFactory<Aggregator<Writable>> aggregatorFactory = + WritableUtils.readWritableObject(input, conf); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); count.readFields(input); aggregatorData.receivedRequestCountFromMaster(count.get(), getSenderTaskId()); } else { - Class<Aggregator<Writable>> aggregatorClass = - AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, - aggregatorClass); + aggregatorFactory); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue); serverData.getOwnerAggregatorData().registerAggregator( - aggregatorName, aggregatorClass); + aggregatorName, aggregatorFactory); } } } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java index 52e4cba..d469e96 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java @@ -18,16 +18,18 @@ package org.apache.giraph.comm.requests; +import java.io.DataInput; +import java.io.IOException; + +import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.comm.ServerData; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.AllAggregatorServerData; -import org.apache.giraph.aggregators.Aggregator; +import 
org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; -import java.io.DataInput; -import java.io.IOException; - /** * Request to send final aggregated values from worker which owns them to * other workers @@ -59,17 +61,16 @@ public class SendAggregatorsToWorkerRequest extends int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); - String aggregatorClassName = input.readUTF(); + WritableFactory<Aggregator<Writable>> aggregatorFactory = + WritableUtils.readWritableObject(input, conf); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); count.readFields(input); aggregatorData.receivedRequestCountFromWorker(count.get(), getSenderTaskId()); } else { - Class<Aggregator<Writable>> aggregatorClass = - AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, - aggregatorClass); + aggregatorFactory); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java index 14c8c0d..62ab7f1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/WritableRequest.java @@ -44,7 +44,7 @@ public abstract class WritableRequest<I extends WritableComparable, public static final int UNKNOWN_SIZE = -1; /** Configuration */ - private 
ImmutableClassesGiraphConfiguration<I, V, E> conf; + protected ImmutableClassesGiraphConfiguration<I, V, E> conf; /** Client id */ private int clientId = -1; /** Request id */ http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java index 325d91f..2bc08e9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -18,15 +18,24 @@ package org.apache.giraph.master; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.bsp.SuperstepState; -import org.apache.giraph.comm.MasterClient; -import org.apache.giraph.comm.aggregators.AggregatorUtils; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.AbstractMap; +import java.util.Map; + import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.aggregators.AggregatorWrapper; import org.apache.giraph.aggregators.AggregatorWriter; +import org.apache.giraph.aggregators.ClassAggregatorFactory; import org.apache.giraph.bsp.BspService; +import org.apache.giraph.bsp.SuperstepState; +import org.apache.giraph.comm.MasterClient; +import org.apache.giraph.comm.aggregators.AggregatorUtils; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.MasterLoggingAggregator; +import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -35,12 +44,6 @@ import com.google.common.base.Function; import 
com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.AbstractMap; -import java.util.Map; - /** Handler for aggregators on master */ public class MasterAggregatorHandler implements MasterAggregatorUsage, Writable { @@ -106,7 +109,17 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, Class<? extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { checkAggregatorName(name); - return registerAggregator(name, aggregatorClass, false) != null; + ClassAggregatorFactory<A> aggregatorFactory = + new ClassAggregatorFactory<A>(aggregatorClass, conf); + return registerAggregator(name, aggregatorFactory, false) != null; + } + + @Override + public <A extends Writable> boolean registerAggregator(String name, + WritableFactory<? extends Aggregator<A>> aggregator) throws + InstantiationException, IllegalAccessException { + checkAggregatorName(name); + return registerAggregator(name, aggregator, false) != null; } @Override @@ -114,7 +127,9 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, Class<? extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { checkAggregatorName(name); - return registerAggregator(name, aggregatorClass, true) != null; + ClassAggregatorFactory<A> aggregatorFactory = + new ClassAggregatorFactory<A>(aggregatorClass, conf); + return registerAggregator(name, aggregatorFactory, true) != null; } /** @@ -134,22 +149,22 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, /** * Helper function for registering aggregators. 
* - * @param name Name of the aggregator - * @param aggregatorClass Class of the aggregator - * @param persistent Whether aggregator is persistent or not - * @param <A> Aggregated value type + * @param name Name of the aggregator + * @param aggregatorFactory Aggregator factory + * @param persistent Whether aggregator is persistent or not + * @param <A> Aggregated value type * @return Newly registered aggregator or aggregator which was previously * created with selected name, if any */ private <A extends Writable> AggregatorWrapper<A> registerAggregator - (String name, Class<? extends Aggregator<A>> aggregatorClass, + (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory, boolean persistent) throws InstantiationException, IllegalAccessException { AggregatorWrapper<A> aggregatorWrapper = (AggregatorWrapper<A>) aggregatorMap.get(name); if (aggregatorWrapper == null) { aggregatorWrapper = - new AggregatorWrapper<A>(aggregatorClass, persistent, conf); + new AggregatorWrapper<A>(aggregatorFactory, persistent, conf); aggregatorMap.put(name, (AggregatorWrapper<Writable>) aggregatorWrapper); } return aggregatorWrapper; @@ -207,7 +222,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, for (Map.Entry<String, AggregatorWrapper<Writable>> entry : aggregatorMap.entrySet()) { masterClient.sendAggregator(entry.getKey(), - entry.getValue().getAggregatorClass(), + entry.getValue().getAggregatorFactory(), entry.getValue().getPreviousAggregatedValue()); progressable.progress(); } @@ -322,7 +337,7 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, for (Map.Entry<String, AggregatorWrapper<Writable>> entry : aggregatorMap.entrySet()) { out.writeUTF(entry.getKey()); - out.writeUTF(entry.getValue().getAggregatorClass().getName()); + entry.getValue().getAggregatorFactory().write(out); out.writeBoolean(entry.getValue().isPersistent()); entry.getValue().getPreviousAggregatedValue().write(out); progressable.progress(); @@ 
-336,15 +351,16 @@ public class MasterAggregatorHandler implements MasterAggregatorUsage, try { for (int i = 0; i < numAggregators; i++) { String aggregatorName = in.readUTF(); - String aggregatorClassName = in.readUTF(); + WritableFactory<Aggregator<Writable>> aggregatorFactory = + WritableUtils.readWritableObject(in, conf); boolean isPersistent = in.readBoolean(); - AggregatorWrapper<Writable> aggregator = registerAggregator( + AggregatorWrapper<Writable> aggregatorWrapper = registerAggregator( aggregatorName, - AggregatorUtils.getAggregatorClass(aggregatorClassName), + aggregatorFactory, isPersistent); - Writable value = aggregator.createInitialValue(); + Writable value = aggregatorWrapper.createInitialValue(); value.readFields(in); - aggregator.setPreviousAggregatedValue(value); + aggregatorWrapper.setPreviousAggregatedValue(value); progressable.progress(); } } catch (InstantiationException e) { http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java index cadae67..91f5d24 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java @@ -20,6 +20,7 @@ package org.apache.giraph.master; import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.aggregators.AggregatorUsage; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; /** @@ -40,6 +41,21 @@ public interface MasterAggregatorUsage extends AggregatorUsage { InstantiationException, IllegalAccessException; /** + * Register an aggregator in preSuperstep() and/or preApplication(). 
This + * aggregator will have its value reset at the end of each super step. + * + * Aggregator should either implement Writable, or have no-arg constructor. + * + * @param name of aggregator + * @param aggregatorFactory aggregator factory + * @param <A> Aggregator type + * @return True iff aggregator wasn't already registered + */ + <A extends Writable> boolean registerAggregator(String name, + WritableFactory<? extends Aggregator<A>> aggregatorFactory) throws + InstantiationException, IllegalAccessException; + + /** * Register persistent aggregator in preSuperstep() and/or * preApplication(). This aggregator will not reset value at the end of * super step. http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index d77a9b5..c2a1f9a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -23,6 +23,7 @@ import org.apache.giraph.combiner.MessageCombiner; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.GraphState; +import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; @@ -195,6 +196,13 @@ public abstract class MasterCompute } @Override + public final <A extends Writable> boolean registerAggregator( + String name, WritableFactory<? 
extends Aggregator<A>> aggregator) + throws InstantiationException, IllegalAccessException { + return masterAggregatorUsage.registerAggregator(name, aggregator); + } + + @Override public final <A extends Writable> boolean registerPersistentAggregator( String name, Class<? extends Aggregator<A>> aggregatorClass) throws http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java b/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java new file mode 100644 index 0000000..9ea24c3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ArrayWritable.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Array; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; + +import com.google.common.base.Preconditions; + +/** + * A Writable for arrays containing instances of a class. 
The elements of this + * writable must all be instances of the same class. + * + * @param <T> element type + */ +public class ArrayWritable<T extends Writable> implements Writable { + /** Element type class */ + private Class<T> valueClass; + /** Array */ + private T[] values; + + /** Constructor */ + public ArrayWritable() { + } + + /** + * Constructor + * @param valueClass Element type class + * @param values Array of elements + */ + public ArrayWritable(Class<T> valueClass, T[] values) { + Preconditions.checkNotNull(valueClass, + "valueClass cannot be null in ArrayWritable"); + this.valueClass = valueClass; + this.values = values; + } + + /** + * Get element type class + * @return element type class + */ + public Class<T> getValueClass() { + return valueClass; + } + + /** + * Set array + * @param values array + */ + public void set(T[] values) { this.values = values; } + + /** + * Get array + * @return array + */ + public T[] get() { return values; } + + @Override + public void readFields(DataInput in) throws IOException { + valueClass = WritableUtils.readClass(in); + values = (T[]) Array.newInstance(valueClass, in.readInt()); + + for (int i = 0; i < values.length; i++) { + T value = (T) WritableFactories.newInstance(valueClass); + value.readFields(in); // read a value + values[i] = value; // store it in values + } + } + + @Override + public void write(DataOutput out) throws IOException { + Preconditions.checkNotNull(valueClass, + "valueClass cannot be null in ArrayWritable"); + WritableUtils.writeClass(valueClass, out); + out.writeInt(values.length); // write values + for (int i = 0; i < values.length; i++) { + values[i].write(out); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java
b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java new file mode 100644 index 0000000..43bed7e --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import org.apache.hadoop.io.Writable; + +/** + * Factory that can be serialized. 
+ * @param <T> Type of object factory creates + */ +public interface WritableFactory<T> extends Writable, Factory<T> { + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 3f8382e..763f59d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -18,6 +18,18 @@ package org.apache.giraph.utils; +import static org.apache.hadoop.util.ReflectionUtils.newInstance; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; @@ -33,18 +45,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.hadoop.util.ReflectionUtils.newInstance; - /** * Helper static methods for working with Writable objects. */ @@ -70,6 +70,23 @@ public class WritableUtils { } /** + * Instantiate a new Writable, checking for NullWritable along the way. 
+ * + * @param klass Class + * @param configuration Configuration + * @param <W> type + * @return new instance of class + */ + public static <W extends Writable> W createWritable( + Class<W> klass, + ImmutableClassesGiraphConfiguration configuration) { + W result = createWritable(klass); + ConfigurationUtils.configureIfPossible(result, configuration); + return result; + } + + + /** * Read fields from byteArray to a Writeable object. * * @param byteArray Byte array to find the fields in. @@ -616,4 +633,47 @@ public class WritableUtils { return null; } } + + /** + * Write object to output stream + * @param object Object + * @param output Output stream + * @throws IOException + */ + public static void writeWritableObject( + Writable object, DataOutput output) + throws IOException { + output.writeBoolean(object != null); + if (object != null) { + output.writeUTF(object.getClass().getName()); + object.write(output); + } + } + + /** + * Reads object from input stream + * @param input Input stream + * @param conf Configuration + * @param <T> Object type + * @return Object + * @throws IOException + */ + public static <T extends Writable> + T readWritableObject(DataInput input, + ImmutableClassesGiraphConfiguration conf) throws IOException { + if (input.readBoolean()) { + String className = input.readUTF(); + try { + T object = + (T) ReflectionUtils.newInstance(Class.forName(className), conf); + object.readFields(input); + return object; + } catch (ClassNotFoundException e) { + throw new IllegalStateException("readWritableObject: No class found " + + className); + } + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java 
b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index 9bfd7b5..45ca665 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -18,14 +18,19 @@ package org.apache.giraph.worker; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.bsp.CentralizedServiceWorker; -import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor; import org.apache.giraph.comm.aggregators.AggregatedValueOutputStream; import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.comm.aggregators.AllAggregatorServerData; import org.apache.giraph.comm.aggregators.OwnerAggregatorServerData; -import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.Factory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.Progressable; import org.apache.log4j.Logger; @@ -33,10 +38,6 @@ import org.apache.log4j.Logger; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.Map; -import java.util.Set; - /** * Handler for aggregators on worker. Provides the aggregated values and * performs aggregations from user vertex code (thread-safe). 
Also has @@ -58,10 +59,13 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { private static final Logger LOG = Logger.getLogger(WorkerAggregatorHandler.class); /** Map of values from previous superstep */ - private Map<String, Writable> previousAggregatedValueMap = + private final Map<String, Writable> previousAggregatedValueMap = Maps.newHashMap(); + /** Map of aggregator factories for current superstep */ + private final Map<String, Factory<Aggregator<Writable>>> + currentAggregatorFactoryMap = Maps.newHashMap(); /** Map of aggregators for current superstep */ - private Map<String, Aggregator<Writable>> currentAggregatorMap = + private final Map<String, Aggregator<Writable>> currentAggregatorMap = Maps.newHashMap(); /** Service worker */ private final CentralizedServiceWorker<?, ?, ?> serviceWorker; @@ -143,7 +147,8 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { // Wait for all other aggregators and store them allAggregatorData.fillNextSuperstepMapsWhenReady( getOtherWorkerIdsSet(), previousAggregatedValueMap, - currentAggregatorMap); + currentAggregatorFactoryMap); + fillAndInitAggregatorsMap(currentAggregatorMap); allAggregatorData.reset(); if (LOG.isDebugEnabled()) { LOG.debug("prepareSuperstep: Aggregators prepared"); @@ -151,6 +156,25 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { } /** + * Fills aggregators map from currentAggregatorFactoryMap. + * All aggregators in this map will be set to initial value. + * @param aggregatorMap Map to fill. 
+ */ + private void fillAndInitAggregatorsMap( + Map<String, Aggregator<Writable>> aggregatorMap) { + for (Map.Entry<String, Factory<Aggregator<Writable>>> entry : + currentAggregatorFactoryMap.entrySet()) { + Aggregator<Writable> aggregator = + aggregatorMap.get(entry.getKey()); + if (aggregator == null) { + aggregatorMap.put(entry.getKey(), entry.getValue().create()); + } else { + aggregator.reset(); + } + } + } + + /** * Send aggregators to their owners and in the end to the master * * @param requestProcessor Request processor for aggregators @@ -286,13 +310,7 @@ public class WorkerAggregatorHandler implements WorkerThreadAggregatorUsage { public ThreadLocalWorkerAggregatorUsage() { threadAggregatorMap = Maps.newHashMapWithExpectedSize( WorkerAggregatorHandler.this.currentAggregatorMap.size()); - for (Map.Entry<String, Aggregator<Writable>> entry : - WorkerAggregatorHandler.this.currentAggregatorMap.entrySet()) { - threadAggregatorMap.put(entry.getKey(), - AggregatorUtils.newAggregatorInstance( - (Class<Aggregator<Writable>>) entry.getValue().getClass(), - conf)); - } + fillAndInitAggregatorsMap(threadAggregatorMap); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/ce97134d/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java new file mode 100644 index 0000000..2898647 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.aggregators; + +import static org.junit.Assert.assertEquals; + +import org.apache.giraph.utils.ArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.junit.Test; + +public class TestArrayAggregator { + @Test + public void testMaxAggregator() { + Aggregator<ArrayWritable<LongWritable>> max = new ArrayAggregatorFactory<>(2, LongMaxAggregator.class).create(); + + ArrayWritable<LongWritable> tmp = max.createInitialValue(); + + tmp.get()[0].set(2); + max.aggregate(tmp); + + tmp.get()[0].set(3); + tmp.get()[1].set(1); + max.aggregate(tmp); + + assertEquals(3L, max.getAggregatedValue().get()[0].get()); + assertEquals(1L, max.getAggregatedValue().get()[1].get()); + + tmp.get()[0].set(-1); + tmp.get()[1].set(-1); + max.setAggregatedValue(tmp); + + assertEquals(-1L, max.getAggregatedValue().get()[0].get()); + assertEquals(-1L, max.getAggregatedValue().get()[1].get()); + } +}
