This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f80d8  Fixes a bug in Sample.Any
     new fd8ad27  This closes #4216: Fixes a bug in Sample.Any
f3f80d8 is described below

commit f3f80d841897f4481fdb51609659d76c3fe7e550
Author: Eugene Kirpichov <kirpic...@google.com>
AuthorDate: Mon Dec 4 19:08:14 2017 -0800

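    Fixes a bug in Sample.Any
    
    The accumulator-merging loop added each element before checking the
    limit, so merging into an accumulator that was already full could
    return limit + 1 elements; the check now happens before the add.
    (I introduced the bug while merging a different PR.)
    Also exposes the fn as Sample.anyCombineFn(int) and adds a
    CombineFnTester-based test (testCombineFn, exercising Sample.combineFn).
    Documents the difference between any() and fixedSizeGlobally()/fixedSizePerKey().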
---
 .../java/org/apache/beam/sdk/testing/CombineFnTester.java |  2 +-
 .../main/java/org/apache/beam/sdk/transforms/Sample.java  | 15 +++++++++++++--
 .../java/org/apache/beam/sdk/transforms/SampleTest.java   | 15 +++++++++++++++
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
index efd2af3..896d955 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/CombineFnTester.java
@@ -95,7 +95,7 @@ public class CombineFnTester {
       CombineFn<InputT, AccumT, OutputT> fn,
       List<? extends Iterable<InputT>> shards,
       Matcher<? super OutputT> matcher) {
-    AccumT accumulator = null;
+    AccumT accumulator = shards.isEmpty() ? fn.createAccumulator() : null;
     for (AccumT inputAccum : combineInputs(fn, shards)) {
       if (accumulator == null) {
         accumulator = inputAccum;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 2eb12d6..d7cba7e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -38,17 +38,28 @@ import org.apache.beam.sdk.values.PCollection;
  * {@code PCollection}, or samples of the values associated with each
  * key in a {@code PCollection} of {@code KV}s.
  *
+ * {@link #fixedSizeGlobally(int)} and {@link #fixedSizePerKey(int)} compute uniformly random
+ * samples. {@link #any(long)} is faster, but provides no uniformity guarantees.
+ *
  * <p>{@link #combineFn} can also be used manually, in combination with state and with the
  * {@link Combine} transform.
  */
 public class Sample {
 
-  /** Returns a {@link CombineFn} that computes a fixed-sized sample of its inputs. */
+  /** Returns a {@link CombineFn} that computes a fixed-sized uniform sample of its inputs. */
   public static <T> CombineFn<T, ?, Iterable<T>> combineFn(int sampleSize) {
     return new FixedSizedSampleFn<>(sampleSize);
   }
 
   /**
+   * Returns a {@link CombineFn} that computes a fixed-sized potentially non-uniform sample of its
+   * inputs.
+   */
+  public static <T> CombineFn<T, ?, Iterable<T>> anyCombineFn(int sampleSize) {
+    return new SampleAnyCombineFn<>(sampleSize);
+  }
+
+  /**
    * {@code Sample#any(long)} takes a {@code PCollection<T>} and a limit, and
    * produces a new {@code PCollection<T>} containing up to limit
    * elements of the input {@code PCollection}.
@@ -233,10 +244,10 @@ public class Sample {
       List<T> res = iter.next();
       while (iter.hasNext()) {
         for (T t : iter.next()) {
-          res.add(t);
           if (res.size() >= limit) {
             return res;
           }
+          res.add(t);
         }
       }
       return res;
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index 357f256..ed6905d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -20,6 +20,9 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.isIn;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -37,6 +40,7 @@ import java.util.TreeSet;
 import org.apache.beam.sdk.TestUtils;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.CombineFnTester;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.ValidatesRunner;
@@ -46,6 +50,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -174,6 +179,16 @@ public class SampleTest {
     public void testPickAny() {
       runPickAnyTest(lines, limit);
     }
+
+    @Test
+    public void testCombineFn() {
+      CombineFnTester.testCombineFn(
+          Sample.<String>combineFn(limit),
+          lines,
+          allOf(
+              Matchers.<String>iterableWithSize(Math.min(lines.size(), limit)),
+              everyItem(isIn(lines))));
+    }
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.

Reply via email to