This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 918eb7e109a add worker metadata to windmill* protos (#27462) 918eb7e109a is described below commit 918eb7e109a6c49962aed5a4fb7b84b3472df3c7 Author: martin trieu <martinktr...@gmail.com> AuthorDate: Wed Jul 19 10:31:57 2023 -0700 add worker metadata to windmill* protos (#27462) --- .../worker/windmill/src/main/proto/windmill.proto | 62 ++++++++++++++++------ .../windmill/src/main/proto/windmill_service.proto | 4 ++ 2 files changed, 50 insertions(+), 16 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index 86a551f8b64..f66b2bed48c 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -27,14 +27,14 @@ option java_outer_classname = "Windmill"; // API Data types message Message { - required int64 timestamp = 1 [default=-0x8000000000000000]; + required int64 timestamp = 1 [default = -0x8000000000000000]; required bytes data = 2; optional bytes metadata = 3; } message Timer { required bytes tag = 1; - optional int64 timestamp = 2 [default=-0x8000000000000000]; + optional int64 timestamp = 2 [default = -0x8000000000000000]; enum Type { WATERMARK = 0; REALTIME = 1; @@ -124,7 +124,7 @@ message TimerBundle { } message Value { - required int64 timestamp = 1 [default=-0x8000000000000000]; + required int64 timestamp = 1 [default = -0x8000000000000000]; required bytes data = 2; } @@ -293,7 +293,7 @@ message TagSortedListFetchResponse { repeated SortedListRange fetch_ranges = 5; // Request position copied from request. 
optional bytes request_position = 6; - } +} message TagSortedListUpdateRequest { optional bytes tag = 1; @@ -332,7 +332,7 @@ message SourceState { message WatermarkHold { required bytes tag = 1; - repeated int64 timestamps = 2 [packed=true]; + repeated int64 timestamps = 2 [packed = true]; optional bool reset = 3; optional string state_family = 4; } @@ -354,7 +354,7 @@ message WorkItem { optional TimerBundle timers = 4; repeated GlobalDataId global_data_id_notifications = 5; optional SourceState source_state = 6; - optional int64 output_data_watermark = 8 [default=-0x8000000000000000]; + optional int64 output_data_watermark = 8 [default = -0x8000000000000000]; // Indicates that this is a new key with no data associated. This allows // the harness to optimize data fetching. optional bool is_new_key = 10; @@ -368,9 +368,9 @@ message WorkItem { message ComputationWorkItems { required string computation_id = 1; repeated WorkItem work = 2; - optional int64 input_data_watermark = 3 [default=-0x8000000000000000]; + optional int64 input_data_watermark = 3 [default = -0x8000000000000000]; optional int64 dependent_realtime_input_watermark = 4 - [default = -0x8000000000000000]; + [default = -0x8000000000000000]; } //////////////////////////////////////////////////////////////////////////////// @@ -478,12 +478,12 @@ message Counter { // value accumulated since the worker started working on this WorkItem. // By default this is false, indicating that this metric is reported // as a delta that is not associated with any WorkItem. 
- optional bool cumulative = 7; + optional bool cumulative = 7; } message GlobalDataRequest { required GlobalDataId data_id = 1; - optional int64 existence_watermark_deadline = 2 [default=0x7FFFFFFFFFFFFFFF]; + optional int64 existence_watermark_deadline = 2 [default = 0x7FFFFFFFFFFFFFFF]; optional string state_family = 3; } @@ -509,8 +509,8 @@ message WorkItemCommitRequest { repeated GlobalDataRequest global_data_requests = 11; repeated GlobalData global_data_updates = 10; optional SourceState source_state_updates = 12; - optional int64 source_watermark = 13 [default=-0x8000000000000000]; - optional int64 source_backlog_bytes = 17 [default=-1]; + optional int64 source_watermark = 13 [default = -0x8000000000000000]; + optional int64 source_backlog_bytes = 17 [default = -1]; optional int64 source_bytes_processed = 22; repeated WatermarkHold watermark_holds = 14; @@ -592,7 +592,7 @@ message GetConfigResponse { optional string computation_id = 2; } repeated SystemNameToComputationIdMapEntry - system_name_to_computation_id_map = 3; + system_name_to_computation_id_map = 3; // Map of computation id to ComputationConfig. 
message ComputationConfigMapEntry { @@ -627,10 +627,10 @@ message ReportStatsResponse { // Streaming API message StreamingGetWorkRequest { - oneof chunk_type { + oneof chunk_type { GetWorkRequest request = 1; StreamingGetWorkRequestExtension request_extension = 2; - } + } } message StreamingGetWorkRequestExtension { @@ -661,7 +661,7 @@ message StreamingGetWorkResponseChunk { message ComputationWorkItemMetadata { optional string computation_id = 1; - optional int64 input_data_watermark = 2 [default=-0x8000000000000000]; + optional int64 input_data_watermark = 2 [default = -0x8000000000000000]; optional int64 dependent_realtime_input_watermark = 3 [default = -0x8000000000000000]; } @@ -742,6 +742,36 @@ message StreamingCommitResponse { repeated CommitStatus status = 2; } +message WorkerMetadataRequest { + optional JobHeader header = 1; +} + +message WorkerMetadataResponse { + // The metadata version increases with every modification. Within a single + // stream it will always be increasing. The version may be used across streams + // to ensure that the view of the metadata does not move backwards. + optional int64 metadata_version = 1; + + // Endpoints that should be used for requesting work with GetWorkStream. + // Additional data for returned work should be fetched from the endpoint with + // GetDataStream. The work should be committed to the endpoint with + // CommitWorkStream. Each response on this stream replaces the previous, and + // connections to endpoints that are no longer present should be closed. + message Endpoint { + optional string endpoint = 1; + } + repeated Endpoint work_endpoints = 2; + + // Maps from GlobalData tag to the endpoint that should be used for GetData + // calls to retrieve that global data. + map<string, Endpoint> global_data_endpoints = 3; + + // DirectPath endpoints to be used by user workers for streaming engine jobs. + // DirectPath endpoints here are virtual IPv6 addresses of the windmill + // workers. 
+ repeated Endpoint direct_path_endpoints = 4; +} + service WindmillAppliance { // Gets streaming Dataflow work. rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse); diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto index bef819d6990..803766d1a46 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto @@ -33,6 +33,10 @@ service CloudWindmillServiceV1Alpha1 { rpc GetWorkStream(stream .windmill.StreamingGetWorkRequest) returns (stream .windmill.StreamingGetWorkResponseChunk); + // Gets worker metadata. Response is a stream. + rpc GetWorkerMetadataStream(.windmill.WorkerMetadataRequest) + returns (stream .windmill.WorkerMetadataResponse); + // Gets data from Windmill. rpc GetData(.windmill.GetDataRequest) returns(.windmill.GetDataResponse);