jkff commented on a change in pull request #4175: [BEAM-3247] fix Sample.any performance
URL: https://github.com/apache/beam/pull/4175#discussion_r153365677
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
##########
@@ -209,29 +201,49 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
/**
- * A {@link DoFn} that returns up to limit elements from the side input PCollection.
+ * A {@link CombineFn} that combines into a {@link List} of up to limit elements.
*/
- private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
- long limit;
- final PCollectionView<Iterable<T>> iterableView;
+ private static class SampleAnyCombineFn<T> extends CombineFn<T, List<T>, Iterable<T>> {
+ private final long limit;
- public SampleAnyDoFn(long limit, PCollectionView<Iterable<T>> iterableView) {
+ private SampleAnyCombineFn(long limit) {
this.limit = limit;
- this.iterableView = iterableView;
}
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (T i : c.sideInput(iterableView)) {
- if (limit-- <= 0) {
- break;
+ @Override
+ public List<T> createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List<T> addInput(List<T> accumulator, T input) {
+ if (accumulator.size() < limit) {
+ accumulator.add(input);
+ }
+ return accumulator;
+ }
+
+ @Override
+ public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+ List<T> merged = new ArrayList<>();
+ for (List<T> accumulator : accumulators) {
+ for (T t : accumulator) {
+ merged.add(t);
+ if (merged.size() >= limit) {
+ return merged;
+ }
}
- c.output(i);
}
+ return merged;
+ }
+
+ @Override
+ public Iterable<T> extractOutput(List<T> accumulator) {
+ return accumulator;
}
}
- /**
+ /**
Review comment:
Accidental change (the `/**` line just above is removed and re-added, apparently differing only in whitespace).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services