Updated Branches: refs/heads/master ed7481d9c -> 1c95647be
CRUNCH-74: Add an initialize(Configuration conf) method to the Aggregator interface Signed-off-by: Rahul Sharma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/1c95647b Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/1c95647b Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/1c95647b Branch: refs/heads/master Commit: 1c95647be1abc0bc2927ea1aceec9dfdcf61667d Parents: ed7481d Author: Josh Wills <[email protected]> Authored: Fri Sep 28 18:13:18 2012 -0700 Committer: Rahul Sharma <[email protected]> Committed: Sat Oct 6 08:53:37 2012 +0530 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/CollectionsIT.java | 2 +- .../src/main/java/org/apache/crunch/CombineFn.java | 69 ++++++++++---- 2 files changed, 50 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1c95647b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java ---------------------------------------------------------------------- diff --git a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java index 3f372be..0d5803e 100644 --- a/crunch/src/it/java/org/apache/crunch/CollectionsIT.java +++ b/crunch/src/it/java/org/apache/crunch/CollectionsIT.java @@ -38,7 +38,7 @@ import com.google.common.collect.Lists; @SuppressWarnings("serial") public class CollectionsIT { - public static class AggregateStringListFn implements CombineFn.Aggregator<Collection<String>> { + public static class AggregateStringListFn extends CombineFn.SimpleAggregator<Collection<String>> { private final Collection<String> rtn = Lists.newArrayList(); @Override http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/1c95647b/crunch/src/main/java/org/apache/crunch/CombineFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java index 88fbbaf..246827d 100644 --- a/crunch/src/main/java/org/apache/crunch/CombineFn.java +++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.SortedSet; import org.apache.crunch.util.Tuples; +import org.apache.hadoop.conf.Configuration; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; @@ -42,6 +43,12 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, public static interface Aggregator<T> extends Serializable { /** + * Perform any setup of this instance that is required prior to processing + * inputs. + */ + void initialize(Configuration configuration); + + /** * Clears the internal state of this Aggregator and prepares it for the * values associated with the next key. */ @@ -60,6 +67,16 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } /** + * Base class for aggregators that do not require any initialization. + */ + public static abstract class SimpleAggregator<T> implements Aggregator<T> { + @Override + public void initialize(Configuration conf) { + // No-op + } + } + + /** * Interface for constructing new aggregator instances. */ public static interface AggregatorFactory<T> { @@ -79,6 +96,11 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } @Override + public void initialize() { + aggregator.initialize(getConfiguration()); + } + + @Override public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) { aggregator.reset(); for (V v : input.second()) { @@ -101,6 +123,13 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } @Override + public void initialize(Configuration configuration) { + for (Aggregator<?> a : aggregators) { + a.initialize(configuration); + } + } + + @Override public void reset() { for (Aggregator<?> a : aggregators) { a.reset(); @@ -379,7 +408,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength)); } - public static class SumLongs implements Aggregator<Long> { + public static class SumLongs extends SimpleAggregator<Long> { private long sum = 0; @Override @@ -404,7 +433,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class SumInts implements Aggregator<Integer> { + public static class SumInts extends SimpleAggregator<Integer> { private int sum = 0; @Override @@ -429,7 +458,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class SumFloats implements Aggregator<Float> { + public static class SumFloats extends SimpleAggregator<Float> { private float sum = 0; @Override @@ -454,7 +483,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class SumDoubles implements Aggregator<Double> { + public static class SumDoubles extends SimpleAggregator<Double> { private double sum = 0; @Override @@ -479,7 +508,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class SumBigInts implements Aggregator<BigInteger> { + public static class SumBigInts extends SimpleAggregator<BigInteger> { private BigInteger sum = BigInteger.ZERO; @Override @@ -504,7 +533,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MaxLongs implements Aggregator<Long> { + public static class MaxLongs extends SimpleAggregator<Long> { private Long max = null; @Override @@ -531,7 +560,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MaxInts implements Aggregator<Integer> { + public static class MaxInts extends SimpleAggregator<Integer> { private Integer max = null; @Override @@ -558,7 +587,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MaxFloats implements Aggregator<Float> { + public static class MaxFloats extends SimpleAggregator<Float> { private Float max = null; @Override @@ -585,7 +614,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MaxDoubles implements Aggregator<Double> { + public static class MaxDoubles extends SimpleAggregator<Double> { private Double max = null; @Override @@ -612,7 +641,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MaxBigInts implements Aggregator<BigInteger> { + public static class MaxBigInts extends SimpleAggregator<BigInteger> { private BigInteger max = null; @Override @@ -639,7 +668,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MinLongs implements Aggregator<Long> { + public static class MinLongs extends SimpleAggregator<Long> { private Long min = null; @Override @@ -666,7 +695,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MinInts implements Aggregator<Integer> { + public static class MinInts extends SimpleAggregator<Integer> { private Integer min = null; @Override @@ -693,7 +722,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MinFloats implements Aggregator<Float> { + public static class MinFloats extends SimpleAggregator<Float> { private Float min = null; @Override @@ -720,7 +749,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MinDoubles implements Aggregator<Double> { + public static class MinDoubles extends SimpleAggregator<Double> { private Double min = null; @Override @@ -747,7 +776,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MinBigInts implements Aggregator<BigInteger> { + public static class MinBigInts extends SimpleAggregator<BigInteger> { private BigInteger min = null; @Override @@ -774,7 +803,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } }; - public static class MaxNAggregator<V extends Comparable<V>> implements Aggregator<V> { + public static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> { private final int arity; private transient SortedSet<V> elements; @@ -807,7 +836,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } } - public static class MinNAggregator<V extends Comparable<V>> implements Aggregator<V> { + public static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> { private final int arity; private transient SortedSet<V> elements; @@ -840,7 +869,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } } - public static class FirstNAggregator<V> implements Aggregator<V> { + public static class FirstNAggregator<V> extends SimpleAggregator<V> { private final int arity; private final List<V> elements; @@ -867,7 +896,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } } - public static class LastNAggregator<V> implements Aggregator<V> { + public static class LastNAggregator<V> extends SimpleAggregator<V> { private final int arity; private final LinkedList<V> elements; @@ -895,7 +924,7 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, } } - public static class StringConcatAggregator implements Aggregator<String> { + public static class StringConcatAggregator extends SimpleAggregator<String> { private final String separator; private final boolean skipNulls; private final long maxOutputLength;
