This is an automated email from the ASF dual-hosted git repository.

rkhachatryan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 954fb37214b1c6a3727df834166b9178dc64af0a
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Mon Apr 27 13:02:19 2026 +0100

    [FLINK-39127][metrics] Address review feedback for OTel attribute truncation
    
    - Avoid HashMap.equals in the no-op check: applyLimits is renamed to
      getTransformedOrNull and returns null when nothing was modified, so
      transform() does a null check instead of comparing maps.
    - Guard against null attribute values in the variable map.
    - Replace transformer = null in close() with transformer.reset(),
      which clears tracking state while keeping config-derived limits.
    - Rephrase collision log message as "Possible truncation collision".
    - Add tests for null attribute value preservation.
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../metrics/otel/MetricAttributeTransformer.java   | 37 +++++++++++++++-------
 .../metrics/otel/OpenTelemetryMetricReporter.java  |  4 ++-
 .../otel/MetricAttributeTransformerTest.java       | 26 +++++++++++++++
 3 files changed, 55 insertions(+), 12 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
index 96081604674..3248d08d940 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
@@ -165,9 +165,8 @@ final class MetricAttributeTransformer {
      * resulting (name, transformed-attributes) slot has been seen before with 
a different raw
      * input. Emits at most one WARN per slot.
      *
-     * <p>If no limits are configured, or if truncation would produce a map 
equal to the input,
-     * {@code rawVariables} is returned unchanged (reference-equal) and no 
tracking state is
-     * touched.
+     * <p>If no limits are configured, or if truncation produces no changes, 
{@code rawVariables} is
+     * returned unchanged (reference-equal) and no tracking state is touched.
      *
      * @param metricName the fully-qualified metric name
      * @param rawVariables the raw metric attribute map
@@ -177,8 +176,8 @@ final class MetricAttributeTransformer {
         if (attributeValueLimits.isEmpty()) {
             return rawVariables;
         }
-        final Map<String, String> transformed = applyLimits(rawVariables);
-        if (transformed.equals(rawVariables)) {
+        final Map<String, String> transformed = 
getTransformedOrNull(rawVariables);
+        if (transformed == null) {
             // No-op truncation can't produce new collisions. Return the input 
to avoid retaining
             // the freshly-allocated transformed map.
             return rawVariables;
@@ -198,9 +197,9 @@ final class MetricAttributeTransformer {
             return transformed;
         }
         LOG.warn(
-                "Truncation collision at metric '{}': this registration 
truncated {} and "
+                "Possible truncation collision at metric '{}': this 
registration truncated {} and "
                         + "collapsed onto transformed attributes {} already 
seen for a "
-                        + "different raw input. Exported series at this slot 
will be "
+                        + "different raw input. Exported series at this slot 
may be "
                         + "ambiguous at the downstream backend.",
                 metricName,
                 truncatedEntries(rawVariables, transformed),
@@ -239,6 +238,13 @@ final class MetricAttributeTransformer {
         return slotStates == null ? 0 : slotStates.size();
     }
 
+    void reset() {
+        if (slotStates != null) {
+            slotStates.clear();
+        }
+        collisionCount = 0;
+    }
+
     @VisibleForTesting
     Map<String, Integer> getAttributeValueLimits() {
         return attributeValueLimits;
@@ -246,26 +252,34 @@ final class MetricAttributeTransformer {
 
     // -----------------------------------------------------------------------
 
-    private Map<String, String> applyLimits(final Map<String, String> 
variables) {
+    @Nullable
+    private Map<String, String> getTransformedOrNull(final Map<String, String> 
variables) {
         final int globalLimit =
                 attributeValueLimits.getOrDefault(GLOBAL_LIMIT_KEY, 
Integer.MAX_VALUE);
         final Map<String, String> result = new HashMap<>();
+        boolean modified = false;
         for (Map.Entry<String, String> entry : variables.entrySet()) {
             final String key = entry.getKey();
             final String value = entry.getValue();
+            if (value == null) {
+                result.put(key, null);
+                continue;
+            }
             final int configuredLimit = attributeValueLimits.getOrDefault(key, 
globalLimit);
             if (configuredLimit == 0) {
                 // Drop the attribute entirely (user-requested via 0).
+                modified = true;
                 continue;
             }
             if (configuredLimit > 0 && value.length() > configuredLimit) {
                 result.put(key, value.substring(0, configuredLimit));
+                modified = true;
             } else {
                 // No truncation needed, or negative limit (disables 
per-attribute truncation).
                 result.put(key, value);
             }
         }
-        return result;
+        return modified ? result : null;
     }
 
     /**
@@ -295,8 +309,9 @@ final class MetricAttributeTransformer {
     }
 
     /** Packs two 32-bit string hashes into one 64-bit value — key in the high 
half, value low. */
-    private static long entryHash(final String key, final String value) {
-        return (((long) key.hashCode()) << 32) ^ (value.hashCode() & 
LOW_32_BITS_MASK);
+    private static long entryHash(final String key, @Nullable final String 
value) {
+        final int valueHash = value != null ? value.hashCode() : 0;
+        return (((long) key.hashCode()) << 32) ^ (valueHash & 
LOW_32_BITS_MASK);
     }
 
     private static Map<String, Integer> parseAttributeLimits(final 
MetricConfig config) {
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index 6876fb09275..3ad30aae010 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -166,7 +166,9 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
 
     @Override
     public synchronized void close() {
-        transformer = null;
+        if (transformer != null) {
+            transformer.reset();
+        }
         if (exporter != null) {
             exporter.flush();
             waitForLastReportToComplete();
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
index ebb500d5ec6..40c033271c5 100644
--- 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
@@ -171,6 +171,32 @@ class MetricAttributeTransformerTest {
         assertThat(perAttrTransformer.transform("m", 
input)).containsEntry("k", "");
     }
 
+    @Test
+    void testNullAttributeValueIsPreserved() {
+        final MetricAttributeTransformer transformer = 
buildTransformerWithGlobalLimit(5);
+
+        final Map<String, String> input = new HashMap<>();
+        input.put("null_key", null);
+        input.put("normal_key", "long_value_to_truncate");
+
+        final Map<String, String> result = transformer.transform("m", input);
+
+        assertThat(result).containsEntry("null_key", 
null).containsEntry("normal_key", "long_");
+        assertNotSame(input, result);
+    }
+
+    @Test
+    void testNullAttributeValueNoOpWhenOnlyNullsPresent() {
+        final MetricAttributeTransformer transformer = 
buildTransformerWithGlobalLimit(5);
+
+        final Map<String, String> input = new HashMap<>();
+        input.put("null_key", null);
+
+        final Map<String, String> result = transformer.transform("m", input);
+
+        assertSame(input, result);
+    }
+
     @Test
     void testEmptyAttributeNameIsRejected() {
         final MetricConfig cfg = new MetricConfig();

Reply via email to