Updated Branches: refs/heads/master fbc741f10 -> a7b98a9e0
CRUNCH-29: Add a string concatenating aggregator for use with CombineFns. Contributed by Gauthier Ambard. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a7b98a9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a7b98a9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a7b98a9e Branch: refs/heads/master Commit: a7b98a9e0ffc50d6bd2ae643607a33d1f3de72d3 Parents: fbc741f Author: jwills <[email protected]> Authored: Wed Aug 1 09:42:26 2012 -0700 Committer: jwills <[email protected]> Committed: Thu Aug 2 15:09:49 2012 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/CombineFn.java | 48 +++++++++++++++ .../test/java/org/apache/crunch/CombineFnTest.java | 23 +++++++ 2 files changed, 71 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a7b98a9e/crunch/src/main/java/org/apache/crunch/CombineFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java index fe65d74..27183a9 100644 --- a/crunch/src/main/java/org/apache/crunch/CombineFn.java +++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java @@ -25,6 +25,7 @@ import java.util.SortedSet; import org.apache.crunch.util.Tuples; +import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -334,6 +335,22 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, return aggregator(new LastNAggregator<V>(n)); } + + /** + * Used to concatenate strings, with a separator between each string. 
+ * + * @param separator + * the separator which will be appended between each string + * @param skipNull + * whether null values are skipped. If false, a + * NullPointerException is thrown when a null value is + * encountered. + * @return a CombineFn that joins String values using the given separator + */ + public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) { + return aggregator(new StringConcatAggregator(separator, skipNull)); + } + public static class SumLongs implements Aggregator<Long> { private long sum = 0; @@ -849,4 +866,35 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, return ImmutableList.copyOf(elements); } } + + public static class StringConcatAggregator implements Aggregator<String> { + private final String separator; + private final boolean skipNulls; + private final LinkedList<String> list = new LinkedList<String>(); + + private transient Joiner joiner; + + public StringConcatAggregator(final String separator, final boolean skipNulls) { + this.separator = separator; + this.skipNulls = skipNulls; + } + + @Override + public void reset() { + if (joiner == null) { + joiner = skipNulls ? 
Joiner.on(separator).skipNulls() : Joiner.on(separator); + } + list.clear(); + } + + @Override + public void update(final String next) { + list.add(next); + } + + @Override + public Iterable<String> results() { + return ImmutableList.of(joiner.join(list)); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a7b98a9e/crunch/src/test/java/org/apache/crunch/CombineFnTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java index 4f08bbe..82bdf00 100644 --- a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java +++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java @@ -35,6 +35,7 @@ import static org.apache.crunch.CombineFn.SUM_LONGS; import static org.junit.Assert.assertEquals; import java.math.BigInteger; +import java.util.Arrays; import java.util.List; import org.apache.crunch.CombineFn.Aggregator; @@ -45,6 +46,7 @@ import org.apache.crunch.CombineFn.MaxNAggregator; import org.apache.crunch.CombineFn.MinNAggregator; import org.apache.crunch.CombineFn.PairAggregator; import org.apache.crunch.CombineFn.QuadAggregator; +import org.apache.crunch.CombineFn.StringConcatAggregator; import org.apache.crunch.CombineFn.TripAggregator; import org.apache.crunch.CombineFn.TupleNAggregator; import org.junit.Test; @@ -183,4 +185,25 @@ public class CombineFnTest { MIN_DOUBLES.create(), MAX_LONGS.create()); assertEquals(new TupleN(1, 20.0, 1, 2.0, 12L), Iterables.getOnlyElement(applyAggregator(a, input))); } + + @Test + public void testConcatenation() { + String[] arrayNull = new String[] { null, "" }; + assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator( + new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar"))); + + assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator( + new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", 
"bar"))); + assertEquals(ImmutableList.of(" "), applyAggregator( + new StringConcatAggregator(" ", true), ImmutableList.of(" ", ""))); + assertEquals(ImmutableList.of(""), applyAggregator( + new StringConcatAggregator(" ", true), Arrays.asList(arrayNull))); + } + + @Test(expected = NullPointerException.class) + public void testConcatenationNullException() { + String[] arrayNull = new String[] { null, "" }; + assertEquals(ImmutableList.of(""), applyAggregator( + new StringConcatAggregator(" ", false), Arrays.asList(arrayNull))); + } }
