This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d24c3dec3b [Prism] Fix a distribution metric problem when count is zero (#36723)
6d24c3dec3b is described below

commit 6d24c3dec3bfab3d49c32ed2ef2fe3d0a8d803ef
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Nov 4 19:03:31 2025 -0500

    [Prism] Fix a distribution metric problem when count is zero (#36723)
    
    * Fix a distribution metric problem when count is zero.
    
    * Revise the notes.
---
 .../main/proto/org/apache/beam/model/pipeline/v1/metrics.proto | 10 +++++++++-
 sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go |  6 ++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index d5951c23c10..fcce35394b9 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -457,7 +457,7 @@ message MonitoringInfo {
     SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
     SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
     SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
-    // Label which if has a "true" value indicates that the metric is intended 
+    // Label which if has a "true" value indicates that the metric is intended
     // to be aggregated per-worker.
     PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
   }
@@ -517,6 +517,10 @@ message MonitoringInfoTypeUrns {
     //   - sum:   beam:coder:varint:v1
     //   - min:   beam:coder:varint:v1
     //   - max:   beam:coder:varint:v1
+    //
+    // Note that when count is zero, the SDK may not send sum, min, and max in
+    // the response. If those fields are included in the payload, runners should
+    // ignore them.
     DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
                                      "beam:metrics:distribution_int64:v1"];
 
@@ -531,6 +535,10 @@ message MonitoringInfoTypeUrns {
     //   - sum:   beam:coder:double:v1
     //   - min:   beam:coder:double:v1
     //   - max:   beam:coder:double:v1
+    //
+    // Note that when count is zero, the SDK may not send sum, min, and max in
+    // the response. If those fields are included in the payload, runners should
+    // ignore them.
     DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
                                  "beam:metrics:distribution_double:v1"];
 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index bbbdfd1eba4..12d93581546 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -326,6 +326,12 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
        if dist.Count, err = coder.DecodeVarInt(buf); err != nil {
                return err
        }
+       if dist.Count == 0 {
+               // When there are no elements reported, the payload may contain the values
+               // for count, sum, min and max, or it may contain only one 0x00 byte for
+               // count. No matter what, we will skip aggregation in this case.
+               return nil
+       }
        if dist.Sum, err = coder.DecodeVarInt(buf); err != nil {
                return err
        }

Reply via email to