Repository: giraph Updated Branches: refs/heads/trunk 7c61dcf4a -> d32c429a1
Fix using aggregators before aggregation Summary: If we register an aggregator and immediately ask for its aggregated value, the previous code returned the initial value, so the new code has to do the same. Additionally, clean up errors/exceptions to be more understandable (vs. a NullPointerException, for example). Test Plan: mvn install AggregatorsBenchmark and ReducersBenchmark Reviewers: majakabiljo, pavanka, sergey.edunov, maja.kabiljo Differential Revision: https://reviews.facebook.net/D24951 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d32c429a Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d32c429a Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d32c429a Branch: refs/heads/trunk Commit: d32c429a1d475b322b3fe44738f0cc8f30a97b48 Parents: 7c61dcf Author: Igor Kabiljo <[email protected]> Authored: Mon Oct 20 09:50:55 2014 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Mon Oct 20 09:54:37 2014 -0700 ---------------------------------------------------------------------- .../aggregators/ArrayAggregatorFactory.java | 128 ------------------- .../aggregators/ClassAggregatorFactory.java | 72 ----------- .../giraph/benchmark/ReducersBenchmark.java | 3 +- .../comm/aggregators/AggregatorUtils.java | 51 ++++++++ .../requests/SendAggregatorsToOwnerRequest.java | 2 +- .../giraph/master/AggregatorBroadcast.java | 75 +++++++++++ .../master/AggregatorReduceOperation.java | 66 +++++++--- .../AggregatorToGlobalCommTranslation.java | 113 +++++++++++----- .../apache/giraph/master/BspServiceMaster.java | 2 +- .../giraph/master/MasterAggregatorHandler.java | 29 ++++- .../giraph/master/MasterAggregatorUsage.java | 16 --- .../org/apache/giraph/master/MasterCompute.java | 9 -- .../giraph/reducers/OnSameReduceOperation.java | 4 +- .../apache/giraph/reducers/ReduceOperation.java | 10 +- .../org/apache/giraph/reducers/Reducer.java | 41 ++++-- .../apache/giraph/utils/WritableFactory.java | 28 ---- 
.../org/apache/giraph/utils/WritableUtils.java | 6 +- .../worker/WorkerAggregatorDelegator.java | 4 +- .../giraph/worker/WorkerAggregatorHandler.java | 8 +- .../giraph/worker/WorkerBroadcastUsage.java | 33 +++++ .../giraph/worker/WorkerGlobalCommUsage.java | 17 +-- .../apache/giraph/worker/WorkerReduceUsage.java | 30 +++++ .../giraph/aggregators/TestArrayAggregator.java | 50 -------- 23 files changed, 396 insertions(+), 401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java deleted file mode 100644 index c977c57..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ArrayAggregatorFactory.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.aggregators; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.lang.reflect.Array; - -import org.apache.giraph.utils.ArrayWritable; -import org.apache.giraph.utils.WritableFactory; -import org.apache.giraph.utils.WritableUtils; -import org.apache.hadoop.io.Writable; - -/** - * Generic array aggregator factory, used to aggregate elements - * of ArrayWritable via passed element aggregator. - * - * @param <A> Type of individual element - */ -public class ArrayAggregatorFactory<A extends Writable> - implements WritableFactory<Aggregator<ArrayWritable<A>>> { - /** number of elements in array */ - private int n; - /** element aggregator class */ - private WritableFactory<? extends Aggregator<A>> elementAggregatorFactory; - - /** - * Constructor - * @param n Number of elements in array - * @param elementAggregatorClass Type of element aggregator - */ - public ArrayAggregatorFactory( - int n, Class<? extends Aggregator<A>> elementAggregatorClass) { - this(n, new ClassAggregatorFactory<>(elementAggregatorClass)); - } - - /** - * Constructor - * @param n Number of elements in array - * @param elementAggregatorFactory Element aggregator factory - */ - public ArrayAggregatorFactory(int n, - WritableFactory<? 
extends Aggregator<A>> elementAggregatorFactory) { - this.n = n; - this.elementAggregatorFactory = elementAggregatorFactory; - } - - @Override - public void readFields(DataInput in) throws IOException { - n = in.readInt(); - elementAggregatorFactory = WritableUtils.readWritableObject(in, null); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(n); - WritableUtils.writeWritableObject(elementAggregatorFactory, out); - } - - @Override - public Aggregator<ArrayWritable<A>> create() { - return new ArrayAggregator<>( - n, elementAggregatorFactory.create()); - } - - /** - * Stateful aggregator that aggregates ArrayWritable by - * aggregating individual elements - * - * @param <A> Type of individual element - */ - public static class ArrayAggregator<A extends Writable> - extends BasicAggregator<ArrayWritable<A>> { - /** number of elements in array */ - private final int n; - /** element aggregator */ - private final Aggregator<A> elementAggregator; - - /** - * Constructor - * @param n Number of elements in array - * @param elementAggregator Element aggregator - */ - public ArrayAggregator(int n, Aggregator<A> elementAggregator) { - super(null); - this.n = n; - this.elementAggregator = elementAggregator; - reset(); - } - - @Override - public void aggregate(ArrayWritable<A> other) { - A[] array = getAggregatedValue().get(); - for (int i = 0; i < n; i++) { - elementAggregator.setAggregatedValue(array[i]); - elementAggregator.aggregate(other.get()[i]); - array[i] = elementAggregator.getAggregatedValue(); - } - } - - @Override - public ArrayWritable<A> createInitialValue() { - Class<A> elementClass = - (Class) elementAggregator.createInitialValue().getClass(); - A[] array = (A[]) Array.newInstance(elementClass, n); - for (int i = 0; i < n; i++) { - array[i] = elementAggregator.createInitialValue(); - } - return new ArrayWritable<>(elementClass, array); - } - } -} 
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java deleted file mode 100644 index a022480..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/ClassAggregatorFactory.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.aggregators; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import org.apache.giraph.utils.ReflectionUtils; -import org.apache.giraph.utils.WritableFactory; -import org.apache.giraph.utils.WritableUtils; -import org.apache.hadoop.io.Writable; - -import com.google.common.base.Preconditions; - -/** - * Aggregator factory based on aggregatorClass. - * - * @param <T> Aggregated value type - */ -public class ClassAggregatorFactory<T extends Writable> - implements WritableFactory<Aggregator<T>> { - /** Aggregator class */ - private Class<? 
extends Aggregator<T>> aggregatorClass; - - /** Constructor */ - public ClassAggregatorFactory() { - } - - /** - * Constructor - * @param aggregatorClass Aggregator class - */ - public ClassAggregatorFactory( - Class<? extends Aggregator<T>> aggregatorClass) { - Preconditions.checkNotNull(aggregatorClass, - "aggregatorClass cannot be null in ClassAggregatorFactory"); - this.aggregatorClass = aggregatorClass; - } - - @Override - public Aggregator<T> create() { - return ReflectionUtils.newInstance(aggregatorClass, null); - } - - @Override - public void readFields(DataInput in) throws IOException { - aggregatorClass = WritableUtils.readClass(in); - } - - @Override - public void write(DataOutput out) throws IOException { - Preconditions.checkNotNull(aggregatorClass, - "aggregatorClass cannot be null in ClassAggregatorFactory"); - WritableUtils.writeClass(aggregatorClass, out); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java index ce5c96e..263274d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java @@ -64,9 +64,10 @@ public class ReducersBenchmark extends GiraphBenchmark { } @Override - public void reduceSingle( + public LongWritable reduceSingle( LongWritable curValue, LongWritable valueToReduce) { curValue.set(curValue.get() + valueToReduce.get()); + return curValue; } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java index ecb3a6b..dc0ceed 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java @@ -99,4 +99,55 @@ public class AggregatorUtils { } return message; } + + /** + * Get the warning message about usage of unregistered reducer to be + * printed to user. If user didn't register any reducers also provide + * the explanation on how to do so. + * + * @param reducerName The name of the aggregator which user tried to + * access + * @param hasRegisteredReducers True iff user registered some aggregators + * @param conf Giraph configuration + * @return Warning message + */ + public static String getUnregisteredReducerMessage( + String reducerName, boolean hasRegisteredReducers, + ImmutableClassesGiraphConfiguration conf) { + String message = "Tried to access reducer which wasn't registered " + + reducerName; + if (!hasRegisteredReducers) { + message = message + "; Aggregators can be registered from " + + "MasterCompute by calling registerReducer function. " + + "Also be sure that you are correctly setting MasterCompute class, " + + "currently using " + conf.getMasterComputeClass().getName(); + } + return message; + } + + /** + * Get the warning message when user tries to access broadcast, without + * previously setting it, to be printed to user. + * If user didn't broadcast any value also provide + * the explanation on how to do so. 
+ * + * @param broadcastName The name of the broadcast which user tried to + * access + * @param hasBroadcasted True iff user has broadcasted value before + * @param conf Giraph configuration + * @return Warning message + */ + public static String getUnregisteredBroadcastMessage( + String broadcastName, boolean hasBroadcasted, + ImmutableClassesGiraphConfiguration conf) { + String message = "Tried to access broadcast which wasn't set before " + + broadcastName; + if (!hasBroadcasted) { + message = message + "; Values can be broadcasted from " + + "MasterCompute by calling broadcast function. " + + "Also be sure that you are correctly setting MasterCompute class, " + + "currently using " + conf.getMasterComputeClass().getName(); + } + return message; + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java index 2d5cc51..8f168a2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java @@ -77,7 +77,7 @@ public class SendAggregatorsToOwnerRequest if (type == GlobalCommType.REDUCE_OPERATIONS) { ReduceOperation<Object, Writable> reduceOpCopy = (ReduceOperation<Object, Writable>) - WritableUtils.createCopy(reusedOut, reusedIn, value); + WritableUtils.createCopy(reusedOut, reusedIn, value, conf); serverData.getOwnerAggregatorData().registerReducer( name, reduceOpCopy); http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java new file mode 100644 index 0000000..81ea654 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorBroadcast.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.master; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; + +/** + * Writable representation of aggregated value + * + * @param <A> Aggregation object type + */ +public class AggregatorBroadcast<A extends Writable> + extends DefaultImmutableClassesGiraphConfigurable + implements Writable { + /** Aggregator class */ + private Class<? 
extends Aggregator<A>> aggregatorClass; + /** Aggregated value */ + private A value; + + /** Constructor */ + public AggregatorBroadcast() { + } + + /** + * Constructor + * @param aggregatorClass Aggregator class + * @param value Aggregated value + */ + public AggregatorBroadcast( + Class<? extends Aggregator<A>> aggregatorClass, A value) { + this.aggregatorClass = aggregatorClass; + this.value = value; + } + + public A getValue() { + return value; + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeClass(aggregatorClass, out); + value.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + aggregatorClass = WritableUtils.readClass(in); + value = ReflectionUtils.newInstance(aggregatorClass, getConf()) + .createInitialValue(); + value.readFields(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java index 1673f6d..54d421b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java @@ -22,8 +22,10 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.conf.GiraphConfigurationSettable; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.reducers.OnSameReduceOperation; -import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; @@ -33,11 +35,13 @@ import org.apache.hadoop.io.Writable; * 
@param <A> Aggregation object type */ public class AggregatorReduceOperation<A extends Writable> - extends OnSameReduceOperation<A> { - /** Aggregator factory */ - private WritableFactory<? extends Aggregator<A>> aggregatorFactory; + extends OnSameReduceOperation<A> implements GiraphConfigurationSettable { + /** Aggregator class */ + private Class<? extends Aggregator<A>> aggregatorClass; /** Aggregator */ private Aggregator<A> aggregator; + /** Configuration */ + private ImmutableClassesGiraphConfiguration<?, ?, ?> conf; /** Constructor */ public AggregatorReduceOperation() { @@ -45,18 +49,32 @@ public class AggregatorReduceOperation<A extends Writable> /** * Constructor - * @param aggregatorFactory Aggregator factory + * @param aggregatorClass Aggregator class + * @param conf Configuration */ public AggregatorReduceOperation( - WritableFactory<? extends Aggregator<A>> aggregatorFactory) { - this.aggregatorFactory = aggregatorFactory; - this.aggregator = aggregatorFactory.create(); - this.aggregator.setAggregatedValue(null); + Class<? extends Aggregator<A>> aggregatorClass, + ImmutableClassesGiraphConfiguration<?, ?, ?> conf) { + this.aggregatorClass = aggregatorClass; + this.conf = conf; + initAggregator(); + } + + /** Initialize aggregator */ + private void initAggregator() { + aggregator = ReflectionUtils.newInstance(aggregatorClass, conf); + aggregator.setAggregatedValue(null); } @Override public A createInitialValue() { - return aggregator.createInitialValue(); + A agg = aggregator.createInitialValue(); + if (agg == null) { + throw new IllegalStateException( + "Aggregators initial value must not be null, but is for " + + aggregator); + } + return agg; } /** @@ -64,29 +82,37 @@ public class AggregatorReduceOperation<A extends Writable> * @return copy */ public AggregatorReduceOperation<A> createCopy() { - return new AggregatorReduceOperation<>(aggregatorFactory); + return new AggregatorReduceOperation<>(aggregatorClass, conf); + } + + public Class<? 
extends Aggregator<A>> getAggregatorClass() { + return aggregatorClass; } @Override - public synchronized void reduceSingle(A curValue, A valueToReduce) { + public synchronized A reduceSingle(A curValue, A valueToReduce) { aggregator.setAggregatedValue(curValue); aggregator.aggregate(valueToReduce); - if (curValue != aggregator.getAggregatedValue()) { - throw new IllegalStateException( - "Aggregator " + aggregator + " aggregates by creating new value"); - } + A aggregated = aggregator.getAggregatedValue(); aggregator.setAggregatedValue(null); + return aggregated; + } + + @Override + public void setConf(ImmutableClassesGiraphConfiguration conf) { + this.conf = conf; } @Override public void write(DataOutput out) throws IOException { - WritableUtils.writeWritableObject(aggregatorFactory, out); + WritableUtils.writeClass(aggregatorClass, out); } @Override public void readFields(DataInput in) throws IOException { - aggregatorFactory = WritableUtils.readWritableObject(in, null); - aggregator = aggregatorFactory.create(); - this.aggregator.setAggregatedValue(null); + aggregatorClass = WritableUtils.readClass(in); + initAggregator(); } + + } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java index 7492fc7..36a4553 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java @@ -24,10 +24,10 @@ import java.util.HashMap; import java.util.Map.Entry; import org.apache.giraph.aggregators.Aggregator; -import org.apache.giraph.aggregators.ClassAggregatorFactory; -import 
org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; -import org.apache.giraph.utils.WritableFactory; +import org.apache.giraph.comm.aggregators.AggregatorUtils; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; import com.google.common.base.Preconditions; @@ -36,8 +36,11 @@ import com.google.common.base.Preconditions; * reduce and broadcast operations supported by the MasterAggregatorHandler. */ public class AggregatorToGlobalCommTranslation - extends DefaultImmutableClassesGiraphConfigurable implements MasterAggregatorUsage, Writable { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(AggregatorToGlobalCommTranslation.class); + /** Class providing reduce and broadcast interface to use */ private final MasterGlobalCommUsage globalComm; /** List of registered aggregators */ @@ -45,21 +48,64 @@ public class AggregatorToGlobalCommTranslation registeredAggregators = new HashMap<>(); /** + * List of init aggregator values, in case someone tries to + * access aggregator immediatelly after registering it. + * + * Instead of simply returning value, we need to store it during + * that superstep, so consecutive calls will return identical object, + * which they can modify. 
+ */ + private final HashMap<String, Writable> + initAggregatorValues = new HashMap<>(); + + /** Conf */ + private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf; + + /** * Constructor + * @param conf Configuration * @param globalComm Global communication interface */ - public AggregatorToGlobalCommTranslation(MasterGlobalCommUsage globalComm) { + public AggregatorToGlobalCommTranslation( + ImmutableClassesGiraphConfiguration<?, ?, ?> conf, + MasterGlobalCommUsage globalComm) { + this.conf = conf; this.globalComm = globalComm; } @Override public <A extends Writable> A getAggregatedValue(String name) { - return globalComm.getReduced(name); + AggregatorWrapper<Writable> agg = registeredAggregators.get(name); + if (agg == null) { + LOG.warn("getAggregatedValue: " + + AggregatorUtils.getUnregisteredAggregatorMessage(name, + registeredAggregators.size() != 0, conf)); + // to make sure we are not accessing reducer of the same name. + return null; + } + + A value = globalComm.getReduced(name); + if (value == null) { + value = (A) initAggregatorValues.get(name); + } + + if (value == null) { + value = (A) agg.getReduceOp().createInitialValue(); + initAggregatorValues.put(name, value); + } + + Preconditions.checkState(value != null); + return value; } @Override public <A extends Writable> void setAggregatedValue(String name, A value) { AggregatorWrapper<Writable> aggregator = registeredAggregators.get(name); + if (aggregator == null) { + throw new IllegalArgumentException("setAggregatedValue: " + + AggregatorUtils.getUnregisteredAggregatorMessage(name, + registeredAggregators.size() != 0, conf)); + } aggregator.setCurrentValue(value); } @@ -72,14 +118,15 @@ public class AggregatorToGlobalCommTranslation // register reduce with the same value for (Entry<String, AggregatorWrapper<Writable>> entry : registeredAggregators.entrySet()) { - Writable value = entry.getValue().currentValue != null ? 
- entry.getValue().getCurrentValue() : - globalComm.getReduced(entry.getKey()); + Writable value = entry.getValue().getCurrentValue(); if (value == null) { - value = entry.getValue().getReduceOp().createInitialValue(); + value = globalComm.getReduced(entry.getKey()); } + Preconditions.checkState(value != null); + + globalComm.broadcast(entry.getKey(), new AggregatorBroadcast<>( + entry.getValue().getReduceOp().getAggregatorClass(), value)); - globalComm.broadcast(entry.getKey(), value); // Always register clean instance of reduceOp, not to conflict with // reduceOp from previous superstep. AggregatorReduceOperation<Writable> cleanReduceOp = @@ -93,31 +140,21 @@ public class AggregatorToGlobalCommTranslation } entry.getValue().setCurrentValue(null); } + initAggregatorValues.clear(); } @Override public <A extends Writable> boolean registerAggregator(String name, Class<? extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { - ClassAggregatorFactory<A> aggregatorFactory = - new ClassAggregatorFactory<A>(aggregatorClass); - return registerAggregator(name, aggregatorFactory, false) != null; - } - - @Override - public <A extends Writable> boolean registerAggregator(String name, - WritableFactory<? extends Aggregator<A>> aggregator) throws - InstantiationException, IllegalAccessException { - return registerAggregator(name, aggregator, false) != null; + return registerAggregator(name, aggregatorClass, false) != null; } @Override public <A extends Writable> boolean registerPersistentAggregator(String name, Class<? 
extends Aggregator<A>> aggregatorClass) throws InstantiationException, IllegalAccessException { - ClassAggregatorFactory<A> aggregatorFactory = - new ClassAggregatorFactory<A>(aggregatorClass); - return registerAggregator(name, aggregatorFactory, true) != null; + return registerAggregator(name, aggregatorClass, true) != null; } @Override @@ -140,27 +177,35 @@ public class AggregatorToGlobalCommTranslation agg.readFields(in); registeredAggregators.put(name, agg); } + initAggregatorValues.clear(); } /** * Helper function for registering aggregators. * - * @param name Name of the aggregator - * @param aggregatorFactory Aggregator factory - * @param persistent Whether aggregator is persistent or not - * @param <A> Aggregated value type + * @param name Name of the aggregator + * @param aggregatorClass Aggregator class + * @param persistent Whether aggregator is persistent or not + * @param <A> Aggregated value type * @return Newly registered aggregator or aggregator which was previously * created with selected name, if any */ private <A extends Writable> AggregatorWrapper<A> registerAggregator - (String name, WritableFactory<? extends Aggregator<A>> aggregatorFactory, + (String name, Class<? extends Aggregator<A>> aggregatorClass, boolean persistent) throws InstantiationException, IllegalAccessException { AggregatorWrapper<A> aggregatorWrapper = (AggregatorWrapper<A>) registeredAggregators.get(name); if (aggregatorWrapper == null) { aggregatorWrapper = - new AggregatorWrapper<A>(aggregatorFactory, persistent); + new AggregatorWrapper<A>(aggregatorClass, persistent); + // postMasterCompute uses previously reduced value to broadcast, + // unless current value is set. After aggregator is registered, + // there was no previously reduced value, so set current value + // to default to avoid calling getReduced() on unregistered reducer. 
+ // (which logs unnecessary warnings) + aggregatorWrapper.setCurrentValue( + aggregatorWrapper.getReduceOp().createInitialValue()); registeredAggregators.put( name, (AggregatorWrapper<Writable>) aggregatorWrapper); } @@ -171,7 +216,7 @@ public class AggregatorToGlobalCommTranslation * Object holding all needed data related to single Aggregator * @param <A> Aggregated value type */ - private static class AggregatorWrapper<A extends Writable> + private class AggregatorWrapper<A extends Writable> implements Writable { /** False iff aggregator should be reset at the end of each super step */ private boolean persistent; @@ -186,14 +231,14 @@ public class AggregatorToGlobalCommTranslation /** * Constructor - * @param aggregatorFactory Aggregator factory + * @param aggregatorClass Aggregator class * @param persistent Is persistent */ public AggregatorWrapper( - WritableFactory<? extends Aggregator<A>> aggregatorFactory, + Class<? extends Aggregator<A>> aggregatorClass, boolean persistent) { this.persistent = persistent; - this.reduceOp = new AggregatorReduceOperation<>(aggregatorFactory); + this.reduceOp = new AggregatorReduceOperation<>(aggregatorClass, conf); } public AggregatorReduceOperation<A> getReduceOp() { http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index ab1289d..af7e5fd 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -895,7 +895,7 @@ public class BspServiceMaster<I extends WritableComparable, globalCommHandler = new MasterAggregatorHandler( getConfiguration(), getContext()); aggregatorTranslation = new 
AggregatorToGlobalCommTranslation( - globalCommHandler); + getConfiguration(), globalCommHandler); globalCommHandler.initialize(this); masterCompute = getConfiguration().createMasterCompute(); http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java index 5f7bd73..ccee656 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -28,6 +28,7 @@ import org.apache.giraph.bsp.BspService; import org.apache.giraph.bsp.SuperstepState; import org.apache.giraph.comm.GlobalCommType; import org.apache.giraph.comm.MasterClient; +import org.apache.giraph.comm.aggregators.AggregatorUtils; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.reducers.ReduceOperation; import org.apache.giraph.reducers.Reducer; @@ -61,6 +62,9 @@ public class MasterAggregatorHandler /** Progressable used to report progress */ private final Progressable progressable; + /** Conf */ + private final ImmutableClassesGiraphConfiguration<?, ?, ?> conf; + /** * Constructor * @@ -71,6 +75,7 @@ public class MasterAggregatorHandler ImmutableClassesGiraphConfiguration<?, ?, ?> conf, Progressable progressable) { this.progressable = progressable; + this.conf = conf; aggregatorWriter = conf.createAggregatorWriter(); } @@ -86,10 +91,18 @@ public class MasterAggregatorHandler R globalInitialValue) { if (reducerMap.containsKey(name)) { throw new IllegalArgumentException( - "Reducer with name " + name + " was already registered"); + "Reducer with name " + name + " was already registered, " + + " and is " + reducerMap.get(name) + ", and 
we are trying to " + + " register " + reduceOp); } if (reduceOp == null) { - throw new IllegalArgumentException("null reduce cannot be registered"); + throw new IllegalArgumentException( + "null reducer cannot be registered, with name " + name); + } + if (globalInitialValue == null) { + throw new IllegalArgumentException( + "global initial value for reducer cannot be null, but is for " + + reduceOp + " with naem" + name); } Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue); @@ -98,7 +111,13 @@ public class MasterAggregatorHandler @Override public <T extends Writable> T getReduced(String name) { - return (T) reducedMap.get(name); + T value = (T) reducedMap.get(name); + if (value == null) { + LOG.warn("getReduced: " + + AggregatorUtils.getUnregisteredReducerMessage(name, + reducedMap.size() != 0, conf)); + } + return value; } @Override @@ -310,14 +329,14 @@ public class MasterAggregatorHandler for (int i = 0; i < numReducers; i++) { String name = in.readUTF(); Reducer<Object, Writable> reducer = new Reducer<>(); - reducer.readFields(in); + reducer.readFields(in, conf); reducerMap.put(name, reducer); } int numBroadcast = in.readInt(); for (int i = 0; i < numBroadcast; i++) { String name = in.readUTF(); - Writable value = WritableUtils.readWritableObject(in, null); + Writable value = WritableUtils.readWritableObject(in, conf); broadcastMap.put(name, value); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java index 91f5d24..cadae67 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorUsage.java @@ -20,7 +20,6 @@ 
package org.apache.giraph.master; import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.aggregators.AggregatorUsage; -import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; /** @@ -41,21 +40,6 @@ public interface MasterAggregatorUsage extends AggregatorUsage { InstantiationException, IllegalAccessException; /** - * Register an aggregator in preSuperstep() and/or preApplication(). This - * aggregator will have its value reset at the end of each super step. - * - * Aggregator should either implement Writable, or have no-arg constructor. - * - * @param name of aggregator - * @param aggregatorFactory aggregator factory - * @param <A> Aggregator type - * @return True iff aggregator wasn't already registered - */ - <A extends Writable> boolean registerAggregator(String name, - WritableFactory<? extends Aggregator<A>> aggregatorFactory) throws - InstantiationException, IllegalAccessException; - - /** * Register persistent aggregator in preSuperstep() and/or * preApplication(). This aggregator will not reset value at the end of * super step. 
http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index 72e4d0a..68eb416 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -25,7 +25,6 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; import org.apache.giraph.graph.Computation; import org.apache.giraph.graph.GraphState; import org.apache.giraph.reducers.ReduceOperation; -import org.apache.giraph.utils.WritableFactory; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Mapper; @@ -222,14 +221,6 @@ public abstract class MasterCompute } @Override - public final <A extends Writable> boolean registerAggregator( - String name, WritableFactory<? extends Aggregator<A>> aggregator) - throws InstantiationException, IllegalAccessException { - return serviceMaster.getAggregatorTranslationHandler().registerAggregator( - name, aggregator); - } - - @Override public final <A extends Writable> boolean registerPersistentAggregator( String name, Class<? 
extends Aggregator<A>> aggregatorClass) throws http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java index a675f4d..cb9f6e0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java @@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable; public abstract class OnSameReduceOperation<R extends Writable> implements ReduceOperation<R, R> { @Override - public final void reducePartial(R curValue, R valueToReduce) { - reduceSingle(curValue, valueToReduce); + public final R reducePartial(R curValue, R valueToReduce) { + return reduceSingle(curValue, valueToReduce); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java index 434e21a..adbc4d8 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java @@ -43,15 +43,21 @@ public interface ReduceOperation<S, R extends Writable> extends Writable { * Add a new value. * Needs to be commutative and associative * + * Commonly, returned value should be same as curValue argument. 
+ * * @param curValue Partial value into which to reduce and store the result * @param valueToReduce Single value to be reduced + * @return reduced value */ - void reduceSingle(R curValue, S valueToReduce); + R reduceSingle(R curValue, S valueToReduce); /** * Add partially reduced value to current partially reduced value. * + * Commonly, returned value should be same as curValue argument. + * * @param curValue Partial value into which to reduce and store the result * @param valueToReduce Partial value to be reduced + * @return reduced value */ - void reducePartial(R curValue, R valueToReduce); + R reducePartial(R curValue, R valueToReduce); } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java index 9f821b4..6759276 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; @@ -32,7 +33,7 @@ import org.apache.hadoop.io.Writable; * @param <S> Single value type, objects passed on workers * @param <R> Reduced value type */ -public class Reducer<S, R extends Writable> implements Writable { +public class Reducer<S, R extends Writable> { /** Reduce operations */ private ReduceOperation<S, R> reduceOp; /** Current (partially) reduced value*/ @@ -49,7 +50,7 @@ public class Reducer<S, R extends Writable> implements Writable { */ public Reducer(ReduceOperation<S, R> reduceOp) { this.reduceOp = reduceOp; - this.currentValue = 
reduceOp.createInitialValue(); + this.currentValue = createInitialValue(); } /** * Constructor @@ -66,21 +67,26 @@ public class Reducer<S, R extends Writable> implements Writable { * @param valueToReduce Single value to reduce */ public void reduceSingle(S valueToReduce) { - reduceOp.reduceSingle(currentValue, valueToReduce); + currentValue = reduceOp.reduceSingle(currentValue, valueToReduce); } /** * Reduce given partially reduced value into current reduced value. * @param valueToReduce Partial value to reduce */ public void reducePartial(R valueToReduce) { - reduceOp.reducePartial(currentValue, valueToReduce); + currentValue = reduceOp.reducePartial(currentValue, valueToReduce); } /** * Return new initial reduced value. * @return New initial reduced value */ public R createInitialValue() { - return reduceOp.createInitialValue(); + R value = reduceOp.createInitialValue(); + if (value == null) { + throw new IllegalStateException( + "Initial value for reducer cannot be null, but is for " + reduceOp); + } + return value; } public ReduceOperation<S, R> getReduceOp() { @@ -95,16 +101,31 @@ public class Reducer<S, R extends Writable> implements Writable { this.currentValue = currentValue; } - @Override + /** + * Serialize the fields of this object to <code>out</code>. + * + * @param out <code>DataOuput</code> to serialize this object into. + * @throws IOException + */ public void write(DataOutput out) throws IOException { WritableUtils.writeWritableObject(reduceOp, out); currentValue.write(out); } - @Override - public void readFields(DataInput in) throws IOException { - reduceOp = WritableUtils.readWritableObject(in, null); - currentValue = reduceOp.createInitialValue(); + /** + * Deserialize the fields of this object from <code>in</code>. + * + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deseriablize this object from. 
+ * @param conf Configuration + * @throws IOException + */ + public void readFields(DataInput in, + ImmutableClassesGiraphConfiguration conf) throws IOException { + reduceOp = WritableUtils.readWritableObject(in, conf); + currentValue = createInitialValue(); currentValue.readFields(in); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java deleted file mode 100644 index 43bed7e..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.utils; - -import org.apache.hadoop.io.Writable; - -/** - * Factory that can be serialized. 
- * @param <T> Type of object factory creates - */ -public interface WritableFactory<T> extends Writable, Factory<T> { - -} http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 923d369..8c24697 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -745,13 +745,15 @@ public class WritableUtils { * @param reusableOut Reusable output stream to serialize into * @param reusableIn Reusable input stream to deserialize out of * @param original Original value of which to make a copy + * @param conf Configuration * @param <T> Type of the object * @return Copy of the original value */ public static <T extends Writable> T createCopy( UnsafeByteArrayOutputStream reusableOut, - UnsafeReusableByteArrayInput reusableIn, T original) { - T copy = (T) createWritable(original.getClass(), null); + UnsafeReusableByteArrayInput reusableIn, T original, + ImmutableClassesGiraphConfiguration conf) { + T copy = (T) createWritable(original.getClass(), conf); try { reusableOut.reset(); http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java index 5238a07..916e7a0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java @@ -18,6 +18,7 @@ 
package org.apache.giraph.worker; import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable; +import org.apache.giraph.master.AggregatorBroadcast; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -64,6 +65,7 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable, @Override public final <A extends Writable> A getAggregatedValue(String name) { - return this.<A>getBroadcast(name); + AggregatorBroadcast<A> broadcast = workerGlobalCommUsage.getBroadcast(name); + return broadcast.getValue(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index 05a13a7..ee47542 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -87,7 +87,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { B value = (B) broadcastedMap.get(name); if (value == null) { LOG.warn("getBroadcast: " + - AggregatorUtils.getUnregisteredAggregatorMessage(name, + AggregatorUtils.getUnregisteredBroadcastMessage(name, broadcastedMap.size() != 0, conf)); } return value; @@ -103,7 +103,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { } } else { throw new IllegalStateException("reduce: " + - AggregatorUtils.getUnregisteredAggregatorMessage(name, + AggregatorUtils.getUnregisteredReducerMessage(name, reducerMap.size() != 0, conf)); } } @@ -122,7 +122,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { } } else { throw new IllegalStateException("reduce: " + - 
AggregatorUtils.getUnregisteredAggregatorMessage(name, + AggregatorUtils.getUnregisteredReducerMessage(name, reducerMap.size() != 0, conf)); } } @@ -309,7 +309,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { entry.getValue().getReduceOp(); ReduceOperation<Object, Writable> threadLocalCopy = - WritableUtils.createCopy(out, in, globalReduceOp); + WritableUtils.createCopy(out, in, globalReduceOp, conf); threadReducerMap.put(entry.getKey(), new Reducer<>(threadLocalCopy)); } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java new file mode 100644 index 0000000..9b4e160 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerBroadcastUsage.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.worker; + +import org.apache.hadoop.io.Writable; + +/** + * Methods on worker can access broadcasted values through this interface + */ +public interface WorkerBroadcastUsage { + /** + * Get value broadcasted from master + * @param name Name of the broadcasted value + * @return Broadcasted value + * @param <B> Broadcast value type + */ + <B extends Writable> B getBroadcast(String name); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java index 39566f5..fa31bc2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerGlobalCommUsage.java @@ -17,24 +17,11 @@ */ package org.apache.giraph.worker; -import org.apache.hadoop.io.Writable; /** * Methods on worker can access broadcasted values and provide * values to reduce through this interface */ -public interface WorkerGlobalCommUsage { - /** - * Reduce given value. 
- * @param name Name of the reducer - * @param value Single value to reduce - */ - void reduce(String name, Object value); - /** - * Get value broadcasted from master - * @param name Name of the broadcasted value - * @return Broadcasted value - * @param <B> Broadcast value type - */ - <B extends Writable> B getBroadcast(String name); +public interface WorkerGlobalCommUsage + extends WorkerBroadcastUsage, WorkerReduceUsage { } http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java new file mode 100644 index 0000000..9c2e90d --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.worker; + +/** + * Methods on worker can provide values to reduce through this interface + */ +public interface WorkerReduceUsage { + /** + * Reduce given value. 
+ * @param name Name of the reducer + * @param value Single value to reduce + */ + void reduce(String name, Object value); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d32c429a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java b/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java deleted file mode 100644 index 2898647..0000000 --- a/giraph-core/src/test/java/org/apache/giraph/aggregators/TestArrayAggregator.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.giraph.aggregators; - -import static org.junit.Assert.assertEquals; - -import org.apache.giraph.utils.ArrayWritable; -import org.apache.hadoop.io.LongWritable; -import org.junit.Test; - -public class TestArrayAggregator { - @Test - public void testMaxAggregator() { - Aggregator<ArrayWritable<LongWritable>> max = new ArrayAggregatorFactory<>(2, LongMaxAggregator.class).create(); - - ArrayWritable<LongWritable> tmp = max.createInitialValue(); - - tmp.get()[0].set(2); - max.aggregate(tmp); - - tmp.get()[0].set(3); - tmp.get()[1].set(1); - max.aggregate(tmp); - - assertEquals(3L, max.getAggregatedValue().get()[0].get()); - assertEquals(1L, max.getAggregatedValue().get()[1].get()); - - tmp.get()[0].set(-1); - tmp.get()[1].set(-1); - max.setAggregatedValue(tmp); - - assertEquals(-1L, max.getAggregatedValue().get()[0].get()); - assertEquals(-1L, max.getAggregatedValue().get()[1].get()); - } -}
