lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1084624554


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto

Review Comment:
   ```suggestion
   // An element sampled when the SDK is processing a bundle. This is a proto
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleDataResponse {
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated SampledElement elements = 1;
+  }
+
+  // Map from PCollection id to sample elements.
+  map<string, ElementList> element_samples = 1;
+
+  // FUTURE WORK: Investigate ways of storing multiple interesting types of
+  // sampled elements. There are two ways of accomplishing this:
+  // 1) Maps of typed elements: include multiple maps here with typed element
+  // proto messages, ex.
+  //
+  // message SlowElement {...}
+  // message ErroredElement {...}
+  // map<string, SlowElement> slow_elements
+  // map<string, ErroredElement> errored_elements
+  //
+  // However, this forces an element into a single category. It disallows
+  // classification across multiple characteristics (like a slow and errored
+  // element).
+  //
+  // 2) Compositional types: allow for Protobuf Extensions on the base

Review Comment:
   proto3 discourages extensions and prefers that people use `Any`, which is a
   poor version of using URN + payload.



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.

Review Comment:
   ```suggestion
   // `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list will
   // match everything.
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+  // (Optional) The ProcessBundleDescriptor ids to filter for.
+  repeated string process_bundle_descriptor_ids = 1;
+
+  // (Optional) The PCollection ids to filter for.
+  repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+  // Required. Sampled raw bytes for an element. This is a
+  // single encoded element in the nested context.
+  bytes element = 1;
+
+  // FUTURE WORK: Capture lull detections and exceptions.
+  //
+  // Optional. Present if there was an exception
+  // processing the above element.
+  //
+  // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.

Review Comment:
   ```suggestion
   // If supported, the `SampleDataResponse` will contain samples from PCollections
   // based upon the filters specified in the request.
   ```



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleDataResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can

Review Comment:
   Are you sure you want AND here rather than OR?
   
   What kinds of lookups do you think runners will ask for?



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -1638,6 +1638,14 @@ message StandardProtocols {
     // SDKs ability to store the data in memory reducing the amount of memory
     // used overall.
     STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"];
+
+    // Indicates that this SDK can sample in-flight elements. These samples can
+    // then be queried using the SampleRequest. Samples are uniquely associated
+    // with (ProcessBundleDescriptor::id, PCollection::id). Meaning, samples
+    // are taken for each PCollection in every ProcessBundleDescriptor. This
+    // is disabled by default and enabled with the enable_data_sampling
+    // experiment.

Review Comment:
   ```suggestion
       // Indicates that this SDK can sample in-flight elements. These samples can
       // then be queried using the SampleDataRequest. Samples are uniquely associated
       // with a PCollection. Meaning, samples are taken for each PCollection
       // during bundle processing. This is disabled by default and enabled with the
       // `enable_data_sampling` experiment.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to