lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1074014043
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -109,6 +109,7 @@ message InstructionRequest {
FinalizeBundleRequest finalize_bundle = 1004;
MonitoringInfosMetadataRequest monitoring_infos = 1005;
HarnessMonitoringInfosRequest harness_monitoring_infos = 1006;
+ SampleRequest sample = 1007;
Review Comment:
SampleRequest is super vague; what about something like
SamplePCollectionRequest or SampleDataRequest?
Ditto for the response.
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+ // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+ // returns samples from all descriptors.
+ string process_bundle_descriptor_id = 1;
Review Comment:
Should this be a list as well?
```suggestion
// (Optional) The ProcessBundleDescriptor ids to filter for.
repeated string process_bundle_descriptor_ids = 1;
```
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+ // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+ // returns samples from all descriptors.
+ string process_bundle_descriptor_id = 1;
+
+ // Optional. The PCollections to sample from. If empty, returns all sampled
+ // PCollections from the given PBD. If both are empty, returns all samples.
Review Comment:
```suggestion
// (Optional) The PCollection ids to filter for.
```
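Taken together with the list suggestion on the descriptor id field, the request could end up looking roughly like the sketch below. This is only an illustration under stated assumptions: `SampleDataRequest` is one of the candidate names floated earlier and not a settled choice, and treating an empty list as "no filter" is assumed from the "(Optional) ... to filter for" wording.

```proto
syntax = "proto3";

// Hypothetical sketch only; the message name is one of the proposed
// alternatives and has not been agreed on.
message SampleDataRequest {
  // (Optional) The ProcessBundleDescriptor ids to filter for.
  // Assumption: an empty list means no filtering on descriptor id.
  repeated string process_bundle_descriptor_ids = 1;

  // (Optional) The PCollection ids to filter for.
  // Assumption: an empty list means no filtering on PCollection id.
  repeated string pcollection_ids = 2;
}
```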
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+ // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+ // returns samples from all descriptors.
+ string process_bundle_descriptor_id = 1;
+
+ // Optional. The PCollections to sample from. If empty, returns all sampled
+ // PCollections from the given PBD. If both are empty, returns all samples.
+ repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+ // A sampled element. This is a proto message to allow for additional
+ // per-element metadata.
+ message Element {
+ // Required. Sampled raw bytes for an element. This is a
+ // single encoded element in the nested context.
+ bytes element = 1;
Review Comment:
I think it would make sense to have separate lists for arbitrary samples,
slow elements, and elements that errored out, instead of creating one giant
element type that covers all the permutations.
This way we can create different element types, like:
SlowElementList that contains SlowElement
ErroredElementList that contains ErroredElement
and down in SampleResponse we would add maps that represent these as we get
new use cases. You could leave them commented out for now so that when someone
works to expand this feature, they will have our thinking already laid out on
what we expect from the data structure.
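One possible shape for this, as a rough sketch and not a definitive layout: the response carries one map per kind of sample, with the future kinds left commented out. The field names `slow_element_samples` and `errored_element_samples`, the field numbers, and keying every map by PCollection id are assumptions made for illustration; the contents of `SlowElement` and `ErroredElement` are deliberately left open.

```proto
syntax = "proto3";

// Hypothetical sketch only; names and field numbers are illustrative.
message SampleResponse {
  // An arbitrarily sampled element.
  message Element {
    // Sampled raw bytes for a single encoded element.
    bytes element = 1;
  }

  message ElementList {
    repeated Element elements = 1;
  }

  // Assumption: keyed by PCollection id.
  map<string, ElementList> element_samples = 1;

  // FUTURE WORK: one dedicated element type, list type, and map per new
  // use case, instead of adding optional fields to Element.
  //
  // message SlowElement { /* element bytes plus lull details */ }
  // message SlowElementList { repeated SlowElement elements = 1; }
  // map<string, SlowElementList> slow_element_samples = 2;
  //
  // message ErroredElement { /* element bytes plus exception details */ }
  // message ErroredElementList { repeated ErroredElement elements = 1; }
  // map<string, ErroredElementList> errored_element_samples = 3;
}
```

Keeping each kind in its own list type means a consumer that only understands arbitrary samples can ignore the other maps entirely.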
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,64 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleRequest` will respond with a `SampleResponse` of any
+// sampled elements for the PCollections within the given
+// ProcessBundleDescriptor. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability.
+message SampleRequest {
+ // Required. The id of the ProcessBundleDescriptor to sample. If empty,
+ // returns samples from all descriptors.
+ string process_bundle_descriptor_id = 1;
+
+ // Optional. The PCollections to sample from. If empty, returns all sampled
+ // PCollections from the given PBD. If both are empty, returns all samples.
+ repeated string pcollection_ids = 2;
+}
+
+// Contains sampled elements from all PCollections from a single
+// ProcessBundleDescriptor when the SDK is processing a bundle.
+message Samples {
+
+ // A sampled element. This is a proto message to allow for additional
+ // per-element metadata.
+ message Element {
+ // Required. Sampled raw bytes for an element. This is a
+ // single encoded element in the nested context.
+ bytes element = 1;
+
+ // FUTURE WORK: Capture lull detections and exceptions.
+ //
+ // Optional. Present if there was an exception
+ // processing the above element.
+ //
+ // LogEntry exception_entry = 2;
+ }
+
+ message ElementList {
+ // Required. The individual elements sampled from a PCollection.
+ repeated Element elements = 1;
+ }
+
+ // Required. Map from PCollection id to list of encoded elements using the
+ // associated Ccoder id for that PCollection.
+ map<string, ElementList> pcollections = 1;
+}
+
+// If supported, the `SampleResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleResponse {
+ // Required. Map from ProcessBundleDescriptor id to samples using that
+ // descriptor.
+ map<string, Samples> descriptors = 1;
Review Comment:
Note that PCollection ids are meant to be globally unique, even during
execution, so you should be able to drop the per-descriptor level and do
something like:
```suggestion
// Map from PCollection id to sample elements.
map<string, ElementList> element_samples = 1;
```