This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f3f80d8 Fixes a bug in Sample.Any new fd8ad27 This closes #4216: Fixes a bug in Sample.Any f3f80d8 is described below commit f3f80d841897f4481fdb51609659d76c3fe7e550 Author: Eugene Kirpichov <kirpic...@google.com> AuthorDate: Mon Dec 4 19:08:14 2017 -0800 Fixes a bug in Sample.Any (I introduced the bug while merging a different PR...) Also adds a test for the combine fn and exposes the fn. Documents the difference between any() and fixedSize(). --- .../java/org/apache/beam/sdk/testing/CombineFnTester.java | 2 +- .../main/java/org/apache/beam/sdk/transforms/Sample.java | 15 +++++++++++++-- .../java/org/apache/beam/sdk/transforms/SampleTest.java | 15 +++++++++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java index efd2af3..896d955 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java @@ -95,7 +95,7 @@ public class CombineFnTester { CombineFn<InputT, AccumT, OutputT> fn, List<? extends Iterable<InputT>> shards, Matcher<? super OutputT> matcher) { - AccumT accumulator = null; + AccumT accumulator = shards.isEmpty() ? 
fn.createAccumulator() : null; for (AccumT inputAccum : combineInputs(fn, shards)) { if (accumulator == null) { accumulator = inputAccum; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java index 2eb12d6..d7cba7e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java @@ -38,17 +38,28 @@ import org.apache.beam.sdk.values.PCollection; * {@code PCollection}, or samples of the values associated with each * key in a {@code PCollection} of {@code KV}s. * + * {@link #fixedSizeGlobally(int)} and {@link #fixedSizePerKey(int)} compute uniformly random + * samples. {@link #any(long)} is faster, but provides no uniformity guarantees. + * * <p>{@link #combineFn} can also be used manually, in combination with state and with the * {@link Combine} transform. */ public class Sample { - /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */ + /** Returns a {@link CombineFn} that computes a fixed-sized uniform sample of its inputs. */ public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) { return new FixedSizedSampleFn<>(sampleSize); } /** + * Returns a {@link CombineFn} that computes a fixed-sized potentially non-uniform sample of its + * inputs. + */ + public static <T> CombineFn<T, ?, Iterable<T>> anyCombineFn(int sampleSize) { + return new SampleAnyCombineFn<>(sampleSize); + } + + /** * {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and * produces a new {@code PCollection<T>} containing up to limit * elements of the input {@code PCollection}. 
@@ -233,10 +244,10 @@ public class Sample { List<T> res = iter.next(); while (iter.hasNext()) { for (T t : iter.next()) { - res.add(t); if (res.size() >= limit) { return res; } + res.add(t); } } return res; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java index 357f256..ed6905d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java @@ -20,6 +20,9 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.everyItem; +import static org.hamcrest.Matchers.isIn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -37,6 +40,7 @@ import java.util.TreeSet; import org.apache.beam.sdk.TestUtils; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.testing.CombineFnTester; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; @@ -46,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.hamcrest.Matchers; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; @@ -174,6 +179,16 @@ public class SampleTest { public void testPickAny() { runPickAnyTest(lines, limit); } + + @Test + public void testCombineFn() { + CombineFnTester.testCombineFn( + 
Sample.<String>combineFn(limit), + lines, + allOf( + Matchers.<String>iterableWithSize(Math.min(lines.size(), limit)), + everyItem(isIn(lines)))); + } } /** -- To stop receiving notification emails like this one, please contact ['"commits@beam.apache.org" <commits@beam.apache.org>'].