This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6d24c3dec3b [Prism] Fix a distribution metric problem when count is
zero (#36723)
6d24c3dec3b is described below
commit 6d24c3dec3bfab3d49c32ed2ef2fe3d0a8d803ef
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Nov 4 19:03:31 2025 -0500
[Prism] Fix a distribution metric problem when count is zero (#36723)
* Fix a distribution metric problem when count is zero.
* Revise the notes.
---
.../main/proto/org/apache/beam/model/pipeline/v1/metrics.proto | 10 +++++++++-
sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go | 6 ++++++
2 files changed, 15 insertions(+), 1 deletion(-)
diff --git
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index d5951c23c10..fcce35394b9 100644
---
a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++
b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -457,7 +457,7 @@ message MonitoringInfo {
SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
- // Label which if has a "true" value indicates that the metric is intended
+ // Label which if has a "true" value indicates that the metric is intended
// to be aggregated per-worker.
PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
}
@@ -517,6 +517,10 @@ message MonitoringInfoTypeUrns {
// - sum: beam:coder:varint:v1
// - min: beam:coder:varint:v1
// - max: beam:coder:varint:v1
+ //
+ // Note that when count is zero, the SDK may not send sum, min, and max in
+ // the response. If those fields are included in the payload, runners should
+ // ignore them.
DISTRIBUTION_INT64_TYPE = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:distribution_int64:v1"];
@@ -531,6 +535,10 @@ message MonitoringInfoTypeUrns {
// - sum: beam:coder:double:v1
// - min: beam:coder:double:v1
// - max: beam:coder:double:v1
+ //
+ // Note that when count is zero, the SDK may not send sum, min, and max in
+ // the response. If those fields are included in the payload, runners should
+ // ignore them.
DISTRIBUTION_DOUBLE_TYPE = 3 [(org.apache.beam.model.pipeline.v1.beam_urn)
=
"beam:metrics:distribution_double:v1"];
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
index bbbdfd1eba4..12d93581546 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/metrics.go
@@ -326,6 +326,12 @@ func (m *distributionInt64) accumulate(pyld []byte) error {
if dist.Count, err = coder.DecodeVarInt(buf); err != nil {
return err
}
+ if dist.Count == 0 {
+ // When no elements are reported, the payload may contain the values
+ // for count, sum, min and max, or it may contain only one 0x00 byte for
+ // count. No matter what, we will skip aggregation in this case.
+ return nil
+ }
if dist.Sum, err = coder.DecodeVarInt(buf); err != nil {
return err
}