rohdesamuel commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1080737501


##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;

Review Comment:
   This is an interesting idea that I need to think about a bit more. One 
outcome of separating elements this way is that it forces mutually 
exclusive element types. The consequence is that you can no longer have 
an element that is both slow and an error. It's not impossible, but I think the 
categories would have to be broader than proposed, e.g. SuccessfulElement 
(meaning an element that processed normally) and ExceptionalElement 
(encompassing any non-normal processing).
   
   Another approach is to favor "composition over inheritance". For example, 
have an element with proto extensions. Then we can have a single element with 
multiple attributes (slow, error, etc.) that can be queried.
   
   Another option is a hybrid of both: have two maps, one for normal and one 
for abnormal processing, where an element can be queried for extensions.
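   
   A rough sketch of the composition approach, written in Python rather than 
proto for brevity. The names `SampledElement`, `ExceptionInfo`, and `LullInfo` 
are hypothetical and not part of this PR; optional attributes stand in for 
proto extensions.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ExceptionInfo:
    """Hypothetical attribute: the element raised while being processed."""
    message: str


@dataclass
class LullInfo:
    """Hypothetical attribute: the element was slow to process."""
    duration_millis: int


@dataclass
class SampledElement:
    """One sampled element; attributes are optional and not mutually exclusive."""
    element: bytes
    exception: Optional[ExceptionInfo] = None
    lull: Optional[LullInfo] = None

    def is_normal(self) -> bool:
        # Normal processing means no attribute is attached.
        return self.exception is None and self.lull is None


def slow_and_failed(samples: List[SampledElement]) -> List[SampledElement]:
    """Return elements that were both slow and raised an exception."""
    return [s for s in samples if s.exception is not None and s.lull is not None]


samples = [
    SampledElement(b"a"),
    SampledElement(b"b", lull=LullInfo(duration_millis=5000)),
    SampledElement(b"c", exception=ExceptionInfo("boom"), lull=LullInfo(7000)),
]
```

   With this shape a single element can carry both attributes, which the 
mutually exclusive element types would not allow.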



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
     FinalizeBundleResponse finalize_bundle = 1004;
     MonitoringInfosMetadataResponse monitoring_infos = 1005;
     HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+    SampleResponse sample = 1007;
 
     // DEPRECATED
     RegisterResponse register = 1000;
   }
 }
 
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+  // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+  // returns samples from all descriptors.
+  string process_bundle_descriptor_id = 1;
+
+  // Optional. The PCollections to sample from. If empty, returns all sampled
+  // PCollections from the given PBD. If both are empty, returns all samples.
+  repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+  // A sampled element. This is a proto message to allow for additional
+  // per-element metadata.
+  message Element {
+    // Required. Sampled raw bytes for an element. This is a
+    // single encoded element in the nested context.
+    bytes element = 1;
+
+    // FUTURE WORK: Capture lull detections and exceptions.
+    //
+    // Optional. Present if there was an exception
+    // processing the above element.
+    //
+    // LogEntry exception_entry = 2;
+  }
+
+  message ElementList {
+    // Required. The individual elements sampled from a PCollection.
+    repeated Element elements = 1;
+  }
+
+  // Required. Map from PCollection id to list of encoded elements using the
+  // associated Coder id for that PCollection.
+  map<string, ElementList> pcollections = 1;
+}
+
+// If supported, the `SampleResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleResponse {
+  // Required. Map from ProcessBundleDescriptor id to samples using that
+  // descriptor.
+  map<string, Samples> descriptors = 1;

Review Comment:
   Yeah, that's a good simplification. In the future, will SDKs be able to 
execute bundles from different pipelines? If so, I'm worried that omitting 
the descriptor id may introduce unwanted behavior.
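   
   To illustrate the concern, a minimal sketch. It assumes PCollection ids are 
only unique within a single pipeline, which has not been confirmed in this 
thread; the ids and the `flatten` helper are made up.

```python
from typing import Dict, List

# Hypothetical samples keyed by descriptor id, then by PCollection id,
# mirroring the nested map shape of SampleResponse in the diff above.
by_descriptor: Dict[str, Dict[str, List[bytes]]] = {
    "pbd_pipeline_a": {"pcoll_1": [b"a1", b"a2"]},
    "pbd_pipeline_b": {"pcoll_1": [b"b1"]},
}


def flatten(nested: Dict[str, Dict[str, List[bytes]]]) -> Dict[str, List[bytes]]:
    """Drop the descriptor level and key samples by PCollection id only."""
    flat: Dict[str, List[bytes]] = {}
    for samples in nested.values():
        for pcoll_id, elements in samples.items():
            flat.setdefault(pcoll_id, []).extend(elements)
    return flat


# Both pipelines use the id "pcoll_1", so their samples end up mixed together.
flat = flatten(by_descriptor)
```

   Under that assumption, keeping the descriptor id in the response is what 
keeps the two `pcoll_1` sample lists apart.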



##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -109,6 +109,7 @@ message InstructionRequest {
     FinalizeBundleRequest finalize_bundle = 1004;
     MonitoringInfosMetadataRequest monitoring_infos = 1005;
     HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
+    SampleRequest sample = 1007;

Review Comment:
   sgtm, replaced with SampleDataRequest 


