This is an automated email from the ASF dual-hosted git repository.

rkhachatryan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6efa7473f41c239674c0b709ae916c0174cc76a9
Author: Aleksandr Iushmanov <[email protected]>
AuthorDate: Fri Apr 17 15:15:59 2026 +0100

    [FLINK-39127][metrics] Truncate OTel attribute values via config
    
    Adds configurable per-attribute and global length limits for metric
    attribute values exported by the OpenTelemetry reporter, addressing
    oversized payload rejections from OTel collectors. Configuration is
    prefix-based (transform.attribute-value-length-limits.<attr>), with
    a `*` global key, 0 to drop, and negative values to disable the
    limit per attribute. Colliding metrics after truncation are logged
    and counted best-effort without blocking registration.
    
    A second option, transform.collision-tracking-max-slots, bounds the
    memory footprint of the collision tracker via an access-ordered LRU
    (default 50000, 0 disables, invalid values WARN and fall back to the
    default). This prevents unbounded heap growth under failover loops
    where every task attempt produces a distinct per-attempt UUID that
    maps to a new tracking slot.
    
    Part of FLIP-553.
    
    Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---
 .../open_telemetry_reporter_configuration.html     |  12 +
 flink-metrics/flink-metrics-otel/pom.xml           |   1 +
 .../metrics/otel/MetricAttributeTransformer.java   | 333 ++++++++++++++++
 .../metrics/otel/OpenTelemetryMetricReporter.java  |  21 +-
 .../metrics/otel/OpenTelemetryReporterOptions.java |  65 ++++
 .../otel/MetricAttributeTransformerTest.java       | 429 +++++++++++++++++++++
 .../otel/OpenTelemetryMetricReporterITCase.java    | 116 ++++++
 7 files changed, 973 insertions(+), 4 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html 
b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
index df9032e4e79..3b82439dde0 100644
--- 
a/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/open_telemetry_reporter_configuration.html
@@ -56,5 +56,17 @@
             <td>String</td>
             <td>service.version passed to OpenTelemetry Reporters.</td>
         </tr>
+        <tr>
+            
<td><h5>metrics.reporter.OpenTelemetry.transform.attribute-value-length-limits.&lt;attribute-name&gt;</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Limits of the exported attribute values length. Only applies 
to the metric reporter; ignored by the trace and event reporters. Configuration 
is prefix based, for example to limit `task_name` attribute set 
`metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60` 
in the config for OTel reporter. A special key '*' can be used to define a 
global limit for all attributes not explicitly listed. For example 
`metrics.reporter.otel.transform.attribute-value-le [...]
+        </tr>
+        <tr>
+            
<td><h5>metrics.reporter.OpenTelemetry.transform.collision-tracking-max-slots</h5></td>
+            <td style="word-wrap: break-word;">50000</td>
+            <td>Integer</td>
+            <td>Maximum number of distinct (metric name, transformed 
attributes) slots the truncation-collision tracker retains. Only applies to the 
metric reporter; ignored by the trace and event reporters. When the cap is 
reached the least-recently-touched slot is evicted (LRU); a previously warned 
slot that later gets evicted may fire its warning again on re-entry. Only 
consulted when attribute-value length limits are configured — if no truncation 
is happening, there is nothing to tra [...]
+        </tr>
     </tbody>
 </table>
diff --git a/flink-metrics/flink-metrics-otel/pom.xml 
b/flink-metrics/flink-metrics-otel/pom.xml
index 33117e1f5b8..ee2b5d984da 100644
--- a/flink-metrics/flink-metrics-otel/pom.xml
+++ b/flink-metrics/flink-metrics-otel/pom.xml
@@ -128,6 +128,7 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-test-utils-junit</artifactId>
                        <version>${project.version}</version>
+                       <scope>test</scope>
                </dependency>
 
                <dependency>
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
new file mode 100644
index 00000000000..96081604674
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/MetricAttributeTransformer.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.otel;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+import static 
org.apache.flink.metrics.otel.OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS;
+
+/**
+ * Applies per-attribute and global length limits to metric attribute values 
before they are
+ * exported via the OpenTelemetry reporter, and best-effort detects cases 
where truncation caused
+ * two distinct raw attribute maps to collapse to the same transformed output 
for a given metric
+ * name (which results in ambiguous exported series at the downstream backend).
+ *
+ * <p>Configuration is prefix-based under {@value
+ * OpenTelemetryReporterOptions#ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY}. The 
special key {@code *}
+ * sets a global limit for all attributes not explicitly listed.
+ *
+ * <p>Semantics of individual limit values:
+ *
+ * <ul>
+ *   <li>Positive: truncate the attribute value to that many characters.
+ *   <li>Zero: drop the attribute entirely.
+ *   <li>Negative: keep the full value (overrides a global cap).
+ * </ul>
+ *
+ * <p><b>Collision detection</b> fires at most once per (name, 
transformed-attributes) slot when two
+ * distinct raw attribute maps have been observed for that slot. The tracker 
stores only 64-bit
+ * hashes, in a bounded LRU so that memory stays contained when attribute 
cardinality is high (e.g.
+ * a per-attempt UUID). The cap is configured via {@link
+ * OpenTelemetryReporterOptions#COLLISION_TRACKING_MAX_SLOTS}; {@code 0} 
disables tracking entirely.
+ * On overflow the least-recently-touched slot is evicted, and a previously 
warned slot that later
+ * gets evicted may fire its warning again on re-entry.
+ *
+ * <p>This class is <b>not thread-safe</b>. Callers are expected to serialize 
calls to {@link
+ * #transform}.
+ *
+ * @see OpenTelemetryReporterOptions#ATTRIBUTE_VALUE_LENGTH_LIMITS
+ */
+@NotThreadSafe
+final class MetricAttributeTransformer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MetricAttributeTransformer.class);
+
+    static final String GLOBAL_LIMIT_KEY = "*";
+
+    /** Load factor for the tracking map; matches {@link java.util.HashMap}'s 
default. */
+    private static final float TRACKING_MAP_LOAD_FACTOR = 0.75f;
+
+    /** Upper bound on the tracking map's initial capacity. */
+    private static final int TRACKING_MAP_INITIAL_CAPACITY = 1024;
+
+    /** Access-ordered iteration so {@code removeEldestEntry} evicts the true 
LRU entry. */
+    private static final boolean LRU_ACCESS_ORDER = true;
+
+    /** Parsed attribute-to-limit mapping, including {@code *} for the global 
limit. */
+    private final Map<String, Integer> attributeValueLimits;
+
+    /**
+     * Bounded LRU of slot-hash → tracking state, or {@code null} when 
tracking is disabled. The
+     * slot-hash encodes (metricName, transformedVariables); {@link SlotState} 
records the first
+     * raw-hash observed at that slot and whether the slot has already warned.
+     */
+    @Nullable private final Map<Long, SlotState> slotStates;
+
+    private long collisionCount = 0;
+
+    /**
+     * Per-slot tracking state. Mutating {@code warned} in place avoids a per-warning allocation;
+     * the immutable slot-hash key drives LRU ordering regardless.
+     *
+     * <p>{@code firstRawHash} is the hash of the raw attribute map that created the slot and is
+     * never reassigned. {@code warned} starts out {@code false} and is flipped to {@code true} once
+     * a collision warning has been logged for the slot.
+     */
+    private static final class SlotState {
+        final long firstRawHash;
+        boolean warned;
+
+        SlotState(final long firstRawHash) {
+            this.firstRawHash = firstRawHash;
+        }
+    }
+
+    /**
+     * Constructs a {@link MetricAttributeTransformer} by reading length-limit entries from the
+     * supplied {@link MetricConfig}.
+     *
+     * <p>The parsed limits are logged at INFO level when at least one is present. The collision
+     * tracking map is built from the same configuration afterwards, regardless of whether any
+     * limit was found.
+     *
+     * @param metricConfig reporter configuration holding the length-limit entries and the
+     *     collision-tracking slot cap
+     */
+    MetricAttributeTransformer(final MetricConfig metricConfig) {
+        this.attributeValueLimits = parseAttributeLimits(metricConfig);
+        if (!attributeValueLimits.isEmpty()) {
+            LOG.info(
+                    "Metric attribute transformer is configured with value length limits: {}",
+                    attributeValueLimits);
+        }
+        this.slotStates = buildSlotStateMap(metricConfig);
+    }
+
+    /**
+     * Builds the bounded, access-ordered LRU map that backs collision tracking.
+     *
+     * <p>The initial capacity is sized for {@code maxSlots} entries (plus the one transient entry
+     * that exists just before eviction) but never exceeds the tracking map's initial-capacity
+     * constant, so a small cap does not reserve a table larger than it can ever fill.
+     *
+     * @param metricConfig reporter configuration to read the slot cap from
+     * @return the tracking map, or {@code null} when the cap is {@code 0} (tracking disabled)
+     */
+    @Nullable
+    private static Map<Long, SlotState> buildSlotStateMap(final MetricConfig metricConfig) {
+        final int maxSlots = readCollisionTrackingMaxSlots(metricConfig);
+        if (maxSlots == 0) {
+            return null;
+        }
+        // Long/double arithmetic so a cap close to Integer.MAX_VALUE cannot overflow.
+        final long capacityForMaxSlots =
+                (long) Math.ceil((maxSlots + 1L) / (double) TRACKING_MAP_LOAD_FACTOR);
+        final int initialCapacity =
+                (int) Math.min(TRACKING_MAP_INITIAL_CAPACITY, capacityForMaxSlots);
+        return new LinkedHashMap<>(initialCapacity, TRACKING_MAP_LOAD_FACTOR, LRU_ACCESS_ORDER) {
+            @Override
+            protected boolean removeEldestEntry(final Map.Entry<Long, SlotState> eldest) {
+                // Invoked after each insertion; evicts the least-recently-accessed entry.
+                return size() > maxSlots;
+            }
+        };
+    }
+
+    /**
+     * Reads the collision-tracker slot cap from the reporter configuration.
+     *
+     * <p>A value that cannot be parsed as an integer, or that is negative, is reported with a WARN
+     * and replaced by the option's default.
+     *
+     * @param metricConfig reporter configuration to read from
+     * @return the configured non-negative cap, or the option default on invalid input
+     */
+    private static int readCollisionTrackingMaxSlots(final MetricConfig metricConfig) {
+        final int fallback = COLLISION_TRACKING_MAX_SLOTS.defaultValue();
+        final String optionKey = COLLISION_TRACKING_MAX_SLOTS.key();
+        final int configured;
+        try {
+            configured = metricConfig.getInteger(optionKey, fallback);
+        } catch (final NumberFormatException e) {
+            LOG.warn(
+                    "Skipping invalid format for {} with value: {}; falling back to default {}",
+                    optionKey,
+                    metricConfig.get(optionKey),
+                    fallback,
+                    e);
+            return fallback;
+        }
+        if (configured >= 0) {
+            return configured;
+        }
+        LOG.warn(
+                "Ignoring negative value {} for {}; falling back to default {}",
+                configured,
+                optionKey,
+                fallback);
+        return fallback;
+    }
+
+    /**
+     * Applies configured length limits to {@code rawVariables} and best-effort tracks whether the
+     * resulting (name, transformed-attributes) slot has been seen before with a different raw
+     * input. Emits at most one WARN per slot.
+     *
+     * <p>If no limits are configured, or if truncation would produce a map equal to the input,
+     * {@code rawVariables} is returned unchanged (reference-equal) and no tracking state is
+     * touched.
+     *
+     * <p>Because inputs that pass through unchanged are never tracked, a raw map that is already
+     * equal to another registration's truncated output is not reported as a collision.
+     *
+     * <p>Slots and raw inputs are compared by 64-bit hash only, so detection is best-effort in
+     * both directions. Looking up an existing slot also counts as an access for the LRU ordering
+     * of the tracking map.
+     *
+     * @param metricName the fully-qualified metric name
+     * @param rawVariables the raw metric attribute map
+     * @return the transformed attribute map to store alongside the metric
+     */
+    Map<String, String> transform(final String metricName, final Map<String, String> rawVariables) {
+        if (attributeValueLimits.isEmpty()) {
+            return rawVariables;
+        }
+        final Map<String, String> transformed = applyLimits(rawVariables);
+        if (transformed.equals(rawVariables)) {
+            // Nothing was truncated or dropped, so this input is not tracked. Return the input
+            // itself so the freshly-allocated transformed map is not retained.
+            return rawVariables;
+        }
+        if (slotStates == null) {
+            return transformed;
+        }
+
+        final long slotHash = slotHash(metricName, transformed);
+        final long rawHash = attributesHash(rawVariables);
+        final SlotState state = slotStates.get(slotHash);
+        if (state == null) {
+            slotStates.put(slotHash, new SlotState(rawHash));
+            return transformed;
+        }
+        if (state.warned || state.firstRawHash == rawHash) {
+            return transformed;
+        }
+        LOG.warn(
+                "Truncation collision at metric '{}': this registration truncated {} and "
+                        + "collapsed onto transformed attributes {} already seen for a "
+                        + "different raw input. Exported series at this slot will be "
+                        + "ambiguous at the downstream backend.",
+                metricName,
+                truncatedEntries(rawVariables, transformed),
+                transformed);
+        state.warned = true;
+        collisionCount++;
+        return transformed;
+    }
+
+    /**
+     * Collects the {@code raw} entries that survived the transformation with a different value,
+     * i.e. the ones that were actually shortened. Attributes missing from {@code transformed}
+     * (dropped ones) are left out because they are not part of the collision signature reported in
+     * the warning.
+     *
+     * @param raw the attribute map before limits were applied
+     * @param transformed the attribute map after limits were applied
+     * @return a new map from attribute name to its original (untruncated) value
+     */
+    private static Map<String, String> truncatedEntries(
+            final Map<String, String> raw, final Map<String, String> transformed) {
+        final Map<String, String> shortened = new HashMap<>();
+        raw.forEach(
+                (name, originalValue) -> {
+                    final String exportedValue = transformed.get(name);
+                    if (exportedValue == null || exportedValue.equals(originalValue)) {
+                        return;
+                    }
+                    shortened.put(name, originalValue);
+                });
+        return shortened;
+    }
+
+    /**
+     * @return the number of collision warnings emitted since construction. The counter is
+     *     incremented together with the one-time WARN of a slot, so further colliding inputs that
+     *     land on an already-warned slot do not increase it.
+     */
+    @VisibleForTesting
+    long getCollisionCount() {
+        return collisionCount;
+    }
+
+    /**
+     * @return the number of slots currently held by the collision tracker, or {@code 0} when
+     *     tracking is disabled.
+     */
+    @VisibleForTesting
+    int getTrackedSlotCount() {
+        if (slotStates == null) {
+            return 0;
+        }
+        return slotStates.size();
+    }
+
+    /**
+     * @return the parsed attribute-to-limit mapping, including the {@code *} entry when a global
+     *     limit is configured. This is the internal map itself, not a copy.
+     */
+    @VisibleForTesting
+    Map<String, Integer> getAttributeValueLimits() {
+        return attributeValueLimits;
+    }
+
+    // -----------------------------------------------------------------------
+
+    /**
+     * Returns a new map holding {@code variables} with the configured length limits applied.
+     *
+     * <p>For each attribute the explicit per-attribute limit wins; otherwise the global {@code *}
+     * limit is used, which defaults to {@link Integer#MAX_VALUE}. A limit of {@code 0} drops the
+     * attribute, a negative limit keeps the full value, and a positive limit truncates the value
+     * to at most that many {@code char}s.
+     *
+     * <p>Truncation never splits a surrogate pair: if the cut would fall between a high and a low
+     * surrogate, the value is shortened by one more {@code char}, so the exported string stays
+     * well-formed UTF-16 and is never longer than the limit.
+     *
+     * @param variables the raw attribute map; not modified
+     * @return a freshly allocated map with dropped attributes removed and long values truncated
+     */
+    private Map<String, String> applyLimits(final Map<String, String> variables) {
+        final int globalLimit =
+                attributeValueLimits.getOrDefault(GLOBAL_LIMIT_KEY, Integer.MAX_VALUE);
+        final Map<String, String> result = new HashMap<>();
+        for (final Map.Entry<String, String> entry : variables.entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+            final int configuredLimit = attributeValueLimits.getOrDefault(key, globalLimit);
+            if (configuredLimit == 0) {
+                // Drop the attribute entirely (user-requested via 0).
+                continue;
+            }
+            if (configuredLimit > 0 && value.length() > configuredLimit) {
+                int end = configuredLimit;
+                // Both indexes are in range: 0 <= end - 1 and end < value.length().
+                if (Character.isHighSurrogate(value.charAt(end - 1))
+                        && Character.isLowSurrogate(value.charAt(end))) {
+                    // Cutting here would leave an unpaired high surrogate; drop it as well.
+                    end--;
+                }
+                result.put(key, value.substring(0, end));
+            } else {
+                // No truncation needed, or negative limit (disables per-attribute truncation).
+                result.put(key, value);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Reserved attribute key under which the metric name is folded into 
{@link #slotHash}, so the
+     * metric name participates in the slot-hash exactly like any other 
attribute.
+     */
+    private static final String METRIC_NAME_PSEUDO_ATTRIBUTE = 
"__metric_name__";
+
+    /** Bit-mask to extend an {@code int} hash into the low 32 bits of a 
{@code long}. */
+    private static final long LOW_32_BITS_MASK = 0xFFFFFFFFL;
+
+    /**
+     * Computes the order-independent 64-bit hash identifying a (metricName, transformed) slot. The
+     * metric name is hashed as a reserved pseudo-attribute and XOR-ed with the attribute hash, so
+     * it is combined through the same logic as the real attributes.
+     */
+    private static long slotHash(final String metricName, final Map<String, String> transformed) {
+        final long nameHash = entryHash(METRIC_NAME_PSEUDO_ATTRIBUTE, metricName);
+        final long attributeHash = attributesHash(transformed);
+        return attributeHash ^ nameHash;
+    }
+
+    /**
+     * Order-independent 64-bit hash of an attribute map: the XOR of the per-entry hashes, which is
+     * {@code 0} for an empty map.
+     */
+    private static long attributesHash(final Map<String, String> variables) {
+        return variables.entrySet().stream()
+                .mapToLong(entry -> entryHash(entry.getKey(), entry.getValue()))
+                .reduce(0L, (left, right) -> left ^ right);
+    }
+
+    /**
+     * Hashes one attribute entry to 64 bits.
+     *
+     * <p>The two 32-bit string hashes are first packed into one {@code long} (key in the high
+     * half, value in the low half) and then passed through the SplitMix64 finalizer. The mixing
+     * step matters because callers XOR the per-entry hashes together: without it the combined hash
+     * is just XOR(key hashes) next to XOR(value hashes), so swapping values between keys gives the
+     * same result and two attributes carrying the same value cancel each other out. The finalizer
+     * is a bijection, so it adds no per-entry collisions of its own.
+     *
+     * @param key the attribute name
+     * @param value the attribute value
+     * @return a 64-bit hash in which key and value bits are mixed together
+     */
+    private static long entryHash(final String key, final String value) {
+        long h = (((long) key.hashCode()) << 32) ^ (value.hashCode() & LOW_32_BITS_MASK);
+        h = (h ^ (h >>> 30)) * 0xBF58476D1CE4E5B9L;
+        h = (h ^ (h >>> 27)) * 0x94D049BB133111EBL;
+        return h ^ (h >>> 31);
+    }
+
+    /**
+     * Extracts the attribute value length limits from the reporter configuration.
+     *
+     * <p>Every string key starting with the length-limit prefix contributes one entry whose name
+     * is the remainder of the key. Keys with an empty remainder and values that are not valid
+     * integers are skipped with a WARN.
+     *
+     * @param config reporter configuration to scan
+     * @return a new mutable map from attribute name (or {@code *}) to its configured limit
+     */
+    private static Map<String, Integer> parseAttributeLimits(final MetricConfig config) {
+        final int prefixLength = ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY.length();
+        final Map<String, Integer> parsedLimits = new HashMap<>();
+        for (final Object rawKey : config.keySet()) {
+            final boolean isLimitKey =
+                    rawKey instanceof String
+                            && ((String) rawKey)
+                                    .startsWith(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY);
+            if (!isLimitKey) {
+                continue;
+            }
+            final String configKey = (String) rawKey;
+            if (configKey.length() == prefixLength) {
+                // Nothing follows the prefix, so there is no attribute to attach the limit to.
+                LOG.warn(
+                        "Ignoring attribute value length limit with empty attribute name for key: {}",
+                        configKey);
+                continue;
+            }
+            try {
+                parsedLimits.put(
+                        configKey.substring(prefixLength),
+                        config.getInteger(configKey, Integer.MAX_VALUE));
+            } catch (final NumberFormatException e) {
+                LOG.warn(
+                        "Skipping invalid format for attribute length limit with key: {} and value: {}",
+                        configKey,
+                        config.get(configKey),
+                        e);
+            }
+        }
+        return parsedLimits;
+    }
+}
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
index 33215ab3678..6876fb09275 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java
@@ -105,6 +105,9 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
     private long exportCompletionTimeoutMillis =
             
OpenTelemetryReporterOptions.EXPORT_COMPLETION_TIMEOUT_MILLIS.defaultValue();
 
+    @GuardedBy("this")
+    private @Nullable MetricAttributeTransformer transformer;
+
     public OpenTelemetryMetricReporter() {
         this(Clock.systemUTC());
     }
@@ -114,6 +117,12 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
         this.clock = clock;
     }
 
+    /**
+     * Initializes the reporter's exporter, batching, and attribute 
transformer from {@code
+     * metricConfig}. Must be called before any of {@link 
#notifyOfAddedMetric}, {@link
+     * #notifyOfRemovedMetric}, {@link #report}, or {@link 
#collectAllMetrics}; violating this
+     * contract will result in an NPE.
+     */
     @Override
     public void open(final MetricConfig metricConfig) {
         LOG.info("Starting OpenTelemetryMetricReporter");
@@ -122,6 +131,7 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
         synchronized (this) {
             exporter = createExporter(metricConfig);
             configureBatching(metricConfig);
+            transformer = new MetricAttributeTransformer(metricConfig);
         }
     }
 
@@ -156,6 +166,7 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
 
     @Override
     public synchronized void close() {
+        transformer = null;
         if (exporter != null) {
             exporter.flush();
             waitForLastReportToComplete();
@@ -172,17 +183,19 @@ public class OpenTelemetryMetricReporter extends 
OpenTelemetryReporterBase
                         + "."
                         + metricName;
 
-        Map<String, String> variables =
+        final Map<String, String> rawVariables =
                 group.getAllVariables().entrySet().stream()
                         .collect(
                                 Collectors.toMap(
                                         e -> 
VariableNameUtil.getVariableName(e.getKey()),
                                         Entry::getValue));
-        LOG.debug("Adding metric {} with variables {}", metricName, variables);
-
-        MetricMetadata metricMetadata = new MetricMetadata(name, variables);
 
         synchronized (this) {
+            final Map<String, String> variables = transformer.transform(name, 
rawVariables);
+
+            LOG.debug("Adding metric {} with variables {}", metricName, 
variables);
+
+            final MetricMetadata metricMetadata = new MetricMetadata(name, 
variables);
             switch (metric.getMetricType()) {
                 case COUNTER:
                     this.counters.put((Counter) metric, metricMetadata);
diff --git 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
index f810750b985..5b2b571104c 100644
--- 
a/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
+++ 
b/flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryReporterOptions.java
@@ -132,6 +132,71 @@ public final class OpenTelemetryReporterOptions {
                                             "Timeout in milliseconds for 
waiting on async export completion.")
                                     .build());
 
+    /** Prefix key used to identify attribute value length limit configuration 
entries. */
+    public static final String ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY =
+            "transform.attribute-value-length-limits.";
+
+    /**
+     * Config option for attribute value length limits. Only used for 
documentation purposes — the
+     * actual option is prefix-based and parsed by {@link 
MetricAttributeTransformer}.
+     *
+     * <p>For example, to limit the {@code task_name} attribute to 60 
characters set {@code
+     * 
metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60} in 
the Flink
+     * configuration. The special key {@code *} defines a global limit for all 
attributes not
+     * explicitly listed. {@code 0} drops an attribute; negative values 
disable the limit for that
+     * attribute (useful to override a global cap).
+     */
+    @PublicEvolving
+    public static final ConfigOption<Integer> ATTRIBUTE_VALUE_LENGTH_LIMITS =
+            ConfigOptions.key(ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + 
"<attribute-name>")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Limits of the exported attribute 
values length. Only applies to the metric "
+                                                    + "reporter; ignored by 
the trace and event reporters. "
+                                                    + "Configuration is prefix 
based, "
+                                                    + "for example to limit 
`task_name` attribute set "
+                                                    + 
"`metrics.reporter.otel.transform.attribute-value-length-limits.task_name: 60` "
+                                                    + "in the config for OTel 
reporter. "
+                                                    + "A special key '*' can 
be used to define a global limit for all attributes not "
+                                                    + "explicitly listed. "
+                                                    + "For example 
`metrics.reporter.otel.transform.attribute-value-length-limits.*: 1024` "
+                                                    + "will limit all 
attributes to attributeValue.substring(0, 1024). "
+                                                    + "Global limit defaults 
to Integer.MAX_VALUE if not set. Individual attribute "
+                                                    + "limits always override 
the global limit and are verified by exact match on the "
+                                                    + "attribute name. "
+                                                    + "0 can be used to drop 
an attribute. Negative values are interpreted as no limit "
+                                                    + "for the attribute (can 
be used for global limit overrides).")
+                                    .build());
+
+    /** Config key for the collision tracker's maximum size. */
+    public static final String COLLISION_TRACKING_MAX_SLOTS_KEY =
+            "transform.collision-tracking-max-slots";
+
+    @PublicEvolving
+    public static final ConfigOption<Integer> COLLISION_TRACKING_MAX_SLOTS =
+            ConfigOptions.key(COLLISION_TRACKING_MAX_SLOTS_KEY)
+                    .intType()
+                    .defaultValue(50_000)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Maximum number of distinct 
(metric name, transformed attributes) "
+                                                    + "slots the 
truncation-collision tracker retains. Only applies to "
+                                                    + "the metric reporter; 
ignored by the trace and event reporters. "
+                                                    + "When the cap is "
+                                                    + "reached the 
least-recently-touched slot is evicted (LRU); a "
+                                                    + "previously warned slot 
that later gets evicted may fire its warning "
+                                                    + "again on re-entry. "
+                                                    + "Only consulted when 
attribute-value length limits are configured — "
+                                                    + "if no truncation is 
happening, there is nothing to track and this "
+                                                    + "option has no effect. "
+                                                    + "Set to 0 to disable 
collision tracking entirely. Malformed or "
+                                                    + "negative values fall 
back to the default with a warning in the logs.")
+                                    .build());
+
     @Internal
     public static void tryConfigureTimeout(MetricConfig metricConfig, 
Consumer<Duration> builder) {
         final String timeoutConfKey = EXPORTER_TIMEOUT.key();
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
new file mode 100644
index 00000000000..ebb500d5ec6
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/MetricAttributeTransformerTest.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.otel;
+
+import org.apache.flink.metrics.MetricConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+/** Unit tests for {@link MetricAttributeTransformer}. */
+class MetricAttributeTransformerTest {
+
+    private static final String TEST_VALUE = "test-attribute-value";
+
+    // Attribute that should be truncated to the global limit
+    private static final String GLOBAL_TRUNCATED_KEY = "global_truncated_key";
+    private static final int GLOBAL_LIMIT = 5;
+    private static final String GLOBAL_TRUNCATED_EXPECTED = 
TEST_VALUE.substring(0, GLOBAL_LIMIT);
+
+    // Attribute with a per-attribute limit smaller than the global limit
+    private static final String CUSTOM_SMALL_KEY = "custom_small_key";
+    private static final int CUSTOM_SMALL_LIMIT = 3;
+    private static final String CUSTOM_SMALL_EXPECTED = 
TEST_VALUE.substring(0, CUSTOM_SMALL_LIMIT);
+
+    // Attribute with a per-attribute limit larger than the global limit
+    private static final String CUSTOM_LARGE_KEY = "custom_large_key";
+    private static final int CUSTOM_LARGE_LIMIT = GLOBAL_LIMIT + 
CUSTOM_SMALL_LIMIT; // 8
+    private static final String CUSTOM_LARGE_EXPECTED = 
TEST_VALUE.substring(0, CUSTOM_LARGE_LIMIT);
+
+    // Attribute whose value is already shorter than the global limit
+    private static final String NON_TRUNCATED_KEY = "non_truncated_key";
+    private static final String NON_TRUNCATED_VALUE = TEST_VALUE.substring(0, 
GLOBAL_LIMIT - 1);
+
+    // Attribute with a zero limit (should be dropped)
+    private static final String ZERO_KEY = "zero_key";
+
+    // Attribute with a negative limit (no truncation, overrides global)
+    private static final String NEGATIVE_KEY = "negative_key";
+    private static final int NEGATIVE_LIMIT = -GLOBAL_LIMIT;
+
+    // Attribute with an Integer-typed config value — verifies 
MetricConfig.getInteger tolerates
+    // non-String values stored directly in the Properties map.
+    private static final String NUMERIC_KEY = "numeric_key";
+    private static final int NUMERIC_LIMIT = 42;
+
+    // Keys with invalid (non-integer) values that should be skipped
+    private static final String INVALID_STRING_KEY = "invalid_string";
+    private static final String INVALID_DURATION_KEY = "invalid_duration";
+
+    @Test
+    void testLimitsParsingFromConfiguration() {
+        final MetricAttributeTransformer parsed =
+                new MetricAttributeTransformer(buildFullTestConfig());
+
+        // Every integer-valued entry of the fixture is retained, including the zero, negative
+        // and Integer-typed ones.
+        assertThat(parsed.getAttributeValueLimits())
+                .containsEntry(MetricAttributeTransformer.GLOBAL_LIMIT_KEY, GLOBAL_LIMIT)
+                .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_LIMIT)
+                .containsEntry(CUSTOM_LARGE_KEY, CUSTOM_LARGE_LIMIT)
+                .containsEntry(ZERO_KEY, 0)
+                .containsEntry(NEGATIVE_KEY, NEGATIVE_LIMIT)
+                .containsEntry(NUMERIC_KEY, NUMERIC_LIMIT);
+
+        // Entries whose values are not integers must be absent from the parsed map.
+        assertThat(parsed.getAttributeValueLimits())
+                .doesNotContainKey(INVALID_STRING_KEY)
+                .doesNotContainKey(INVALID_DURATION_KEY);
+    }
+
+    @Test
+    void testAttributeValueLengthLimitsParsingAndTruncation() {
+        final MetricAttributeTransformer transformer =
+                new MetricAttributeTransformer(buildFullTestConfig());
+
+        // One attribute per limit flavour exercised by the full fixture.
+        final Map<String, String> rawAttributes = new HashMap<>();
+        rawAttributes.put(CUSTOM_SMALL_KEY, TEST_VALUE);
+        rawAttributes.put(CUSTOM_LARGE_KEY, TEST_VALUE);
+        rawAttributes.put(GLOBAL_TRUNCATED_KEY, TEST_VALUE);
+        rawAttributes.put(NON_TRUNCATED_KEY, NON_TRUNCATED_VALUE);
+        rawAttributes.put(NEGATIVE_KEY, TEST_VALUE);
+        rawAttributes.put(ZERO_KEY, TEST_VALUE);
+
+        final Map<String, String> transformed = transformer.transform("m", rawAttributes);
+
+        // The result must be a different map instance than the one passed in.
+        assertNotSame(rawAttributes, transformed);
+
+        // Positive limits cut the value; a per-attribute limit takes precedence over the global.
+        assertThat(transformed)
+                .containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_EXPECTED)
+                .containsEntry(CUSTOM_LARGE_KEY, CUSTOM_LARGE_EXPECTED)
+                .containsEntry(GLOBAL_TRUNCATED_KEY, GLOBAL_TRUNCATED_EXPECTED);
+
+        // Values already within the limit, or governed by a negative limit, stay intact.
+        assertThat(transformed)
+                .containsEntry(NON_TRUNCATED_KEY, NON_TRUNCATED_VALUE)
+                .containsEntry(NEGATIVE_KEY, TEST_VALUE);
+
+        // A zero limit removes the attribute altogether.
+        assertThat(transformed).doesNotContainKey(ZERO_KEY);
+    }
+
+    @Test
+    void testNoTruncationWhenConfigIsEmpty() {
+        // With no limits configured the transformer must hand back the very same map instance.
+        final MetricAttributeTransformer passThrough =
+                new MetricAttributeTransformer(new MetricConfig());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("k", "v");
+
+        assertSame(attributes, passThrough.transform("m", attributes));
+    }
+
+    @Test
+    void testTruncationWithoutGlobalLimit() {
+        // Only a per-attribute limit is configured; no global entry exists.
+        final String limitedAttributeKey =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+                        + CUSTOM_SMALL_KEY;
+        final MetricConfig perAttributeOnly = new MetricConfig();
+        perAttributeOnly.setProperty(limitedAttributeKey, Integer.toString(CUSTOM_SMALL_LIMIT));
+        final MetricAttributeTransformer transformer =
+                new MetricAttributeTransformer(perAttributeOnly);
+
+        final Map<String, String> rawAttributes = new HashMap<>();
+        rawAttributes.put(CUSTOM_SMALL_KEY, TEST_VALUE);
+        rawAttributes.put(NON_TRUNCATED_KEY, TEST_VALUE);
+
+        final Map<String, String> transformed = transformer.transform("m", rawAttributes);
+
+        // The limited attribute is cut; the unconfigured one passes through at full length.
+        assertThat(transformed).containsEntry(CUSTOM_SMALL_KEY, CUSTOM_SMALL_EXPECTED);
+        assertThat(transformed).containsEntry(NON_TRUNCATED_KEY, TEST_VALUE);
+    }
+
+    @Test
+    void testEmptyAttributeValueIsPreservedUnderNonZeroLimit() {
+        // Regression guard: an empty-string value has to survive any non-zero limit. Computing
+        // the effective limit as min(limit, length) would yield 0 for an empty value and drop
+        // the attribute, silently changing the identity of the exported series.
+        final MetricConfig perAttributeCfg = new MetricConfig();
+        perAttributeCfg.setProperty(
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "k", "50");
+        final MetricAttributeTransformer perAttributeLimited =
+                new MetricAttributeTransformer(perAttributeCfg);
+        final MetricAttributeTransformer globallyLimited = buildTransformerWithGlobalLimit(100);
+
+        final Map<String, String> emptyValued = new HashMap<>();
+        emptyValued.put("k", "");
+
+        // Checked once under a global limit and once under a per-attribute limit.
+        final Map<String, String> underGlobalLimit = globallyLimited.transform("m", emptyValued);
+        assertThat(underGlobalLimit).containsEntry("k", "");
+
+        final Map<String, String> underPerAttributeLimit =
+                perAttributeLimited.transform("m", emptyValued);
+        assertThat(underPerAttributeLimit).containsEntry("k", "");
+    }
+
+    @Test
+    void testEmptyAttributeNameIsRejected() {
+        // The bare prefix is used as the complete key, so no attribute name follows it. Such an
+        // entry must not surface as an empty-string key in the parsed limits.
+        final MetricConfig prefixOnly = new MetricConfig();
+        prefixOnly.setProperty(
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY, "10");
+
+        assertThat(new MetricAttributeTransformer(prefixOnly).getAttributeValueLimits())
+                .doesNotContainKey("");
+    }
+
+    @ParameterizedTest
+    @MethodSource("collisionCases")
+    void testCollisionSemantics(
+            final String caseName, final List<String> rawTaskNames, final long expectedCollisions) {
+        // A global limit of 3 reduces every "tsk_*" name supplied by the cases to "tsk".
+        final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(3);
+
+        // Register each raw task name, in order, under the same metric name.
+        rawTaskNames.forEach(
+                taskName -> {
+                    final Map<String, String> attributes = new HashMap<>();
+                    attributes.put("task_name", taskName);
+                    transformer.transform("m", attributes);
+                });
+
+        assertThat(transformer.getCollisionCount()).as(caseName).isEqualTo(expectedCollisions);
+    }
+
+    static Stream<Arguments> collisionCases() {
+        return Stream.of(
+                Arguments.of("single raw — no collision", 
Arrays.asList("tsk_AAA"), 0L),
+                Arguments.of(
+                        "identical raws re-registered — no collision",
+                        Arrays.asList("tsk_AAA", "tsk_AAA", "tsk_AAA"),
+                        0L),
+                Arguments.of(
+                        "two distinct raws collapsing — one collision",
+                        Arrays.asList("tsk_AAA", "tsk_BBB"),
+                        1L),
+                Arguments.of(
+                        "multiple distinct raws at same slot — sentinel caps 
at one warning",
+                        Arrays.asList("tsk_AAA", "tsk_BBB", "tsk_CCC", 
"tsk_DDD", "tsk_EEE"),
+                        1L));
+    }
+
+    @Test
+    void testNoOpTransformDoesNotOccupyTrackingSlot() {
+        // "short" is well below the limit of 100, so nothing gets truncated.
+        final Map<String, String> untouched = new HashMap<>();
+        untouched.put("task_name", "short");
+
+        final MetricAttributeTransformer transformer = buildTransformerWithGlobalLimit(100);
+        transformer.transform("m", untouched);
+
+        assertThat(transformer.getTrackedSlotCount())
+                .as("No-op transformation must not populate the collision tracker")
+                .isZero();
+    }
+
+    @Test
+    void testSlotTrackingIsBounded() {
+        // `task_name` is truncated while `task_attempt_id` keeps its full value (negative
+        // limit), so the unique per-attempt value remains part of the transformed map and every
+        // registration occupies its own slot — the failover-loop pattern.
+        final int maxSlots = 32;
+        final String limitsPrefix =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+
+        final MetricConfig cfg = new MetricConfig();
+        cfg.setProperty(
+                OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY,
+                Integer.toString(maxSlots));
+        cfg.setProperty(limitsPrefix + "task_attempt_id", "-1");
+        cfg.setProperty(limitsPrefix + "task_name", "3");
+        final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+        // Register four times as many distinct slots as the tracker is allowed to hold.
+        final int registrations = maxSlots * 4;
+        int attempt = 0;
+        while (attempt < registrations) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("task_attempt_id", "attempt-" + attempt);
+            attributes.put("task_name", "long_task_name_that_truncates");
+            transformer.transform("m", attributes);
+            attempt++;
+        }
+
+        assertThat(transformer.getTrackedSlotCount())
+                .as("Slot tracker must saturate at the configured maximum under distinct-slot load")
+                .isEqualTo(maxSlots);
+    }
+
+    @Test
+    void testHotSlotSurvivesLruEviction() {
+        // Verifies access-ordered LRU behaviour (as opposed to insertion order or arbitrary
+        // eviction): a slot that is observed again and again has to outlive cold slots once the
+        // cap is under pressure.
+        final int maxSlots = 16;
+        final String limitsPrefix =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+
+        final MetricConfig cfg = new MetricConfig();
+        // task_name is cut to 3 characters; the per-attempt id is kept in full.
+        cfg.setProperty(limitsPrefix + "task_name", "3");
+        cfg.setProperty(limitsPrefix + "task_attempt_id", "-1");
+        cfg.setProperty(
+                OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY,
+                Integer.toString(maxSlots));
+        final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+        // First sighting of the hot slot: "hot_AAA" becomes "hot".
+        final Map<String, String> hotFirstRaw = new HashMap<>();
+        hotFirstRaw.put("task_name", "hot_AAA");
+        transformer.transform("m", hotFirstRaw);
+
+        // Push four times the cap of distinct cold slots through the tracker, touching the hot
+        // slot after each one so that it remains the most recently used entry.
+        final int coldRegistrations = maxSlots * 4;
+        for (int attempt = 0; attempt < coldRegistrations; attempt++) {
+            final Map<String, String> coldAttributes = new HashMap<>();
+            coldAttributes.put("task_attempt_id", "attempt-" + attempt);
+            coldAttributes.put("task_name", "long_task_name_that_truncates");
+            transformer.transform("m", coldAttributes);
+
+            transformer.transform("m", hotFirstRaw);
+        }
+
+        // A different raw value that lands on the hot slot's transformed form. With the hot slot
+        // still tracked this counts as a collision; had it been evicted, the registration would
+        // merely create a new slot and the count would stay at zero.
+        final Map<String, String> hotSecondRaw = new HashMap<>();
+        hotSecondRaw.put("task_name", "hot_BBB");
+        transformer.transform("m", hotSecondRaw);
+
+        assertThat(transformer.getCollisionCount())
+                .as("Hot slot must survive LRU eviction and fire a collision on re-entry")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testNegativeCollisionTrackingMaxSlotsFallsBackToDefault() {
+        final MetricConfig cfg = new MetricConfig();
+        cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "-1");
+        cfg.setProperty(
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3");
+
+        // Construction must succeed: an invalid value is expected to be replaced by the default
+        // (with a WARN), matching how malformed attribute-limit entries are treated.
+        final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+        // With tracking active under the default cap, two distinct raws that truncate to the
+        // same value must register as a collision.
+        for (final String taskName : Arrays.asList("tsk_AAA", "tsk_BBB")) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("task_name", taskName);
+            transformer.transform("m", attributes);
+        }
+
+        assertThat(transformer.getCollisionCount())
+                .as("Negative maxSlots must fall back to default, leaving tracking enabled")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testMalformedCollisionTrackingMaxSlotsFallsBackToDefault() {
+        final MetricConfig cfg = new MetricConfig();
+        // A truncating limit is required here: without one nothing is ever tracked, and the
+        // test could not distinguish "tracking enabled with the default cap" from "tracking
+        // disabled".
+        cfg.setProperty(
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3");
+        cfg.setProperty(
+                OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "not-a-num");
+
+        // Must not throw — malformed integers log a WARN and fall back to the default.
+        final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+        // Two distinct raws that both truncate to "tsk": a collision is only recorded when the
+        // tracker is actually active.
+        for (final String taskName : Arrays.asList("tsk_AAA", "tsk_BBB")) {
+            final Map<String, String> raw = new HashMap<>();
+            raw.put("task_name", taskName);
+            transformer.transform("m", raw);
+        }
+
+        assertThat(transformer.getCollisionCount())
+                .as("Malformed maxSlots must fall back to default, leaving tracking enabled")
+                .isEqualTo(1);
+    }
+
+    @Test
+    void testCollisionTrackingCanBeDisabled() {
+        final MetricConfig cfg = new MetricConfig();
+        // A max-slots value of 0 switches the tracker off while truncation stays configured.
+        cfg.setProperty(OpenTelemetryReporterOptions.COLLISION_TRACKING_MAX_SLOTS_KEY, "0");
+        cfg.setProperty(
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*", "3");
+        final MetricAttributeTransformer transformer = new MetricAttributeTransformer(cfg);
+
+        // Both raws truncate to the same value, which would count as a collision if tracking
+        // were on; with it off, nothing may be recorded.
+        final Map<String, String> firstRaw = new HashMap<>();
+        firstRaw.put("task_name", "tsk_AAA");
+        transformer.transform("m", firstRaw);
+
+        final Map<String, String> secondRaw = new HashMap<>();
+        secondRaw.put("task_name", "tsk_BBB");
+        transformer.transform("m", secondRaw);
+
+        assertThat(transformer.getCollisionCount())
+                .as("No collisions must be recorded when tracking is disabled")
+                .isZero();
+        assertThat(transformer.getTrackedSlotCount())
+                .as("Tracked slot count must be zero when tracking is disabled")
+                .isZero();
+    }
+
+    /** Creates a transformer whose only configured limit is the {@code *} entry. */
+    private static MetricAttributeTransformer buildTransformerWithGlobalLimit(final int limit) {
+        final String globalLimitKey =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY + "*";
+
+        final MetricConfig globalOnly = new MetricConfig();
+        globalOnly.setProperty(globalLimitKey, Integer.toString(limit));
+        return new MetricAttributeTransformer(globalOnly);
+    }
+
+    @Test
+    void testTruncationWithIntegerConfigValue() {
+        // The limit is stored as an Integer object rather than a String, to check that parsing
+        // copes with non-String property values.
+        final String numericLimitKey =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY
+                        + NUMERIC_KEY;
+        final MetricConfig cfg = new MetricConfig();
+        cfg.put(numericLimitKey, NUMERIC_LIMIT);
+
+        assertThat(new MetricAttributeTransformer(cfg).getAttributeValueLimits())
+                .containsEntry(NUMERIC_KEY, NUMERIC_LIMIT);
+    }
+
+    // -----------------------------------------------------------------------
+
+    /**
+     * Builds the shared fixture: a global limit, per-attribute limits (small, large, zero,
+     * negative), an Integer-typed limit, and two entries with non-integer values.
+     */
+    private static MetricConfig buildFullTestConfig() {
+        final String prefix =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+        final MetricConfig fixture = new MetricConfig();
+
+        // String-typed integer limits.
+        fixture.setProperty(
+                prefix + MetricAttributeTransformer.GLOBAL_LIMIT_KEY,
+                Integer.toString(GLOBAL_LIMIT));
+        fixture.setProperty(prefix + CUSTOM_SMALL_KEY, Integer.toString(CUSTOM_SMALL_LIMIT));
+        fixture.setProperty(prefix + CUSTOM_LARGE_KEY, Integer.toString(CUSTOM_LARGE_LIMIT));
+        fixture.setProperty(prefix + ZERO_KEY, Integer.toString(0));
+        fixture.setProperty(prefix + NEGATIVE_KEY, Integer.toString(NEGATIVE_LIMIT));
+
+        // Integer-typed value — should be parsed correctly.
+        fixture.put(prefix + NUMERIC_KEY, NUMERIC_LIMIT);
+
+        // Non-numeric string — should be skipped.
+        fixture.setProperty(prefix + INVALID_STRING_KEY, "3.5");
+
+        // Non-string, non-integer value — should be skipped.
+        fixture.put(prefix + INVALID_DURATION_KEY, Duration.ofSeconds(1));
+
+        return fixture;
+    }
+}
diff --git 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
index e7a96940f95..582a4c31511 100644
--- 
a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
+++ 
b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporterITCase.java
@@ -40,8 +40,13 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Clock;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -321,6 +326,99 @@ public class OpenTelemetryMetricReporterITCase extends 
OpenTelemetryTestBase {
                 });
     }
 
+    /**
+     * End-to-end check that the limits configured on the reporter are reflected in the exported
+     * payload. One run covers all four configuration semantics:
+     *
+     * <ul>
+     *   <li>{@code task_name} — cut to the global limit (5).
+     *   <li>{@code operator_name} — cut to its own limit (3), which overrides the global one.
+     *   <li>{@code job_id} — removed completely by limit {@code 0}.
+     *   <li>{@code job_name} — exported in full because limit {@code -1} overrides the global.
+     * </ul>
+     */
+    @Test
+    public void testAttributeValueTruncation() throws Exception {
+        final MetricConfig config = createMetricConfig();
+        final String limitKeyPrefix =
+                OpenTelemetryReporterOptions.ATTRIBUTE_VALUE_LENGTH_LIMITS_PREFIX_KEY;
+        config.setProperty(limitKeyPrefix + "*", "5");
+        config.setProperty(limitKeyPrefix + "operator_name", "3");
+        config.setProperty(limitKeyPrefix + "job_id", "0");
+        config.setProperty(limitKeyPrefix + "job_name", "-1");
+
+        final String taskName = "task_name_longer_than_five";
+        final String operatorName = "operator_name_longer_than_three";
+        final String jobId = "job_id_dropped_entirely";
+        final String jobName = "job_name_kept_despite_global_limit";
+
+        final Map<String, String> variables = new HashMap<>();
+        variables.put("<task_name>", taskName);
+        variables.put("<operator_name>", operatorName);
+        variables.put("<job_id>", jobId);
+        variables.put("<job_name>", jobName);
+        final MetricGroup metricGroup = new TestMetricGroupWithVariables(variables);
+
+        // Register a single counter, export it once, then shut the reporter down.
+        reporter.open(config);
+        reporter.notifyOfAddedMetric(new SimpleCounter(), "trunc.counter", metricGroup);
+        reporter.report();
+        reporter.close();
+
+        final String exportedName = "flink.logical.scope.trunc.counter";
+        final String dataPoints = "/sum/dataPoints";
+        final String expectedTaskName = taskName.substring(0, 5);
+        final String expectedOperatorName = operatorName.substring(0, 3);
+
+        eventuallyConsumeJson(
+                json -> {
+                    assertThat(extractMetricNames(json)).contains(exportedName);
+
+                    final List<String> taskNames =
+                            collectAttributeValues(json, exportedName, dataPoints, "task_name");
+                    assertThat(taskNames)
+                            .as("task_name must be truncated to the global limit (5)")
+                            .containsExactly(expectedTaskName);
+
+                    final List<String> operatorNames =
+                            collectAttributeValues(
+                                    json, exportedName, dataPoints, "operator_name");
+                    assertThat(operatorNames)
+                            .as("operator_name must be truncated by per-attribute limit (3)")
+                            .containsExactly(expectedOperatorName);
+
+                    final List<String> jobIds =
+                            collectAttributeValues(json, exportedName, dataPoints, "job_id");
+                    assertThat(jobIds).as("job_id must be dropped entirely (limit 0)").isEmpty();
+
+                    final List<String> jobNames =
+                            collectAttributeValues(json, exportedName, dataPoints, "job_name");
+                    assertThat(jobNames)
+                            .as(
+                                    "job_name must be kept at full length (negative limit overrides global)")
+                            .containsExactly(jobName);
+                });
+    }
+
+    /**
+     * Collects the string values of attribute {@code attrKey} on every data point of every
+     * metric named {@code metricName} found under the {@code metrics} node of {@code json}.
+     *
+     * @param json exported payload to search
+     * @param metricName exact metric name to match
+     * @param dataPointsPath path expression, evaluated with {@code at(...)} on each matching
+     *     metric node, that leads to its data points
+     * @param attrKey key of the attribute whose values are collected
+     * @return the collected values in encounter order; empty when nothing matches
+     */
+    private static List<String> collectAttributeValues(
+            final JsonNode json,
+            final String metricName,
+            final String dataPointsPath,
+            final String attrKey) {
+        final JsonNode metrics =
+                json.findPath("resourceMetrics").findPath("scopeMetrics").findPath("metrics");
+        // Gather through a collector instead of mutating an outer list from forEach.
+        return streamOf(metrics)
+                .filter(metric -> metricName.equals(metric.findPath("name").asText()))
+                .flatMap(metric -> streamOf(metric.at(dataPointsPath)))
+                .flatMap(dp -> streamOf(dp.findPath("attributes")))
+                .filter(attr -> attrKey.equals(attr.findPath("key").asText()))
+                .map(attr -> attr.at("/value/stringValue").asText())
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    private static Stream<JsonNode> streamOf(final JsonNode node) {
+        return StreamSupport.stream(node.spliterator(), false);
+    }
+
     static class TestMetricGroup extends UnregisteredMetricsGroup implements 
LogicalScopeProvider {
 
         @Override
@@ -338,4 +436,22 @@ public class OpenTelemetryMetricReporterITCase extends 
OpenTelemetryTestBase {
             return this;
         }
     }
+
+    /**
+     * {@link TestMetricGroup} variant whose {@link #getAllVariables()} returns the map supplied
+     * at construction time, so tests can check attribute-level behaviour such as truncation.
+     */
+    static class TestMetricGroupWithVariables extends TestMetricGroup {
+
+        /** Map handed in by the test; kept by reference, not copied. */
+        private final Map<String, String> fixedVariables;
+
+        TestMetricGroupWithVariables(final Map<String, String> variables) {
+            fixedVariables = variables;
+        }
+
+        @Override
+        public Map<String, String> getAllVariables() {
+            return fixedVariables;
+        }
+    }
 }

Reply via email to