This is an automated email from the ASF dual-hosted git repository. rkhachatryan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 954fb37214b1c6a3727df834166b9178dc64af0a Author: Aleksandr Iushmanov <[email protected]> AuthorDate: Mon Apr 27 13:02:19 2026 +0100 [FLINK-39127][metrics] Address review feedback for OTel attribute truncation - Avoid HashMap.equals in no-op check: applyLimits renamed to getTransformedOrNull, returns null when nothing was modified so transform() uses reference equality instead. - Guard against null attribute values in the variable map. - Replace transformer = null in close() with transformer.reset() that clears tracking state while keeping config-derived limits. - Rephrase collision log message as "Possible truncation collision". - Add tests for null attribute value preservation. Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]> --- .../metrics/otel/MetricAttributeTransformer.java | 37 +++++++++++++++------- .../metrics/otel/OpenTelemetryMetricReporter.java | 4 ++- .../otel/MetricAttributeTransformerTest.java | 26 +++++++++++++++ 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java index 96081604674..3248d08d940 100644 --- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java @@ -165,9 +165,8 @@ final class MetricAttributeTransformer { * resulting (name, transformed-attributes) slot has been seen before with a different raw * input. Emits at most one WARN per slot. * - * <p>If no limits are configured, or if truncation would produce a map equal to the input, - * {@code rawVariables} is returned unchanged (reference-equal) and no tracking state is - * touched. + * <p>If no limits are configured, or if truncation produces no changes, {@code rawVariables} is + * returned unchanged (reference-equal) and no tracking state is touched. * * @param metricName the fully-qualified metric name * @param rawVariables the raw metric attribute map @@ -177,8 +176,8 @@ final class MetricAttributeTransformer { if (attributeValueLimits.isEmpty()) { return rawVariables; } - final Map<String, String> transformed = applyLimits(rawVariables); - if (transformed.equals(rawVariables)) { + final Map<String, String> transformed = getTransformedOrNull(rawVariables); + if (transformed == null) { // No-op truncation can't produce new collisions. Return the input to avoid retaining // the freshly-allocated transformed map. return rawVariables; @@ -198,9 +197,9 @@ final class MetricAttributeTransformer { return transformed; } LOG.warn( - "Truncation collision at metric '{}': this registration truncated {} and " + "Possible truncation collision at metric '{}': this registration truncated {} and " + "collapsed onto transformed attributes {} already seen for a " - + "different raw input. Exported series at this slot will be " + + "different raw input. Exported series at this slot may be " + "ambiguous at the downstream backend.", metricName, truncatedEntries(rawVariables, transformed), @@ -239,6 +238,13 @@ final class MetricAttributeTransformer { return slotStates == null ? 0 : slotStates.size(); } + void reset() { + if (slotStates != null) { + slotStates.clear(); + } + collisionCount = 0; + } + @VisibleForTesting Map<String, Integer> getAttributeValueLimits() { return attributeValueLimits; @@ -246,26 +252,34 @@ final class MetricAttributeTransformer { // ----------------------------------------------------------------------- - private Map<String, String> applyLimits(final Map<String, String> variables) { + @Nullable + private Map<String, String> getTransformedOrNull(final Map<String, String> variables) { final int globalLimit = attributeValueLimits.getOrDefault(GLOBAL_LIMIT_KEY, Integer.MAX_VALUE); final Map<String, String> result = new HashMap<>(); + boolean modified = false; for (Map.Entry<String, String> entry : variables.entrySet()) { final String key = entry.getKey(); final String value = entry.getValue(); + if (value == null) { + result.put(key, null); + continue; + } final int configuredLimit = attributeValueLimits.getOrDefault(key, globalLimit); if (configuredLimit == 0) { // Drop the attribute entirely (user-requested via 0). + modified = true; continue; } if (configuredLimit > 0 && value.length() > configuredLimit) { result.put(key, value.substring(0, configuredLimit)); + modified = true; } else { // No truncation needed, or negative limit (disables per-attribute truncation). result.put(key, value); } } - return result; + return modified ? result : null; } /** @@ -295,8 +309,9 @@ final class MetricAttributeTransformer { } /** Packs two 32-bit string hashes into one 64-bit value — key in the high half, value low. */ - private static long entryHash(final String key, final String value) { - return (((long) key.hashCode()) << 32) ^ (value.hashCode() & LOW_32_BITS_MASK); + private static long entryHash(final String key, @Nullable final String value) { + final int valueHash = value != null ? value.hashCode() : 0; + return (((long) key.hashCode()) << 32) ^ (valueHash & LOW_32_BITS_MASK); } private static Map<String, Integer> parseAttributeLimits(final MetricConfig config) { diff --git a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java index 6876fb09275..3ad30aae010 100644 --- a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java +++ b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java @@ -166,7 +166,9 @@ public class OpenTelemetryMetricReporter extends OpenTelemetryReporterBase @Override public synchronized void close() { - transformer = null; + if (transformer != null) { + transformer.reset(); + } if (exporter != null) { exporter.flush(); waitForLastReportToComplete(); diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java index ebb500d5ec6..40c033271c5 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java @@ -171,6 +171,32 @@ class MetricAttributeTransformerTest { assertThat(perAttrTransformer.transform("m", input)).containsEntry("k", ""); } + @Test + void testNullAttributeValueIsPreserved() { + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(5); + + final Map<String, String> input = new HashMap<>(); + input.put("null_key", null); + input.put("normal_key", "long_value_to_truncate"); + + final Map<String, String> result = transformer.transform("m", input); + + assertThat(result).containsEntry("null_key", null).containsEntry("normal_key", "long_"); + assertNotSame(input, result); + } + + @Test + void testNullAttributeValueNoOpWhenOnlyNullsPresent() { + final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(5); + + final Map<String, String> input = new HashMap<>(); + input.put("null_key", null); + + final Map<String, String> result = transformer.transform("m", input); + + assertSame(input, result); + } + @Test void testEmptyAttributeNameIsRejected() { final MetricConfig cfg = new MetricConfig();
