lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1113627698


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -35,22 +34,22 @@
  * simultaneously, even if computing the same logical PCollection.
  */
 public class DataSampler {
+  private static final Logger LOG = LoggerFactory.getLogger(DataSampler.class);
 
   /**
    * Creates a DataSampler to sample every 1000 elements while keeping a maximum of 10 in memory.
    */
   public DataSampler() {
-    this.maxSamples = 10;
-    this.sampleEveryN = 1000;
+    this(10, 1000);
   }
 
   /**
    * @param maxSamples Sets the maximum number of samples held in memory at once.
    * @param sampleEveryN Sets how often to sample.
    */
   public DataSampler(int maxSamples, int sampleEveryN) {
-    this.maxSamples = maxSamples;
-    this.sampleEveryN = sampleEveryN;
+    this.maxSamples = maxSamples <= 0 ? 10 : maxSamples;
+    this.sampleEveryN = sampleEveryN <= 0 ? 1000 : sampleEveryN;

Review Comment:
   It's usually better to throw an IllegalArgumentException in these cases instead of silently behaving differently than before.
   
   ```suggestion
       checkArgument(maxSamples > 0, "Expected positive number of samples, did you mean to disable data sampling?");
       checkArgument(sampleEveryN > 0, "Expected positive number for sampling period, did you mean to disable data sampling?");
       this.maxSamples = maxSamples;
       this.sampleEveryN = sampleEveryN;
   ```
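A minimal sketch of the fail-fast validation suggested above, written without Guava so it compiles on its own. `SamplerSettings` is a hypothetical stand-in for the two `DataSampler` fields; Guava's `checkArgument(boolean, Object)` throws the same `IllegalArgumentException`, so swapping it back in should not change behavior.

```java
/** Hypothetical stand-in for DataSampler's configuration; not Beam's actual class. */
final class SamplerSettings {
  final int maxSamples;
  final int sampleEveryN;

  /** Defaults mirror the no-arg DataSampler constructor: keep 10, sample every 1000th. */
  SamplerSettings() {
    this(10, 1000);
  }

  SamplerSettings(int maxSamples, int sampleEveryN) {
    // Fail fast instead of silently substituting defaults for bad input.
    if (maxSamples <= 0) {
      throw new IllegalArgumentException(
          "Expected positive number of samples, did you mean to disable data sampling?");
    }
    if (sampleEveryN <= 0) {
      throw new IllegalArgumentException(
          "Expected positive number for sampling period, did you mean to disable data sampling?");
    }
    this.maxSamples = maxSamples;
    this.sampleEveryN = sampleEveryN;
  }
}
```

With this shape, a caller passing `0` gets an immediate, descriptive error instead of a sampler that quietly uses the defaults.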



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -91,10 +94,13 @@ public void sample(T element) {
   /**
    * Clears samples at end of call. This is to help mitigate memory use.
    *
+   * <p>This method is invoked by a thread handling a data sampling request in parallel to any calls
+   * to {@link #sample}.
+   *
    * @return samples taken since last call.
    */
-  public List<byte[]> samples() {
-    List<byte[]> ret = new ArrayList<>();
+  public List<BeamFnApi.SampledElement> samples() throws IOException {
+    List<BeamFnApi.SampledElement> ret = new ArrayList<>();
 
     // Serializing can take a lot of CPU time for larger or complex elements. Copy the array here
     // so as to not slow down the main processing hot path.

Review Comment:
   Since you're only ever accessing `buffer` under a synchronized block, it would be better to swap the buffer than to copy it.
   
   e.g.
   ```
       List<T> bufferToSend;
       synchronized (this) {
         bufferToSend = buffer;
         buffer = new ArrayList<>(maxElements);
         resampleIndex = 0;
       }
   ```
   
   This avoids copying the elements and clearing the original.
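To illustrate the swap, here is a simplified, hypothetical `SwapSampler<T>` (not Beam's `OutputSampler`; it skips encoding into `BeamFnApi.SampledElement`). Only the reference assignment happens under the lock; the old list is handed to the caller, who can serialize it without blocking `sample`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified sampler illustrating the buffer swap; not Beam's OutputSampler. */
final class SwapSampler<T> {
  private final int maxElements;
  private final int sampleEveryN;
  private long numSeen = 0;
  private int resampleIndex = 0;
  private List<T> buffer;

  SwapSampler(int maxElements, int sampleEveryN) {
    this.maxElements = maxElements;
    this.sampleEveryN = sampleEveryN;
    this.buffer = new ArrayList<>(maxElements);
  }

  /** Hot path: keeps every sampleEveryN-th element. */
  synchronized void sample(T element) {
    numSeen++;
    if (numSeen % sampleEveryN != 0) {
      return;
    }
    if (buffer.size() < maxElements) {
      buffer.add(element);
    } else {
      // Buffer full: overwrite the oldest slot, round-robin.
      buffer.set(resampleIndex, element);
      resampleIndex = (resampleIndex + 1) % maxElements;
    }
  }

  /** Request thread: swaps in a fresh buffer instead of copying and clearing. */
  List<T> samples() {
    List<T> bufferToSend;
    synchronized (this) {
      bufferToSend = buffer;
      buffer = new ArrayList<>(maxElements);
      resampleIndex = 0;
    }
    // Expensive work (e.g. encoding each element) can happen here without the lock,
    // because no other thread references bufferToSend any more.
    return bufferToSend;
  }
}
```

With `maxElements = 3` and `sampleEveryN = 2`, feeding 1..10 keeps 2, 4, 6 and then overwrites slots round-robin, so `samples()` returns `[8, 10, 6]` and the next call returns an empty list.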



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -117,6 +127,24 @@ public void testSingleOutput() throws Exception {
     assertHasSamples(samples, "pcollection-id", Collections.singleton(encodeInt(1)));
   }
 
+  /**
+   * Smoke test that a samples show in the output map.

Review Comment:
   ```suggestion
      * Smoke test that a sample shows in the output map.
   ```



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -117,6 +127,24 @@ public void testSingleOutput() throws Exception {
     assertHasSamples(samples, "pcollection-id", Collections.singleton(encodeInt(1)));
   }
 
+  /**
+   * Smoke test that a samples show in the output map.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNestedContext() throws Exception {
+    DataSampler sampler = new DataSampler();
+
+    String rawString = "hello";
+    byte[] byteArray = rawString.getBytes(Charset.forName("ASCII"));

Review Comment:
   ```suggestion
       byte[] byteArray = rawString.getBytes(StandardCharsets.US_ASCII);
   ```
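`StandardCharsets.US_ASCII` is a constant (in `java.nio.charset` since Java 7), so it skips the by-name lookup that `Charset.forName` performs and cannot fail with an `UnsupportedCharsetException` the way a mistyped name can. A hypothetical helper pair showing that both spellings encode the same string to the same bytes:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers comparing the two ways of naming ASCII. */
final class CharsetExample {
  /** Uses the constant, as the suggestion recommends. */
  static byte[] encodeAscii(String raw) {
    return raw.getBytes(StandardCharsets.US_ASCII);
  }

  /** Uses a by-name lookup, as in the original test code. */
  static byte[] encodeAsciiByName(String raw) {
    return raw.getBytes(Charset.forName("ASCII"));
  }
}
```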



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);

Review Comment:
   ```suggestion
       OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
   ```
   
   You can use a small maxElements size like 10; using a bigger number may decrease contention. Also, a sampleEveryN of 2 makes the sampler frequently alternate between sampling and not sampling, instead of sampling every element.



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {
+                outputSampler.sample(i);
+              }
+            });
+
+    Thread sampleThreadB =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {
+                outputSampler.sample(i);
+              }
+            });
+
+    sampleThreadA.start();
+    sampleThreadB.start();
+
+    for (int i = 0; i < 10000; i++) {
+      outputSampler.samples();
+    }

Review Comment:
   Grab samples until both of the above threads are done; there's no need to go to a fixed number.
   
   Also perform the validation that I described above on each of the output samples.
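A sketch of polling until both producers finish rather than for a fixed number of iterations. `SimpleSampler` and `PollUntilDone` are hypothetical; the sampler keeps every element so the drained total can be checked exactly. The loop tests `getState() == Thread.State.TERMINATED`, which is the condition another comment in this review recommends over `isAlive()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal sampler: keeps every element; samples() drains it. */
final class SimpleSampler {
  private List<Integer> buffer = new ArrayList<>();

  synchronized void sample(int element) {
    buffer.add(element);
  }

  synchronized List<Integer> samples() {
    List<Integer> out = buffer;
    buffer = new ArrayList<>();
    return out;
  }
}

final class PollUntilDone {
  /** Returns how many elements were drained while two producers ran to completion. */
  static long drainWhileProducing(int perThread) throws InterruptedException {
    SimpleSampler sampler = new SimpleSampler();
    Runnable producer = () -> {
      for (int i = 0; i < perThread; i++) {
        sampler.sample(i);
      }
    };
    Thread a = new Thread(producer);
    Thread b = new Thread(producer);
    a.start();
    b.start();

    long drained = 0;
    // Keep pulling samples until both producers have terminated.
    while (a.getState() != Thread.State.TERMINATED
        || b.getState() != Thread.State.TERMINATED) {
      drained += sampler.samples().size();
    }
    a.join();
    b.join();
    // Drain anything added between the last poll and termination.
    drained += sampler.samples().size();
    return drained;
  }
}
```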



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =

Review Comment:
   You should use a CountDownLatch to increase contention.
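One way to use a `CountDownLatch` as a start gate, sketched with a hypothetical `StartGate` helper: each worker counts down `ready` and then blocks on `go`, and the test thread opens `go` only after every worker has checked in, so the workers begin their loops at nearly the same moment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: releases all workers together to maximize contention. */
final class StartGate {
  static int runTogether(int numThreads, Runnable work) throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(numThreads); // each worker checks in
    CountDownLatch go = new CountDownLatch(1); // the test thread releases everyone
    AtomicInteger completed = new AtomicInteger();
    Thread[] threads = new Thread[numThreads];
    for (int t = 0; t < numThreads; t++) {
      threads[t] = new Thread(() -> {
        ready.countDown();
        try {
          go.await(); // park until the gate opens
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        work.run();
        completed.incrementAndGet();
      });
      threads[t].start();
    }
    ready.await(); // every worker is running and parked on the gate
    go.countDown(); // open the gate for all of them at once
    for (Thread thread : threads) {
      thread.join();
    }
    return completed.get();
  }
}
```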



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }
+              }
+            });
+
+    sampleThread.start();
+
+    for (int i = 0; i < 20; i++) {
+      sampler.handleDataSampleRequest(

Review Comment:
   This tests concurrency between a single sampling thread and the data sampling request.
   
   You'll want to update this test to cover multiple threads (e.g. 100), all creating the same set of 100 output samplers. You can have them all wait on a CountDownLatch, which you release from the test thread before output sampler creation starts, so that all the threads are ready to go (as done in https://github.com/apache/beam/blob/679d30256c6bd64d9760702c667d7d355e70166b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java#L94).
   
   At the same time, you should continuously call handleDataSampleRequest until all the threads' states have transitioned to TERMINATED. (Note that checking isAlive() can expose you to a race, since start() doesn't mean that the thread is alive, just that it is scheduled to become alive at some point in the future.) You can also ensure the threads are alive by using another CountDownLatch to block the test thread from advancing until the sampler-creating threads are running. Futures are another option.
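A sketch of the suggested test shape, under stated assumptions: `SamplerRegistry` is a hypothetical stand-in for `DataSampler` in which `samplerFor` plays the role of `sampleOutput` and `snapshot` plays the role of `handleDataSampleRequest`. It backs the registry with a `ConcurrentHashMap`, which is one way (not necessarily the PR's) to avoid a `ConcurrentModificationException` when iterating during registration. One latch confirms every creator thread is running, a second releases them together, and the test thread keeps issuing requests until all creators reach `TERMINATED`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for DataSampler: one counter per PCollection id. */
final class SamplerRegistry {
  private final Map<String, AtomicLong> samplers = new ConcurrentHashMap<>();

  /** Plays the role of sampleOutput(...): creates each entry at most once. */
  AtomicLong samplerFor(String pcollectionId) {
    return samplers.computeIfAbsent(pcollectionId, id -> new AtomicLong());
  }

  /** Plays the role of handleDataSampleRequest(...): iterates every entry. */
  Map<String, Long> snapshot() {
    Map<String, Long> out = new HashMap<>();
    samplers.forEach((id, count) -> out.put(id, count.get()));
    return out;
  }
}

final class ConcurrentRegistrationCheck {
  static Map<String, Long> run(int numThreads, int numIds) throws InterruptedException {
    SamplerRegistry registry = new SamplerRegistry();
    CountDownLatch ready = new CountDownLatch(numThreads);
    CountDownLatch go = new CountDownLatch(1);
    Thread[] threads = new Thread[numThreads];
    for (int t = 0; t < numThreads; t++) {
      threads[t] = new Thread(() -> {
        ready.countDown();
        try {
          go.await();
        } catch (InterruptedException e) {
          return;
        }
        // Every thread creates the same ids, so creation races on each key.
        for (int i = 0; i < numIds; i++) {
          registry.samplerFor("pcollection-" + i).incrementAndGet();
        }
      });
      threads[t].start();
    }
    ready.await(); // all creator threads are alive and parked
    go.countDown();
    // Keep issuing "requests" until every creator thread has terminated.
    boolean allDone = false;
    while (!allDone) {
      registry.snapshot();
      allDone = true;
      for (Thread thread : threads) {
        allDone &= thread.getState() == Thread.State.TERMINATED;
      }
    }
    for (Thread thread : threads) {
      thread.join();
    }
    return registry.snapshot();
  }
}
```

With 100 threads each creating the same 100 ids, the final snapshot should hold exactly 100 entries, each incremented 100 times, which also checks that each entry was created only once.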
   
   



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }

Review Comment:
   This and the interrupt below are unnecessary. Your test should be able to join all the sampler-creating threads.
   ```suggestion
   ```



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java:
##########
@@ -80,14 +84,50 @@ public void testActsLikeCircularBuffer() throws Exception {
     // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to
     // 4..9 inclusive. Then,
     // the 20th element is sampled (19) and every 20 after.
-    List<byte[]> expected = new ArrayList<>();
+    List<BeamFnApi.SampledElement> expected = new ArrayList<>();
     expected.add(encodeInt(19));
     expected.add(encodeInt(39));
     expected.add(encodeInt(59));
     expected.add(encodeInt(79));
     expected.add(encodeInt(99));
 
-    List<byte[]> samples = outputSampler.samples();
+    List<BeamFnApi.SampledElement> samples = outputSampler.samples();
     assertThat(samples, containsInAnyOrder(expected.toArray()));
   }
+
+  /**
+   * Test that sampling a PCollection while retrieving samples from multiple threads is ok.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentSamples() throws Exception {
+    VarIntCoder coder = VarIntCoder.of();
+    OutputSampler<Integer> outputSampler = new OutputSampler<>(coder, 100000, 1);
+
+    // Iteration count was empirically chosen to have a high probability of failure without the
+    // test going for too long.
+    Thread sampleThreadA =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 10000000; i++) {

Review Comment:
   If you have this thread produce elements between 1 and 100000 and the other thread produce elements between 100000 and 200000, then when you get the samples below you can validate that the numbers in series A and in series B are always greater than the largest seen in the previous sample. The series could be negative and positive numbers, or even and odd numbers: anything that lets you know which thread an element came from so you can perform the validation on it.
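A sketch of that validation, assuming a hypothetical `RingSampler` (a simplified ring buffer with a buffer swap, not Beam's `OutputSampler`) configured with the small sizes suggested earlier in this review (`maxElements = 10`, `sampleEveryN = 2`). Thread A emits positive values and thread B emits negative values, so the sign identifies the producer; each drained batch must contain only values whose magnitude exceeds everything seen for that series in earlier batches.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical ring-buffer sampler with a buffer swap; not Beam's OutputSampler. */
final class RingSampler {
  private final int maxElements;
  private final int sampleEveryN;
  private long seen = 0;
  private int resampleIndex = 0;
  private List<Integer> buffer;

  RingSampler(int maxElements, int sampleEveryN) {
    this.maxElements = maxElements;
    this.sampleEveryN = sampleEveryN;
    this.buffer = new ArrayList<>(maxElements);
  }

  synchronized void sample(int element) {
    if (++seen % sampleEveryN != 0) {
      return;
    }
    if (buffer.size() < maxElements) {
      buffer.add(element);
    } else {
      buffer.set(resampleIndex, element);
      resampleIndex = (resampleIndex + 1) % maxElements;
    }
  }

  synchronized List<Integer> samples() {
    List<Integer> out = buffer;
    buffer = new ArrayList<>(maxElements);
    resampleIndex = 0;
    return out;
  }
}

final class SeriesCheck {
  /** Returns the number of batches checked; throws AssertionError on a violation. */
  static int run(int n) throws InterruptedException {
    RingSampler sampler = new RingSampler(10, 2);
    Thread a = new Thread(() -> { for (int i = 1; i <= n; i++) sampler.sample(i); });
    Thread b = new Thread(() -> { for (int i = 1; i <= n; i++) sampler.sample(-i); });
    a.start();
    b.start();
    int maxA = 0; // largest series-A value seen in earlier batches
    int maxB = 0; // largest series-B magnitude seen in earlier batches
    int batches = 0;
    boolean done = false;
    while (!done) {
      // Read the state before draining so the last drain follows both threads finishing.
      done = a.getState() == Thread.State.TERMINATED
          && b.getState() == Thread.State.TERMINATED;
      int batchMaxA = maxA;
      int batchMaxB = maxB;
      for (int v : sampler.samples()) {
        int magnitude = Math.abs(v);
        int previous = v > 0 ? maxA : maxB;
        if (magnitude <= previous) {
          throw new AssertionError("value " + v + " is not newer than an earlier batch");
        }
        if (v > 0) {
          batchMaxA = Math.max(batchMaxA, magnitude);
        } else {
          batchMaxB = Math.max(batchMaxB, magnitude);
        }
      }
      maxA = batchMaxA;
      maxB = batchMaxB;
      batches++;
    }
    a.join();
    b.join();
    return batches;
  }
}
```

The check holds because each producer emits strictly increasing magnitudes and the swap happens under the same lock as `sample`, so any element in a later batch was sampled after every element in an earlier one.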



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -193,4 +221,44 @@ public void testFiltersMultiplePCollectionIds() throws Exception {
     assertHasSamples(samples, "a", ImmutableList.of(encodeString("a1"), encodeString("a2")));
     assertHasSamples(samples, "c", ImmutableList.of(encodeString("c1"), encodeString("c2")));
   }
+
+  /**
+   * Test that samples can be taken from the DataSampler while adding new OutputSamplers. This fails
+   * with a ConcurrentModificationException if there is a bug.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testConcurrentNewSampler() throws Exception {
+    DataSampler sampler = new DataSampler();
+    VarIntCoder coder = VarIntCoder.of();
+
+    // Create a thread that constantly creates new samplers.
+    Thread sampleThread =
+        new Thread(
+            () -> {
+              for (int i = 0; i < 1000000; i++) {
+                sampler.sampleOutput("pcollection-" + i, coder).sample(0);
+
+                // This sleep is here to allow for the test to stop this thread.
+                try {
+                  Thread.sleep(0);
+                } catch (InterruptedException e) {
+                  return;
+                }
+              }
+            });
+
+    sampleThread.start();
+
+    for (int i = 0; i < 20; i++) {
+      sampler.handleDataSampleRequest(
+          BeamFnApi.InstructionRequest.newBuilder()
+              .setSampleData(BeamFnApi.SampleDataRequest.newBuilder())
+              .build());
+    }
+
+    sampleThread.interrupt();

Review Comment:
   I don't think you need to interrupt, since the thread will finish by itself. You should just invoke join below.



-- 
This is an automated message from the Apache Git Service.