This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aaadb7712ab Define PerStepNamespaceMetrics proto that will be used to send metric updates on the GetData heartbeat requests. (#29482)
aaadb7712ab is described below

commit aaadb7712ab4e5617913babc1c360d6c833e74f1
Author: JayajP <[email protected]>
AuthorDate: Mon Nov 20 06:58:31 2023 -0800

    Define PerStepNamespaceMetrics proto that will be used to send metric updates on the GetData heartbeat requests. (#29482)
    
    * Add PerStepNamespaceMetrics proto in windmill api
    
    * Add worker_id field to GetDataRequest
---
 .../worker/windmill/src/main/proto/windmill.proto  | 75 ++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 1759185911d..4b69533a151 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -81,6 +81,72 @@ message LatencyAttribution {
   optional int64 total_duration_millis = 2;
 }
 
+// Metric values recorded for one (metrics namespace, unfused step) pair.
+// Carried in GlobalDataRequest.per_step_namespace_metrics, which is only set
+// for heartbeats.
+message PerStepNamespaceMetrics {
+  // The namespace of these metrics on the user worker.
+  optional string metrics_namespace = 1;
+  // The original system name of the unfused step that these metrics are
+  // reported from.
+  optional string original_step_name = 2;
+  // Metrics that are recorded for this namespace and unfused step.
+  repeated MetricValue metric_values = 3;
+}
+
+// A single named metric value, optionally qualified by string labels.
+message MetricValue {
+  // Name of the metric.
+  optional string metric_name = 1;
+  // Labels attached to this metric value; presumably label name -> label
+  // value (NOTE(review): confirm the expected keys with the producer).
+  map<string, string> metric_labels = 2;
+  // The recorded value. At most one of the representations below is set.
+  oneof value {
+    // An integer-valued metric.
+    int64 value_int64 = 3;
+    // A histogram-valued metric.
+    Histogram value_histogram = 4;
+  }
+}
+
+// A histogram modeled on google/api/distribution.proto, with these limits:
+//   - explicit buckets are not supported;
+//   - only one kind of exponential buckets (Base2Exponent) is supported.
+message Histogram {
+  // Number of values recorded in this histogram.
+  optional int64 count = 1;
+  // The arithmetic mean of the values recorded in this histogram.
+  optional double mean = 2;
+  // The sum of squared deviations from the mean of the values recorded in this
+  // histogram. For values x_i this is:
+  //     Sum[i=1..n]((x_i - mean)^2)
+  optional double sum_of_squared_deviations = 3;
+
+  // `BucketOptions` describes the bucket boundaries used in the histogram.
+  // NOTE(review): below, n presumably equals number_of_buckets (including
+  // the underflow and overflow buckets) -- confirm.
+  message BucketOptions {
+    // Linear buckets with the following boundaries for indices in 0 to n-1:
+    //   0:              (-inf, start)
+    //   i in [1, n-2]:  [start + (i-1)*width, start + i*width)
+    //   n-1:            [start + (n-2)*width, inf)
+    // The 0th and (n-1)th buckets are the underflow/overflow buckets
+    // respectively.
+    message Linear {
+      optional int32 number_of_buckets = 1;
+      // Width of each bounded bucket (indices 1 to n-2).
+      optional double width = 2;
+      // Upper bound of the underflow bucket; lower bound of bucket 1.
+      optional double start = 3;
+    }
+
+    // Exponential buckets where the growth factor (gf) between buckets is
+    // 2**(2**-scale), e.g. for 'scale=1' the growth factor is
+    // 2**(2**(-1)) = sqrt(2).
+    // Buckets have the following boundaries for indices in 0 to n-1:
+    //   0:              (-inf, 0)
+    //   1:              [0, gf)
+    //   i in [2, n-2]:  [gf^(i-1), gf^i)
+    //   n-1:            [gf^(n-2), inf)
+    // The 0th and (n-1)th buckets are the underflow/overflow buckets
+    // respectively.
+    message Base2Exponent {
+      optional int32 number_of_buckets = 1;
+      // Determines the growth factor gf = 2**(2**-scale).
+      optional int32 scale = 2;
+    }
+    // The bucketing scheme; at most one of these is set.
+    oneof BucketType {
+      Linear linear = 1;
+      Base2Exponent exponential = 2;
+    }
+  }
+  // Bucket layout that bucket_counts refers to.
+  optional BucketOptions bucket_options = 5;
+  // Number of recorded values per bucket. NOTE(review): presumably
+  // bucket_counts[i] is the count for bucket index i of bucket_options.
+  repeated int64 bucket_counts = 6 [packed = true];
+}
+
 message GetWorkStreamTimingInfo {
   enum Event {
     UNKNOWN = 0;
@@ -422,6 +488,8 @@ message GetDataRequest {
   optional string project_id = 5;
   repeated ComputationGetDataRequest requests = 1;
   repeated GlobalDataRequest global_data_fetch_requests = 3;
+  // Assigned worker id for the instance.
+  optional string worker_id = 6;
 
   // DEPRECATED
   repeated GlobalDataId global_data_to_fetch = 2;
@@ -485,6 +553,13 @@ message GlobalDataRequest {
   required GlobalDataId data_id = 1;
   optional int64 existence_watermark_deadline = 2 [default = 
0x7FFFFFFFFFFFFFFF];
   optional string state_family = 3;
+
+  // Computation Id for this GlobalDataRequest. Only set for heartbeats.
+  optional string computation_id = 4;
+  // Dataflow defined metrics keyed by metrics namespace and unfused step name.
+  // All unfused steps in this list belong to the fused stage that
+  // computation_id refers to. Only set for heartbeats.
+  repeated PerStepNamespaceMetrics per_step_namespace_metrics = 5;
 }
 
 // next id: 28

Reply via email to