This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 6ddf0c2  Optimize reservoir sampling calculation
     new bcced0c  Merge pull request #14406 from [BEAM-11836] Optimize reservoir sampling calculation in PCollectionConsumerRegistry
6ddf0c2 is described below

commit 6ddf0c2ff6706883771ec9cb13309101b34b80c4
Author: kileys <kiley...@google.com>
AuthorDate: Fri Apr 2 00:57:38 2021 +0000

    Optimize reservoir sampling calculation
---
 .../harness/data/PCollectionConsumerRegistry.java | 49 ++++++++++++++++------
 1 file changed, 36 insertions(+), 13 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 14d245dc..457cbe8 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -377,23 +377,46 @@ public class PCollectionConsumerRegistry {
       }
     }
 
-    // Lowest sampling probability: 0.001%.
-    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
-    private static final int SAMPLING_CUTOFF = 10;
-    private int samplingToken = 0;
+    private static final int RESERVOIR_SIZE = 10;
+    private static final int SAMPLING_THRESHOLD = 30;
+    private long samplingToken = 0;
+    private long nextSamplingToken = 0;
     private Random randomGenerator = new Random();
 
-    // TODO(BEAM-11836): Implement fast approximation for reservoir sampling.
     private boolean shouldSampleElement() {
       // Sampling probability decreases as the element count is increasing.
-      // We unconditionally sample the first samplingCutoff elements. For the
-      // next samplingCutoff elements, the sampling probability drops from 100%
-      // to 50%. The probability of sampling the Nth element is:
-      // min(1, samplingCutoff / N), with an additional lower bound of
-      // samplingCutoff / samplingTokenUpperBound. This algorithm may be refined
-      // later.
-      samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
-      return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;
+      // We unconditionally sample the first samplingCutoff elements. Calculating
+      // nextInt(samplingToken) for each element is expensive, so after a threshold, calculate the
+      // gap to next sample.
+      // https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
+
+      // Reset samplingToken if it's going to exceed the max value.
+      if (samplingToken + 1 == Long.MAX_VALUE) {
+        samplingToken = 0;
+        nextSamplingToken = getNextSamplingToken(samplingToken);
+      }
+
+      samplingToken++;
+      // Use traditional sampling until the threshold of 30
+      if (nextSamplingToken == 0) {
+        if (randomGenerator.nextInt((int) samplingToken) <= RESERVOIR_SIZE) {
+          if (samplingToken > SAMPLING_THRESHOLD) {
+            nextSamplingToken = getNextSamplingToken(samplingToken);
+          }
+          return true;
+        }
+      } else if (samplingToken >= nextSamplingToken) {
+        nextSamplingToken = getNextSamplingToken(samplingToken);
+        return true;
+      }
+      return false;
+    }
+
+    private long getNextSamplingToken(long samplingToken) {
+      double gap =
+          Math.log(1.0 - randomGenerator.nextDouble())
+              / Math.log(1.0 - RESERVOIR_SIZE / (double) samplingToken);
+      return samplingToken + (int) gap;
     }
   }
 }