jkff commented on a change in pull request #4175: [BEAM-3247] fix Sample.any performance
URL: https://github.com/apache/beam/pull/4175#discussion_r153365677
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
 ##########
 @@ -209,29 +201,49 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
   }
 
   /**
-   * A {@link DoFn} that returns up to limit elements from the side input 
PCollection.
+   * A {@link CombineFn} that combines into a {@link List} of up to limit 
elements.
    */
-  private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
-    long limit;
-    final PCollectionView<Iterable<T>> iterableView;
+  private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, 
Iterable<T>> {
+    private final long limit;
 
-    public SampleAnyDoFn(long limit, PCollectionView<Iterable<T>> 
iterableView) {
+    private SampleAnyCombineFn(long limit) {
       this.limit = limit;
-      this.iterableView = iterableView;
     }
 
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      for (T i : c.sideInput(iterableView)) {
-        if (limit-- <= 0) {
-          break;
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      if (accumulator.size() < limit) {
+        accumulator.add(input);
+      }
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> merged = new ArrayList<>();
+      for (List<T> accumulator : accumulators) {
+        for (T t : accumulator) {
+          merged.add(t);
+          if (merged.size() >= limit) {
+            return merged;
+          }
         }
-        c.output(i);
       }
+      return merged;
+    }
+
+    @Override
+    public Iterable<T> extractOutput(List<T> accumulator) {
+      return accumulator;
     }
   }
 
-  /**
+    /**
 
 Review comment:
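   Accidental change: this hunk only alters the indentation of the Javadoc opener (`/**`), which is unrelated to the fix and should be reverted.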

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to