lukecwik commented on code in PR #25065:
URL: https://github.com/apache/beam/pull/25065#discussion_r1084624554
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleDataResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+ // (Optional) The ProcessBundleDescriptor ids to filter for.
+ repeated string process_bundle_descriptor_ids = 1;
+
+ // (Optional) The PCollection ids to filter for.
+ repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
Review Comment:
```suggestion
// An element sampled when the SDK is processing a bundle. This is a proto
```
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleDataResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+ // (Optional) The ProcessBundleDescriptor ids to filter for.
+ repeated string process_bundle_descriptor_ids = 1;
+
+ // (Optional) The PCollection ids to filter for.
+ repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+ // Required. Sampled raw bytes for an element. This is a
+ // single encoded element in the nested context.
+ bytes element = 1;
+
+ // FUTURE WORK: Capture lull detections and exceptions.
+ //
+ // Optional. Present if there was an exception
+ // processing the above element.
+ //
+ // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.
+message SampleDataResponse {
+ message ElementList {
+ // Required. The individual elements sampled from a PCollection.
+ repeated SampledElement elements = 1;
+ }
+
+ // Map from PCollection id to sample elements.
+ map<string, ElementList> element_samples = 1;
+
+ // FUTURE WORK: Investigate ways of storing multiple interesting types of
+ // sampled elements. There are two ways of accomplishing this:
+ // 1) Maps of typed elements: include multiple maps here with typed element
+ // proto messages, ex.
+ //
+ // message SlowElement {...}
+ // message ErroredElement {...}
+ // map<string, SlowElement> slow_elements
+ // map<string, ErroredElement> errored_elements
+ //
+ // However, this forces an element into a single category. It disallows
+ // classification across multiple characteristics (like a slow and errored
+ // element).
+ //
+ // 2) Compositional types: allow for Protobuf Extensions on the base
Review Comment:
proto3 discourages extensions and prefers that people use `Any`, which is a
poor version of using URN + payload.
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleDataResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
Review Comment:
```suggestion
// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list will
// match everything.
```
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleDataResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
+// match anything.
+message SampleDataRequest {
+ // (Optional) The ProcessBundleDescriptor ids to filter for.
+ repeated string process_bundle_descriptor_ids = 1;
+
+ // (Optional) The PCollection ids to filter for.
+ repeated string pcollection_ids = 2;
+}
+
+
+// A element sampled when the SDK is processing a bundle. This is a proto
+// message to allow for additional per-element metadata.
+message SampledElement {
+ // Required. Sampled raw bytes for an element. This is a
+ // single encoded element in the nested context.
+ bytes element = 1;
+
+ // FUTURE WORK: Capture lull detections and exceptions.
+ //
+ // Optional. Present if there was an exception
+ // processing the above element.
+ //
+ // LogEntry exception_entry = 2;
+}
+
+// If supported, the `SampleDataResponse` will contain samples from all
+// ProcessBundleDescriptors.
Review Comment:
```suggestion
// If supported, the `SampleDataResponse` will contain samples from PCollections
// based upon the filters specified in the request.
```
##########
model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto:
##########
@@ -137,12 +138,79 @@ message InstructionResponse {
FinalizeBundleResponse finalize_bundle = 1004;
MonitoringInfosMetadataResponse monitoring_infos = 1005;
HarnessMonitoringInfosResponse harness_monitoring_infos = 1006;
+ SampleDataResponse sample = 1007;
// DEPRECATED
RegisterResponse register = 1000;
}
}
+// If supported, the `SampleDataRequest` will respond with a
+// `SampleDataResponse`. The SDK being queried must have the
+// "beam:protocol:data_sampling:v1" capability. The samples in the
+// `SampleDataResponse` will be filtered by both being in the
+// `process_bundle_descriptor_ids` AND the `pcollection_ids`. An empty list can
Review Comment:
Are you sure you want AND here vs OR?
What kinds of lookups do you think runners will ask for?
##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto:
##########
@@ -1638,6 +1638,14 @@ message StandardProtocols {
// SDKs ability to store the data in memory reducing the amount of memory
// used overall.
STATE_CACHING = 7 [(beam_urn) = "beam:protocol:state_caching:v1"];
+
+ // Indicates that this SDK can sample in-flight elements. These samples can
+ // then be queried using the SampleRequest. Samples are uniquely associated
+ // with (ProcessBundleDescriptor::id, PCollection::id). Meaning, samples
+ // are taken for each PCollection in every ProcessBundleDescriptor. This
+ // is disabled by default and enabled with the enable_data_sampling
+ // experiment.
Review Comment:
```suggestion
// Indicates that this SDK can sample in-flight elements. These samples can
// then be queried using the SampleDataRequest. Samples are uniquely associated
// with a PCollection. Meaning, samples are taken for each PCollection
// during bundle processing. This is disabled by default and enabled with the
// `enable_data_sampling` experiment.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]