lukecwik commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1099191235


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -166,6 +157,8 @@ public class ProcessBundleHandler {
   @VisibleForTesting final BundleProcessorCache bundleProcessorCache;
   private final Set<String> runnerCapabilities;
 
+  private DataSampler dataSampler;

Review Comment:
   ```suggestion
     private final DataSampler dataSampler;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;

Review Comment:
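   Make this field final. Note that the two-argument constructor also assigns it, so the `= 10` initializer has to move into the constructors for this to compile (e.g. have the no-arg constructor delegate with `this(10, 1000)`).
   ```suggestion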
     private final int maxSamples = 10;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, 
[PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This 
means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique 
per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes 
when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from 
a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = 
outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, 
this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on 
the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),

Review Comment:
   Why copy the request's id lists into ImmutableSets here?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, 
[PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This 
means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique 
per thread, so only

Review Comment:
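   This is not true: ProcessBundleDescriptors can be re-used across multiple process bundle requests, so several threads can be executing bundles for the same descriptor at once, and the inner per-descriptor map needs to be thread-safe as well. We see this regularly on Dataflow streaming pipelines and under some circumstances on batch pipelines.
   
   It is true for process bundle (instruction) ids, but I don't think that is what you want to key on.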



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;

Review Comment:
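   Make this field final. As with maxSamples, the two-argument constructor also assigns it, so the `= 1000` initializer has to move into the constructors for this to compile.
   ```suggestion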
     private final int sampleEveryN = 1000;
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the 
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old 
values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();
+  private static final Logger LOG = 
LoggerFactory.getLogger(OutputSampler.class);
+
+  // Maximum number of elements in buffer.
+  private int maxElements = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // Total number of samples taken.
+  private long numSamples = 0;
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  public OutputSampler(Coder<T> coder) {
+    this.coder = coder;
+  }
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this(coder);
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  /**
+   * Samples every 1000th element or if it is part of the first 10 in the 
(local) PCollection.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th 
element.
+    numSamples += 1;
+    if (numSamples > 10 && numSamples % sampleEveryN != 0) {
+      return;
+    }
+
+    // Fill buffer until maxElements.
+    if (buffer.size() < maxElements) {
+      buffer.add(element);
+    } else {
+      // Then rewrite sampled elements as a circular buffer.
+      buffer.set(resampleIndex, element);
+      resampleIndex = (resampleIndex + 1) % maxElements;
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {

Review Comment:
   Note that sample() and samples() will be invoked by two different threads (say T1 and T2).
   
   sample() is invoked often and samples() is invoked rarely. Currently there is no synchronization between T1 and T2, which means that changes made by T1 are not guaranteed to become visible to T2, and vice versa; whether they do depends on the CPU architecture and the contents of the registers/L* caches.
   
   In particular, the clear() performed by T2 in the current implementation is unlikely to be seen by T1, and will be overwritten whenever T1 adds a value.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSamplingDescriptorModifier.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import org.apache.beam.fn.harness.ProcessBundleDescriptorModifier;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+
+/**
+ * Modifies the given ProcessBundleDescriptor by adding a DataSampling 
operation as a consumer to
+ * every PCollection.
+ */
+public class DataSamplingDescriptorModifier implements 
ProcessBundleDescriptorModifier {

Review Comment:
   Why did you want to insert this as a separate transform in the graph instead 
of modifying 
https://github.com/apache/beam/blob/d20d0b01c3c6bcde551420f36e13d794c930f1e2/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java#L174
 to support passing elements to the OutputSampler or being an OutputSampler 
itself?
   
   Also typically we would ask the runner to do these PBD modifications itself 
so it could selectively choose which PCollections to sample and which not to. 
This would also remove the need for the SDK to check for an experiment, allow 
the runner to choose which to sample without needing to pass in PCollection ids 
(the PBD ids would be enough). There is a cost though since transforms impose 
msec and state transition book keeping overhead which is not insignificant and 
you'll make the current single consumer hot-path in PCollectionConsumerRegistry 
always use the multi-consumer variant which has additional overhead.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the 
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old 
values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();

Review Comment:
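   Set up the capacity of the buffer so we don't pay any resizing costs. Note that `maxElements` is declared after `buffer` and is reassigned in the three-argument constructor, so the pre-sized allocation has to happen in the constructor rather than in this field initializer (as written below, the initializer is an illegal forward reference):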
   ```suggestion
     private final List<T> buffer = new ArrayList<>(maxElements);
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, 
[PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This 
means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique 
per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes 
when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from 
a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = 
outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, 
this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on 
the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = 
BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If 
empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, 
allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, 
Set<String> pcollections) {
+    Map<String, List<byte[]>> samples = new HashMap<>();
+
+    // Safe to iterate as the ConcurrentHashMap will return each element at 
most once and will not
+    // throw
+    // ConcurrentModificationException.
+    outputSamplers.forEach(
+        (descriptorId, samplers) -> {
+          if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+            return;
+          }
+
+          samplers.forEach(
+              (pcollectionId, outputSampler) -> {
+                if (!pcollections.isEmpty() && 
!pcollections.contains(pcollectionId)) {
+                  return;
+                }
+
+                samples.putIfAbsent(pcollectionId, new ArrayList<>());
+                samples.get(pcollectionId).addAll(outputSampler.samples());
+              });
+        });
+
+    return samples;
+  }
+
+  /** @return samples from all PBDs and all PCollections. */
+  public Map<String, List<byte[]>> allSamples() {
+    return samplesFor(ImmutableSet.of(), ImmutableSet.of());
+  }
+
+  /**
+   * @param descriptors PBDs to filter on.
+   * @return samples only from the given descriptors.
+   */
+  public Map<String, List<byte[]>> samplesForDescriptors(Set<String> 
descriptors) {
+    return samplesFor(descriptors, ImmutableSet.of());
+  }
+
+  /**
+   * @param pcollections PCollection ids to filter on.
+   * @return samples only from the given PCollections.
+   */
+  public Map<String, List<byte[]>> samplesForPCollections(Set<String> 
pcollections) {
+    return samplesFor(ImmutableSet.of(), pcollections);
+  }

Review Comment:
   These are only used during testing; move them into the test class.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, 
[PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This 
means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique 
per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes 
when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from 
a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = 
outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, 
this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on 
the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = 
BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If 
empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, 
allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, 
Set<String> pcollections) {

Review Comment:
   Don't make this public. Instead, have the test create a request, call the handler to get a response, and validate that response itself.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /** Creates a DataSampler to sample every 10 elements while keeping a 
maximum of 10 in memory. */
+  public DataSampler() {}
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private int maxSamples = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;
+
+  // The fully-qualified type is: Map[ProcessBundleDescriptorId, 
[PCollectionId, OutputSampler]].
+  // The DataSampler object lives on the same level of the FnHarness. This 
means that many threads
+  // can and will
+  // access this simultaneously. However, ProcessBundleDescriptors are unique 
per thread, so only
+  // synchronization
+  // is needed on the outermost map.
+  private final Map<String, Map<String, OutputSampler<?>>> outputSamplers =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder encode samples as bytes 
when responding to a
+   * SampleDataRequest.
+   *
+   * @param processBundleDescriptorId The PBD to sample from.
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from 
a nested context.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   * @param <T> The type of element contained in the PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(
+      String processBundleDescriptorId, String pcollectionId, Coder<T> coder) {
+    outputSamplers.putIfAbsent(processBundleDescriptorId, new HashMap<>());
+    Map<String, OutputSampler<?>> samplers = 
outputSamplers.get(processBundleDescriptorId);
+    samplers.putIfAbsent(
+        pcollectionId, new OutputSampler<T>(coder, this.maxSamples, 
this.sampleEveryN));
+
+    return (OutputSampler<T>) samplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on 
the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSample();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(
+            
ImmutableSet.copyOf(sampleDataRequest.getProcessBundleDescriptorIdsList()),
+            ImmutableSet.copyOf(sampleDataRequest.getPcollectionIdsList()));
+
+    BeamFnApi.SampleDataResponse.Builder response = 
BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSample(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param descriptors PCollections under each PBD id will be unioned. If 
empty, allows all
+   *     descriptors.
+   * @param pcollections Filters all PCollections on this set. If empty, 
allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  public Map<String, List<byte[]>> samplesFor(Set<String> descriptors, 
Set<String> pcollections) {
+    Map<String, List<byte[]>> samples = new HashMap<>();
+
+    // Safe to iterate as the ConcurrentHashMap will return each element at 
most once and will not
+    // throw
+    // ConcurrentModificationException.
+    outputSamplers.forEach(
+        (descriptorId, samplers) -> {
+          if (!descriptors.isEmpty() && !descriptors.contains(descriptorId)) {
+            return;
+          }
+
+          samplers.forEach(
+              (pcollectionId, outputSampler) -> {
+                if (!pcollections.isEmpty() && 
!pcollections.contains(pcollectionId)) {
+                  return;
+                }
+
+                samples.putIfAbsent(pcollectionId, new ArrayList<>());

Review Comment:
   ```suggestion
                   samples.putIfAbsent(pcollectionId, Collections.EMPTY_LIST);
   ```



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the 
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory. So old 
values are constantly
+ * being overridden in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private final Coder<T> coder;
+  private final List<T> buffer = new ArrayList<>();
+  private static final Logger LOG = 
LoggerFactory.getLogger(OutputSampler.class);
+
+  // Maximum number of elements in buffer.
+  private int maxElements = 10;
+
+  // Sampling rate.
+  private int sampleEveryN = 1000;

Review Comment:
   shouldn't this be passed in by DataSampler?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to