CRUNCH-118: Lock down Distinct and add SAMPLE_UNIQUE_ELEMENTS Aggregator
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/affa10f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/affa10f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/affa10f2 Branch: refs/heads/master Commit: affa10f28407408149b346d4a764be2e4a957b36 Parents: a71871d Author: Josh Wills <[email protected]> Authored: Wed Nov 21 11:49:55 2012 -0800 Committer: Matthias Friedrich <[email protected]> Committed: Sat Nov 24 10:05:17 2012 +0100 ---------------------------------------------------------------------- .../java/org/apache/crunch/fn/Aggregators.java | 18 ++++++++++++-- .../main/java/org/apache/crunch/lib/Distinct.java | 5 +++- .../java/org/apache/crunch/fn/AggregatorsTest.java | 6 +++++ 3 files changed, 25 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/affa10f2/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java index 5364d62..524983d 100644 --- a/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java +++ b/crunch/src/main/java/org/apache/crunch/fn/Aggregators.java @@ -391,6 +391,19 @@ public final class Aggregators { } /** + * Collect a random sample of unique elements from the input, where 'unique' is defined by + * the {@code equals} method for the input objects. No guarantees are made about which + * elements will be returned, simply that there will not be any more than the given sample + * size for any key. + * + * @param maximumSampleSize The maximum number of unique elements to return per key + * @return The newly constructed instance + */ + public static <V> Aggregator<V> SAMPLE_UNIQUE_ELEMENTS(int maximumSampleSize) { + return new SetAggregator<V>(maximumSampleSize); + } + + /** * Apply separate aggregators to each component of a {@link Pair}. */ public static <V1, V2> Aggregator<Pair<V1, V2>> pairAggregator( @@ -1084,9 +1097,8 @@ public final class Aggregators { @Override public void update(V value) { - elements.add(value); - if (sizeLimit > 0 && elements.size() > sizeLimit) { - elements.iterator().remove(); + if (sizeLimit == -1 || elements.size() < sizeLimit) { + elements.add(value); } } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/affa10f2/crunch/src/main/java/org/apache/crunch/lib/Distinct.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java index fcf7b7e..dae11f7 100644 --- a/crunch/src/main/java/org/apache/crunch/lib/Distinct.java +++ b/crunch/src/main/java/org/apache/crunch/lib/Distinct.java @@ -32,7 +32,7 @@ import com.google.common.collect.Sets; /** * Functions for computing the distinct elements of a {@code PCollection}. */ -public class Distinct { +public final class Distinct { private static final int DEFAULT_FLUSH_EVERY = 50000; @@ -97,4 +97,7 @@ public class Distinct { emitter.emit(input.first()); } } + + // No instantiation + private Distinct() {} } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/affa10f2/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java index bd63653..d7daec1 100644 --- a/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java +++ b/crunch/src/test/java/org/apache/crunch/fn/AggregatorsTest.java @@ -38,6 +38,7 @@ import static org.apache.crunch.fn.Aggregators.SUM_LONGS; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import java.math.BigInteger; @@ -128,6 +129,11 @@ public class AggregatorsTest { public void testUniqueElements() { assertThat(ImmutableSet.copyOf(apply(Aggregators.<Integer>UNIQUE_ELEMENTS(), 17, 29, 29, 16, 17)), is(ImmutableSet.of(17, 29, 16))); + + Iterable<Integer> samp = apply(Aggregators.<Integer>SAMPLE_UNIQUE_ELEMENTS(2), 17, 29, 16, 17, 29, 16); + List<Integer> elements = ImmutableList.copyOf(samp); + assertEquals(2, elements.size()); + assertFalse(elements.get(0).equals(elements.get(1))); } @Test
