Repository: giraph Updated Branches: refs/heads/trunk d11da6779 -> 931569d58
[GIRAPH-987] Improve naming for ReduceOperation Summary: reduceSingle/reducePartial can be slightly confusing, changing to reduce/reduceMerge OnSameReduceOperation => ReduceSameTypeOperation If you have better suggestions - I am all ears :) Test Plan: mvn clean install Reviewers: majakabiljo, sergey.edunov, laxman.dhulipala, maja.kabiljo Differential Revision: https://reviews.facebook.net/D31785 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/931569d5 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/931569d5 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/931569d5 Branch: refs/heads/trunk Commit: 931569d5816a520a9f67844eca296e24140bd259 Parents: d11da67 Author: Igor Kabiljo <[email protected]> Authored: Wed Jan 28 10:10:19 2015 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Wed Jan 28 10:18:13 2015 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../giraph/benchmark/ReducersBenchmark.java | 6 ++-- .../aggregators/OwnerAggregatorServerData.java | 2 +- .../master/AggregatorReduceOperation.java | 6 ++-- .../giraph/master/MasterAggregatorHandler.java | 2 +- .../giraph/reducers/OnSameReduceOperation.java | 34 -------------------- .../apache/giraph/reducers/ReduceOperation.java | 4 +-- .../reducers/ReduceSameTypeOperation.java | 34 ++++++++++++++++++++ .../org/apache/giraph/reducers/Reducer.java | 8 ++--- .../apache/giraph/reducers/impl/AndReduce.java | 6 ++-- .../apache/giraph/reducers/impl/MaxReduce.java | 6 ++-- .../apache/giraph/reducers/impl/MinReduce.java | 6 ++-- .../apache/giraph/reducers/impl/OrReduce.java | 6 ++-- .../apache/giraph/reducers/impl/SumReduce.java | 6 ++-- .../worker/WorkerAggregatorDelegator.java | 4 +-- .../giraph/worker/WorkerAggregatorHandler.java | 16 ++++----- .../apache/giraph/worker/WorkerReduceUsage.java | 2 +- 17 files changed, 76 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index cf366f2..2e8e8bf 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.2.0 - unreleased + GIRAPH-987: Improve naming for ReduceOperation (ikabiljo via majakabiljo) + GIRAPH-986: Add no-arg constructor to BasicSet (ikabiljo via edunov) GIRAPH-985: Add more metrics (majakabiljo) http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java index 16c33e9..771f1e9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java @@ -31,7 +31,7 @@ import org.apache.giraph.graph.Vertex; import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.io.formats.PseudoRandomVertexInputFormat; import org.apache.giraph.master.DefaultMasterCompute; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.giraph.worker.DefaultWorkerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; @@ -54,7 +54,7 @@ public class ReducersBenchmark extends GiraphBenchmark { /** LongSumReducer */ public static class TestLongSumReducer - extends OnSameReduceOperation<LongWritable> { + extends ReduceSameTypeOperation<LongWritable> { /** Singleton */ public static final TestLongSumReducer INSTANCE = new TestLongSumReducer(); @@ -64,7 +64,7 @@ public class ReducersBenchmark extends GiraphBenchmark { } @Override - public LongWritable reduceSingle( + public LongWritable reduce( LongWritable curValue, LongWritable valueToReduce) { curValue.set(curValue.get() + valueToReduce.get()); return curValue; http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java index 9e92efc..541a10f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java @@ -109,7 +109,7 @@ public class OwnerAggregatorServerData { public void reduce(String name, Writable value) { Reducer<Object, Writable> reducer = myReducerMap.get(name); synchronized (reducer) { - reducer.reducePartial(value); + reducer.reduceMerge(value); } progressable.progress(); } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java index 54d421b..bf3570e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorReduceOperation.java @@ -24,7 +24,7 @@ import java.io.IOException; import org.apache.giraph.aggregators.Aggregator; import org.apache.giraph.conf.GiraphConfigurationSettable; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.giraph.utils.ReflectionUtils; import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; @@ -35,7 +35,7 @@ import org.apache.hadoop.io.Writable; * @param <A> Aggregation object type */ public class AggregatorReduceOperation<A extends Writable> - extends OnSameReduceOperation<A> implements GiraphConfigurationSettable { + extends ReduceSameTypeOperation<A> implements GiraphConfigurationSettable { /** Aggregator class */ private Class<? extends Aggregator<A>> aggregatorClass; /** Aggregator */ @@ -90,7 +90,7 @@ public class AggregatorReduceOperation<A extends Writable> } @Override - public synchronized A reduceSingle(A curValue, A valueToReduce) { + public synchronized A reduce(A curValue, A valueToReduce) { aggregator.setAggregatedValue(curValue); aggregator.aggregate(valueToReduce); A aggregated = aggregator.getAggregatedValue(); http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java index 98de9d6..5558cee 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -243,7 +243,7 @@ public class MasterAggregatorHandler valueToReduce.readFields(reducedValuesInput); if (reducer.getCurrentValue() != null) { - reducer.reducePartial(valueToReduce); + reducer.reduceMerge(valueToReduce); } else { reducer.setCurrentValue(valueToReduce); } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java deleted file mode 100644 index cb9f6e0..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/OnSameReduceOperation.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.reducers; - -import org.apache.hadoop.io.Writable; - -/** - * ReduceOperation object when single object being reduced is of - * same type as reduced value. - * - * @param <R> Reduced object type. - */ -public abstract class OnSameReduceOperation<R extends Writable> - implements ReduceOperation<R, R> { - @Override - public final R reducePartial(R curValue, R valueToReduce) { - return reduceSingle(curValue, valueToReduce); - } -} http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java index adbc4d8..ef501d1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceOperation.java @@ -49,7 +49,7 @@ public interface ReduceOperation<S, R extends Writable> extends Writable { * @param valueToReduce Single value to be reduced * @return reduced value */ - R reduceSingle(R curValue, S valueToReduce); + R reduce(R curValue, S valueToReduce); /** * Add partially reduced value to current partially reduced value. * @@ -59,5 +59,5 @@ public interface ReduceOperation<S, R extends Writable> extends Writable { * @param valueToReduce Partial value to be reduced * @return reduced value */ - R reducePartial(R curValue, R valueToReduce); + R reduceMerge(R curValue, R valueToReduce); } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceSameTypeOperation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceSameTypeOperation.java b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceSameTypeOperation.java new file mode 100644 index 0000000..a9f792f --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/ReduceSameTypeOperation.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.reducers; + +import org.apache.hadoop.io.Writable; + +/** + * ReduceOperation object when single object being reduced is of + * same type as reduced value. + * + * @param <R> Reduced object type. + */ +public abstract class ReduceSameTypeOperation<R extends Writable> + implements ReduceOperation<R, R> { + @Override + public final R reduceMerge(R curValue, R valueToReduce) { + return reduce(curValue, valueToReduce); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java index 6759276..c03cbbc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/Reducer.java @@ -66,15 +66,15 @@ public class Reducer<S, R extends Writable> { * Reduce given value into current reduced value. * @param valueToReduce Single value to reduce */ - public void reduceSingle(S valueToReduce) { - currentValue = reduceOp.reduceSingle(currentValue, valueToReduce); + public void reduce(S valueToReduce) { + currentValue = reduceOp.reduce(currentValue, valueToReduce); } /** * Reduce given partially reduced value into current reduced value. * @param valueToReduce Partial value to reduce */ - public void reducePartial(R valueToReduce) { - currentValue = reduceOp.reducePartial(currentValue, valueToReduce); + public void reduceMerge(R valueToReduce) { + currentValue = reduceOp.reduceMerge(currentValue, valueToReduce); } /** * Return new initial reduced value. http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/AndReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/AndReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/AndReduce.java index 31a324c..6947c43 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/AndReduce.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/AndReduce.java @@ -21,7 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.hadoop.io.BooleanWritable; /** @@ -30,7 +30,7 @@ import org.apache.hadoop.io.BooleanWritable; * The default value when nothing is aggregated is true. * */ -public class AndReduce extends OnSameReduceOperation<BooleanWritable> { +public class AndReduce extends ReduceSameTypeOperation<BooleanWritable> { /** Instance */ public static final AndReduce INSTANCE = new AndReduce(); @@ -40,7 +40,7 @@ public class AndReduce extends OnSameReduceOperation<BooleanWritable> { } @Override - public BooleanWritable reduceSingle( + public BooleanWritable reduce( BooleanWritable curValue, BooleanWritable valueToReduce) { curValue.set(curValue.get() && valueToReduce.get()); return curValue; http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java index febd9a4..35ca778 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java @@ -21,7 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.giraph.types.ops.DoubleTypeOps; import org.apache.giraph.types.ops.IntTypeOps; import org.apache.giraph.types.ops.LongTypeOps; @@ -37,7 +37,7 @@ import org.apache.hadoop.io.WritableComparable; * @param <T> Value type */ public class MaxReduce<T extends WritableComparable> - extends OnSameReduceOperation<T> { + extends ReduceSameTypeOperation<T> { /** DoubleWritable specialization */ public static final MaxReduce<DoubleWritable> DOUBLE = new MaxReduce<>(DoubleTypeOps.INSTANCE); @@ -69,7 +69,7 @@ public class MaxReduce<T extends WritableComparable> } @Override - public T reduceSingle(T curValue, T valueToReduce) { + public T reduce(T curValue, T valueToReduce) { if (curValue.compareTo(valueToReduce) < 0) { typeOps.set(curValue, valueToReduce); } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java index 1b5cf03..42880f7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java @@ -21,7 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.giraph.types.ops.DoubleTypeOps; import org.apache.giraph.types.ops.IntTypeOps; import org.apache.giraph.types.ops.LongTypeOps; @@ -37,7 +37,7 @@ import org.apache.hadoop.io.WritableComparable; * @param <T> Value type */ public class MinReduce<T extends WritableComparable> - extends OnSameReduceOperation<T> { + extends ReduceSameTypeOperation<T> { /** DoubleWritable specialization */ public static final MinReduce<DoubleWritable> DOUBLE = new MinReduce<>(DoubleTypeOps.INSTANCE); @@ -69,7 +69,7 @@ public class MinReduce<T extends WritableComparable> } @Override - public T reduceSingle(T curValue, T valueToReduce) { + public T reduce(T curValue, T valueToReduce) { if (curValue.compareTo(valueToReduce) > 0) { typeOps.set(curValue, valueToReduce); } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/OrReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/OrReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/OrReduce.java index 86dd891..ccaedf7 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/OrReduce.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/OrReduce.java @@ -21,7 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.hadoop.io.BooleanWritable; /** @@ -30,7 +30,7 @@ import org.apache.hadoop.io.BooleanWritable; * The default value when nothing is aggregated is false. * */ -public class OrReduce extends OnSameReduceOperation<BooleanWritable> { +public class OrReduce extends ReduceSameTypeOperation<BooleanWritable> { /** Instance */ public static final OrReduce INSTANCE = new OrReduce(); @@ -40,7 +40,7 @@ public class OrReduce extends OnSameReduceOperation<BooleanWritable> { } @Override - public BooleanWritable reduceSingle( + public BooleanWritable reduce( BooleanWritable curValue, BooleanWritable valueToReduce) { curValue.set(curValue.get() || valueToReduce.get()); return curValue; http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java index 8722ab8..62e87e2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java @@ -21,7 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.reducers.ReduceSameTypeOperation; import org.apache.giraph.types.ops.DoubleTypeOps; import org.apache.giraph.types.ops.IntTypeOps; import org.apache.giraph.types.ops.LongTypeOps; @@ -37,7 +37,7 @@ import org.apache.hadoop.io.Writable; * @param <T> Value type */ public class SumReduce<T extends Writable> - extends OnSameReduceOperation<T> { + extends ReduceSameTypeOperation<T> { /** DoubleWritable specialization */ public static final SumReduce<DoubleWritable> DOUBLE = new SumReduce<>(DoubleTypeOps.INSTANCE); @@ -69,7 +69,7 @@ public class SumReduce<T extends Writable> } @Override - public T reduceSingle(T curValue, T valueToReduce) { + public T reduce(T curValue, T valueToReduce) { typeOps.plusInto(curValue, valueToReduce); return curValue; } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java index 1b2e749..5cb8ea0 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java @@ -54,8 +54,8 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable, } @Override - public void reducePartial(String name, Writable value) { - workerGlobalCommUsage.reducePartial(name, value); + public void reduceMerge(String name, Writable value) { + workerGlobalCommUsage.reduceMerge(name, value); } @Override http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index 96d239d..d81d1d4 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -99,7 +99,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { if (reducer != null) { progressable.progress(); synchronized (reducer) { - reducer.reduceSingle(value); + reducer.reduce(value); } } else { throw new IllegalStateException("reduce: " + @@ -114,12 +114,12 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { * @param valueToReduce Partial value to reduce */ @Override - public void reducePartial(String name, Writable valueToReduce) { + public void reduceMerge(String name, Writable valueToReduce) { Reducer<Object, Writable> reducer = reducerMap.get(name); if (reducer != null) { progressable.progress(); synchronized (reducer) { - reducer.reducePartial(valueToReduce); + reducer.reduceMerge(valueToReduce); } } else { throw new IllegalStateException("reduce: " + @@ -321,7 +321,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { Reducer<Object, Writable> reducer = threadReducerMap.get(name); if (reducer != null) { progressable.progress(); - reducer.reduceSingle(value); + reducer.reduce(value); } else { throw new IllegalStateException("reduce: " + AggregatorUtils.getUnregisteredAggregatorMessage(name, @@ -330,13 +330,13 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { } @Override - public void reducePartial(String name, Writable value) { + public void reduceMerge(String name, Writable value) { Reducer<Object, Writable> reducer = threadReducerMap.get(name); if (reducer != null) { progressable.progress(); - reducer.reducePartial(value); + reducer.reduceMerge(value); } else { - throw new IllegalStateException("reducePartial: " + + throw new IllegalStateException("reduceMerge: " + AggregatorUtils.getUnregisteredAggregatorMessage(name, threadReducerMap.size() != 0, conf)); } @@ -353,7 +353,7 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { // WorkerAggregatorHandler for (Entry<String, Reducer<Object, Writable>> entry : threadReducerMap.entrySet()) { - WorkerAggregatorHandler.this.reducePartial(entry.getKey(), + WorkerAggregatorHandler.this.reduceMerge(entry.getKey(), entry.getValue().getCurrentValue()); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/931569d5/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java index fe7cd32..1b6a111 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java @@ -35,5 +35,5 @@ public interface WorkerReduceUsage { * @param name Name of the reducer * @param value Single value to reduce */ - void reducePartial(String name, Writable value); + void reduceMerge(String name, Writable value); }
