lukecwik commented on code in PR #25551:
URL: https://github.com/apache/beam/pull/25551#discussion_r1113598604


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -393,89 +394,58 @@ public double getProgress() {
     }
   }
 
-  private static class SampleByteSizeDistribution<T> {
+  private static class SampleByteSizeDistribution<T> extends 
ElementByteSizeObserver {
     /** Basic implementation of {@link ElementByteSizeObserver} for use in 
size estimation. */
-    private static class ByteSizeObserver extends ElementByteSizeObserver {
-      private long observedSize = 0;
-
-      @Override
-      protected void reportElementSize(long elementSize) {
-        observedSize += elementSize;
-      }
+    @Override
+    protected void reportElementSize(long elementSize) {
+      distribution.update(elementSize);
     }
 
     final Distribution distribution;
-    ByteSizeObserver byteCountObserver;
 
     public SampleByteSizeDistribution(Distribution distribution) {
       this.distribution = distribution;
-      this.byteCountObserver = null;
     }
 
     public void tryUpdate(T value, Coder<T> coder) throws Exception {
       if (shouldSampleElement()) {
-        // First try using byte size observer
-        byteCountObserver = new ByteSizeObserver();
-        coder.registerByteSizeObserver(value, byteCountObserver);
+        coder.registerByteSizeObserver(value, this);
 
-        if (!byteCountObserver.getIsLazy()) {
-          byteCountObserver.advance();
-          this.distribution.update(byteCountObserver.observedSize);
+        if (!getIsLazy()) {
+          advance();
         }
-      } else {
-        byteCountObserver = null;
       }
     }
 
     public void finishLazyUpdate() {
       // Advance lazy ElementByteSizeObservers, if any.
-      if (byteCountObserver != null && byteCountObserver.getIsLazy()) {
-        byteCountObserver.advance();
-        this.distribution.update(byteCountObserver.observedSize);
+      // Note that user's code is allowed to store the element of one
+      // DoFn.processElement() call and access it later on. We are still
+      // calling next() here, causing an update to byteCount. If user's
+      // code really accesses more element's pieces later on, their byte
+      // count would accrue against a future element. This is not ideal,
+      // but still approximately correct.
+      if (getIsLazy()) {
+        advance();
       }
     }
 
-    private static final int RESERVOIR_SIZE = 10;
-    private static final int SAMPLING_THRESHOLD = 30;
-    private long samplingToken = 0;
-    private long nextSamplingToken = 0;
-    private Random randomGenerator = new Random();
+    // Lowest sampling probability: 0.001%.
+    private static final int SAMPLING_TOKEN_UPPER_BOUND = 1000000;
+    private static final int SAMPLING_CUTOFF = 10;
+    private int samplingToken = 0;
+    private Random randomGenerator = ThreadLocalRandom.current();
 
     private boolean shouldSampleElement() {
       // Sampling probability decreases as the element count is increasing.
-      // We unconditionally sample the first samplingCutoff elements. 
Calculating
-      // nextInt(samplingToken) for each element is expensive, so after a 
threshold, calculate the
-      // gap to next sample.
-      // 
https://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
-
-      // Reset samplingToken if it's going to exceed the max value.
-      if (samplingToken + 1 == Long.MAX_VALUE) {
-        samplingToken = 0;
-        nextSamplingToken = getNextSamplingToken(samplingToken);
-      }
-
-      samplingToken++;
-      // Use traditional sampling until the threshold of 30
-      if (nextSamplingToken == 0) {
-        if (samplingToken <= RESERVOIR_SIZE
-            || randomGenerator.nextInt((int) samplingToken) < RESERVOIR_SIZE) {
-          if (samplingToken > SAMPLING_THRESHOLD) {
-            nextSamplingToken = getNextSamplingToken(samplingToken);
-          }
-          return true;
-        }
-      } else if (samplingToken >= nextSamplingToken) {
-        nextSamplingToken = getNextSamplingToken(samplingToken);
-        return true;
-      }
-      return false;
-    }
-
-    private long getNextSamplingToken(long samplingToken) {
-      double gap =
-          Math.log(1.0 - randomGenerator.nextDouble())
-              / Math.log(1.0 - RESERVOIR_SIZE / (double) samplingToken);
-      return samplingToken + (int) gap;
+      // We unconditionally sample the first samplingCutoff elements. For the
+      // next samplingCutoff elements, the sampling probability drops from 100%
+      // to 50%. The probability of sampling the Nth element is:
+      // min(1, samplingCutoff / N), with an additional lower bound of
+      // samplingCutoff / samplingTokenUpperBound. This algorithm may be 
refined
+      // later.
+      samplingToken = Math.min(samplingToken + 1, SAMPLING_TOKEN_UPPER_BOUND);
+      return randomGenerator.nextInt(samplingToken) < SAMPLING_CUTOFF;

Review Comment:
   Yeah, this is still WIP, and I've marked it as such. I'm still trying to root-cause 
why the existing implementation shows up in the FlexWordCount profiles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to