This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 918eb7e109a add worker metadata to windmill* protos (#27462)
918eb7e109a is described below

commit 918eb7e109a6c49962aed5a4fb7b84b3472df3c7
Author: martin trieu <martinktr...@gmail.com>
AuthorDate: Wed Jul 19 10:31:57 2023 -0700

    add worker metadata to windmill* protos (#27462)
---
 .../worker/windmill/src/main/proto/windmill.proto  | 62 ++++++++++++++++------
 .../windmill/src/main/proto/windmill_service.proto |  4 ++
 2 files changed, 50 insertions(+), 16 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 86a551f8b64..f66b2bed48c 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -27,14 +27,14 @@ option java_outer_classname = "Windmill";
 // API Data types
 
 message Message {
-  required int64 timestamp = 1 [default=-0x8000000000000000];
+  required int64 timestamp = 1 [default = -0x8000000000000000];
   required bytes data = 2;
   optional bytes metadata = 3;
 }
 
 message Timer {
   required bytes tag = 1;
-  optional int64 timestamp = 2 [default=-0x8000000000000000];
+  optional int64 timestamp = 2 [default = -0x8000000000000000];
   enum Type {
     WATERMARK = 0;
     REALTIME = 1;
@@ -124,7 +124,7 @@ message TimerBundle {
 }
 
 message Value {
-  required int64 timestamp = 1 [default=-0x8000000000000000];
+  required int64 timestamp = 1 [default = -0x8000000000000000];
   required bytes data = 2;
 }
 
@@ -293,7 +293,7 @@ message TagSortedListFetchResponse {
   repeated SortedListRange fetch_ranges = 5;
   // Request position copied from request.
   optional bytes request_position = 6;
-  }
+}
 
 message TagSortedListUpdateRequest {
   optional bytes tag = 1;
@@ -332,7 +332,7 @@ message SourceState {
 
 message WatermarkHold {
   required bytes tag = 1;
-  repeated int64 timestamps = 2 [packed=true];
+  repeated int64 timestamps = 2 [packed = true];
   optional bool reset = 3;
   optional string state_family = 4;
 }
@@ -354,7 +354,7 @@ message WorkItem {
   optional TimerBundle timers = 4;
   repeated GlobalDataId global_data_id_notifications = 5;
   optional SourceState source_state = 6;
-  optional int64 output_data_watermark = 8 [default=-0x8000000000000000];
+  optional int64 output_data_watermark = 8 [default = -0x8000000000000000];
   // Indicates that this is a new key with no data associated. This allows
   // the harness to optimize data fetching.
   optional bool is_new_key = 10;
@@ -368,9 +368,9 @@ message WorkItem {
 message ComputationWorkItems {
   required string computation_id = 1;
   repeated WorkItem work = 2;
-  optional int64 input_data_watermark = 3 [default=-0x8000000000000000];
+  optional int64 input_data_watermark = 3 [default = -0x8000000000000000];
   optional int64 dependent_realtime_input_watermark = 4
-      [default = -0x8000000000000000];
+  [default = -0x8000000000000000];
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -478,12 +478,12 @@ message Counter {
   // value accumulated since the worker started working on this WorkItem.
   // By default this is false, indicating that this metric is reported
   // as a delta that is not associated with any WorkItem.
-   optional bool cumulative = 7;
+  optional bool cumulative = 7;
 }
 
 message GlobalDataRequest {
   required GlobalDataId data_id = 1;
-  optional int64 existence_watermark_deadline = 2 [default=0x7FFFFFFFFFFFFFFF];
+  optional int64 existence_watermark_deadline = 2 [default = 0x7FFFFFFFFFFFFFFF];
   optional string state_family = 3;
 }
 
@@ -509,8 +509,8 @@ message WorkItemCommitRequest {
   repeated GlobalDataRequest global_data_requests = 11;
   repeated GlobalData global_data_updates = 10;
   optional SourceState source_state_updates = 12;
-  optional int64 source_watermark = 13 [default=-0x8000000000000000];
-  optional int64 source_backlog_bytes = 17 [default=-1];
+  optional int64 source_watermark = 13 [default = -0x8000000000000000];
+  optional int64 source_backlog_bytes = 17 [default = -1];
   optional int64 source_bytes_processed = 22;
 
   repeated WatermarkHold watermark_holds = 14;
@@ -592,7 +592,7 @@ message GetConfigResponse {
     optional string computation_id = 2;
   }
   repeated SystemNameToComputationIdMapEntry
-    system_name_to_computation_id_map = 3;
+      system_name_to_computation_id_map = 3;
 
   // Map of computation id to ComputationConfig.
   message ComputationConfigMapEntry {
@@ -627,10 +627,10 @@ message ReportStatsResponse {
 // Streaming API
 
 message StreamingGetWorkRequest {
- oneof chunk_type {
+  oneof chunk_type {
     GetWorkRequest request = 1;
     StreamingGetWorkRequestExtension request_extension = 2;
- }
+  }
 }
 
 message StreamingGetWorkRequestExtension {
@@ -661,7 +661,7 @@ message StreamingGetWorkResponseChunk {
 
 message ComputationWorkItemMetadata {
   optional string computation_id = 1;
-  optional int64 input_data_watermark = 2 [default=-0x8000000000000000];
+  optional int64 input_data_watermark = 2 [default = -0x8000000000000000];
   optional int64 dependent_realtime_input_watermark = 3
   [default = -0x8000000000000000];
 }
@@ -742,6 +742,36 @@ message StreamingCommitResponse {
   repeated CommitStatus status = 2;
 }
 
+message WorkerMetadataRequest {
+  optional JobHeader header = 1;
+}
+
+message WorkerMetadataResponse {
+  // The metadata version increases with every modification. Within a single
+  // stream it will always be increasing. The version may be used across streams
+  // to ensure that the view of the metadata does not move backwards.
+  optional int64 metadata_version = 1;
+
+  // Endpoints that should be used for requesting work with GetWorkStream.
+  // Additional data for returned work should be fetched from the endpoint with
+  // GetDataStream. The work should be committed to the endpoint with
+  // CommitWorkStream. Each response on this stream replaces the previous, and
+  // connections to endpoints that are no longer present should be closed.
+  message Endpoint {
+    optional string endpoint = 1;
+  }
+  repeated Endpoint work_endpoints = 2;
+
+  // Maps from GlobalData tag to the endpoint that should be used for GetData
+  // calls to retrieve that global data.
+  map<string, Endpoint> global_data_endpoints = 3;
+
+  // DirectPath endpoints to be used by user workers for streaming engine jobs.
+  // DirectPath endpoints here are virtual IPv6 addresses of the windmill
+  // workers.
+  repeated Endpoint direct_path_endpoints = 4;
+}
+
 service WindmillAppliance {
   // Gets streaming Dataflow work.
   rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse);
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto
index bef819d6990..803766d1a46 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill_service.proto
@@ -33,6 +33,10 @@ service CloudWindmillServiceV1Alpha1 {
   rpc GetWorkStream(stream .windmill.StreamingGetWorkRequest)
     returns (stream .windmill.StreamingGetWorkResponseChunk);
 
+  // Gets worker metadata.  Response is a stream.
+  rpc GetWorkerMetadataStream(.windmill.WorkerMetadataRequest)
+      returns (stream .windmill.WorkerMetadataResponse);
+
   // Gets data from Windmill.
   rpc GetData(.windmill.GetDataRequest) returns(.windmill.GetDataResponse);
 

Reply via email to