rohdesamuel commented on code in PR #25354:
URL: https://github.com/apache/beam/pull/25354#discussion_r1106533556


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -349,6 +386,10 @@ public void accept(WindowedValue<T> input) throws 
Exception {
       // when we have window optimization.
       this.sampledByteSizeDistribution.tryUpdate(input.getValue(), coder);
 
+      if (outputSampler != null) {
+        outputSampler.sample(input.getValue());

Review Comment:
   I do, but I don't want to alter the encoding of the sampled bytes when 
sending back to the runner. It's not impossible, but I've found that there are 
a lot of landmines when changing encodings.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -89,7 +92,9 @@ public class FnHarness {
   private static final String STATUS_API_SERVICE_DESCRIPTOR = 
"STATUS_API_SERVICE_DESCRIPTOR";
   private static final String PIPELINE_OPTIONS = "PIPELINE_OPTIONS";
   private static final String RUNNER_CAPABILITIES = "RUNNER_CAPABILITIES";
+  private static final String ENABLE_DATA_SAMPLING_EXPERIMENT = 
"enable_data_sampling";
   private static final Logger LOG = LoggerFactory.getLogger(FnHarness.class);
+  private static final DataSampler dataSampler = new DataSampler();

Review Comment:
   Sounds good, done.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a 
maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order 
to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared 
samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new 
ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder to encode samples as bytes 
when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from 
a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> 
coder) {
+    outputSamplers.putIfAbsent(

Review Comment:
   done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/DataSampler.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.SampleDataResponse.ElementList;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.SampledElement;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+
+/**
+ * The DataSampler is a global (per SDK Harness) object that facilitates 
taking and returning
+ * samples to the Runner Harness. The class is thread-safe with respect to 
executing
+ * ProcessBundleDescriptors. Meaning, different threads executing different 
PBDs can sample
+ * simultaneously, even if computing the same logical PCollection.
+ */
+public class DataSampler {
+
+  /**
+   * Creates a DataSampler to sample every 1000 elements while keeping a 
maximum of 10 in memory.
+   */
+  public DataSampler() {
+    this.maxSamples = 10;
+    this.sampleEveryN = 1000;
+  }
+
+  /**
+   * @param maxSamples Sets the maximum number of samples held in memory at 
once.
+   * @param sampleEveryN Sets how often to sample.
+   */
+  public DataSampler(int maxSamples, int sampleEveryN) {
+    this.maxSamples = maxSamples;
+    this.sampleEveryN = sampleEveryN;
+  }
+
+  // Maximum number of elements in buffer.
+  private final int maxSamples;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // The fully-qualified type is: Map[PCollectionId, OutputSampler]. In order 
to sample
+  // on a PCollection-basis and not per-bundle, this keeps track of shared 
samples between states.
+  private final Map<String, OutputSampler<?>> outputSamplers = new 
ConcurrentHashMap<>();
+
+  /**
+   * Creates and returns a class to sample the given PCollection in the given
+   * ProcessBundleDescriptor. Uses the given coder to encode samples as bytes 
when responding to a
+   * SampleDataRequest.
+   *
+   * @param pcollectionId The PCollection to take intermittent samples from.
+   * @param coder The coder associated with the PCollection. Coder may be from 
a nested context.
+   * @param <T> The type of element contained in the PCollection.
+   * @return the OutputSampler corresponding to the unique PBD and PCollection.
+   */
+  public <T> OutputSampler<T> sampleOutput(String pcollectionId, Coder<T> 
coder) {
+    outputSamplers.putIfAbsent(
+        pcollectionId, new OutputSampler<>(coder, this.maxSamples, 
this.sampleEveryN));
+    return (OutputSampler<T>) outputSamplers.get(pcollectionId);
+  }
+
+  /**
+   * Returns all collected samples. Thread-safe.
+   *
+   * @param request The instruction request from the FnApi. Filters based on 
the given
+   *     SampleDataRequest.
+   * @return Returns all collected samples.
+   */
+  public BeamFnApi.InstructionResponse.Builder handleDataSampleRequest(
+      BeamFnApi.InstructionRequest request) {
+    BeamFnApi.SampleDataRequest sampleDataRequest = request.getSampleData();
+
+    Map<String, List<byte[]>> responseSamples =
+        samplesFor(sampleDataRequest.getPcollectionIdsList());
+
+    BeamFnApi.SampleDataResponse.Builder response = 
BeamFnApi.SampleDataResponse.newBuilder();
+    for (String pcollectionId : responseSamples.keySet()) {
+      ElementList.Builder elementList = ElementList.newBuilder();
+      for (byte[] sample : responseSamples.get(pcollectionId)) {
+        elementList.addElements(
+            
SampledElement.newBuilder().setElement(ByteString.copyFrom(sample)).build());
+      }
+      response.putElementSamples(pcollectionId, elementList.build());
+    }
+
+    return BeamFnApi.InstructionResponse.newBuilder().setSampleData(response);
+  }
+
+  /**
+   * Returns a map from PCollection to its samples. Samples are filtered on
+   * ProcessBundleDescriptorIds and PCollections. Thread-safe.
+   *
+   * @param pcollections Filters all PCollections on this set. If empty, 
allows all PCollections.
+   * @return a map from PCollection to its samples.
+   */
+  private Map<String, List<byte[]>> samplesFor(List<String> pcollections) {

Review Comment:
   done



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class holds samples for a single PCollection until queried by the 
parent DataSampler. This
+ * class is meant to hold only a limited number of elements in memory, so old 
values are constantly
+ * being overwritten in a circular buffer.
+ *
+ * @param <T> the element type of the PCollection.
+ */
+public class OutputSampler<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(OutputSampler.class);
+
+  // Temporarily holds elements until the SDK receives a sample data request.
+  private final List<T> buffer;
+
+  // Maximum number of elements in buffer.
+  private final int maxElements;
+
+  // Sampling rate.
+  private final int sampleEveryN;
+
+  // Total number of samples taken.
+  private final AtomicLong numSamples = new AtomicLong();
+
+  // Index into the buffer of where to overwrite samples.
+  private int resampleIndex = 0;
+
+  private final Coder<T> coder;
+
+  public OutputSampler(Coder<T> coder, int maxElements, int sampleEveryN) {
+    this.coder = coder;
+    this.maxElements = maxElements;
+    this.sampleEveryN = sampleEveryN;
+    this.buffer = new ArrayList<>(this.maxElements);
+  }
+
+  /**
+   * Samples the element if it is among the first 10 seen in the (local) 
PCollection, or if it is every {@code sampleEveryN}th element thereafter.
+   *
+   * @param element the element to sample.
+   */
+  public void sample(T element) {
+    // Only sample the first 10 elements then after every `sampleEveryN`th 
element.
+    long samples = numSamples.get() + 1;
+
+    // This has eventual consistency. If there are many threads lazy setting, 
this will be set to
+    // the slowest thread accessing the atomic. But over time, it will still 
increase. This is ok
+    // because this is a debugging feature and doesn't need strict atomics.
+    numSamples.lazySet(samples);
+    if (samples > 10 && samples % sampleEveryN != 0) {
+      return;
+    }
+
+    synchronized (this) {
+      // Fill buffer until maxElements.
+      if (buffer.size() < maxElements) {
+        buffer.add(element);
+      } else {
+        // Then rewrite sampled elements as a circular buffer.
+        buffer.set(resampleIndex, element);
+        resampleIndex = (resampleIndex + 1) % maxElements;
+      }
+    }
+  }
+
+  /**
+   * Clears samples at end of call. This is to help mitigate memory use.
+   *
+   * @return samples taken since last call.
+   */
+  public List<byte[]> samples() {
+    List<byte[]> ret = new ArrayList<>();
+
+    // Serializing can take a lot of CPU time for larger or complex elements. 
Copy the array here
+    // so as to not slow down the main processing hot path.
+    List<T> copiedBuffer;
+    synchronized (this) {
+      copiedBuffer = new ArrayList<>(buffer);
+      buffer.clear();
+      resampleIndex = 0;
+    }
+
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();

Review Comment:
   done, also changed to return SampleElements



##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/DataSamplerTest.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.debug;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataSamplerTest {
+  byte[] encodeInt(Integer i) throws IOException {
+    VarIntCoder coder = VarIntCoder.of();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    coder.encode(i, stream);

Review Comment:
   done, added a test for this case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to