Abacn commented on code in PR #37066:
URL: https://github.com/apache/beam/pull/37066#discussion_r2641239823


##########
runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java:
##########
@@ -123,26 +134,51 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
         boundedTrieFromMetrics);
   }
 
+  /**
+   * Build a stable deduplication key for a MonitoringInfo based on type and the metric identity
+   * labels.
+   */
+  private static String monitoringInfoKey(MetricsApi.MonitoringInfo mi) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(mi.getType()).append('|');
+    Map<String, String> labels = mi.getLabelsMap();
+    // Use canonical labels that form the metric identity
+    sb.append(labels.getOrDefault(STEP_NAME_LABEL, "")).append('|');
+    sb.append(labels.getOrDefault(NAMESPACE_LABEL, "")).append('|');
+    sb.append(labels.getOrDefault(METRIC_NAME_LABEL, ""));
+    return sb.toString();
+  }
+
+  private static class MiAndCommitted {
+    final MetricsApi.MonitoringInfo mi;
+    final boolean committed;
+
+    MiAndCommitted(MetricsApi.MonitoringInfo mi, boolean committed) {
+      this.mi = mi;
+      this.committed = committed;
+    }
+  }
+
   private static Iterable<MetricResult<DistributionResult>>
-      extractDistributionMetricsFromJobMetrics(List<MetricsApi.MonitoringInfo> monitoringInfoList) {
+      extractDistributionMetricsFromJobMetrics(List<MiAndCommitted> monitoringInfoList) {
     return monitoringInfoList.stream()
-        .filter(item -> DISTRIBUTION_INT64_TYPE.equals(item.getType()))
-        .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
-        .map(PortableMetrics::convertDistributionMonitoringInfoToDistribution)
+        .filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
+        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
+        .map(m -> convertDistributionMonitoringInfoToDistribution(m.mi, m.committed))

Review Comment:
   We can make `convertDistributionMonitoringInfoToDistribution` take a
   `MiAndCommitted` parameter, since it's internal (private static). The same
   applies to the other helper methods.
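
   A minimal sketch of what that signature change could look like. Everything
   here is illustrative: `FakeMonitoringInfo` is a hypothetical stand-in for
   `MetricsApi.MonitoringInfo`, the constant values are placeholders, and the
   helper returns a plain `String` rather than a real `MetricResult`, so this
   is not the actual Beam code.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: shows a private helper accepting the wrapper type directly.
class HelperSignatureSketch {
  // Placeholder values, assumed for illustration.
  static final String DISTRIBUTION_INT64_TYPE = "beam:metrics:distribution_int64:v1";
  static final String NAMESPACE_LABEL = "NAMESPACE";

  // Hypothetical stand-in for MetricsApi.MonitoringInfo.
  static final class FakeMonitoringInfo {
    private final String type;
    private final Map<String, String> labels;

    FakeMonitoringInfo(String type, Map<String, String> labels) {
      this.type = type;
      this.labels = labels;
    }

    String getType() {
      return type;
    }

    Map<String, String> getLabelsMap() {
      return labels;
    }
  }

  // Same shape as the MiAndCommitted class in the diff above.
  static final class MiAndCommitted {
    final FakeMonitoringInfo mi;
    final boolean committed;

    MiAndCommitted(FakeMonitoringInfo mi, boolean committed) {
      this.mi = mi;
      this.committed = committed;
    }
  }

  // The helper takes the wrapper, so callers no longer unpack (mi, committed).
  private static String convertDistribution(MiAndCommitted m) {
    String state = m.committed ? "committed" : "attempted";
    return m.mi.getLabelsMap().get(NAMESPACE_LABEL) + ":" + state;
  }

  static List<String> extractDistributions(List<MiAndCommitted> infos) {
    return infos.stream()
        .filter(m -> DISTRIBUTION_INT64_TYPE.equals(m.mi.getType()))
        .filter(m -> m.mi.getLabelsMap().get(NAMESPACE_LABEL) != null)
        // A method reference replaces the unpacking lambda.
        .map(HelperSignatureSketch::convertDistribution)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<MiAndCommitted> infos =
        List.of(
            new MiAndCommitted(
                new FakeMonitoringInfo(DISTRIBUTION_INT64_TYPE, Map.of(NAMESPACE_LABEL, "ns")),
                true));
    System.out.println(extractDistributions(infos));
  }
}
```

   With the wrapper as the parameter, the `.map(...)` call sites can go back to
   method references, as they were before the `committed` flag was introduced.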



