This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b82a5b2fe835 feat: add metrics for bucketassign.minibatch cache hit ratio (#18761)
b82a5b2fe835 is described below
commit b82a5b2fe83548ad31a6f57e6b0a8f3368518914
Author: Yao Li <[email protected]>
AuthorDate: Thu May 21 01:35:26 2026 -0700
feat: add metrics for bucketassign.minibatch cache hit ratio (#18761)
Co-authored-by: Cursor <[email protected]>
---
.../hudi/metrics/FlinkIndexBackendMetrics.java | 35 ++++++++-
.../index/GlobalRecordLevelIndexBackend.java | 6 +-
.../hudi/metrics/TestFlinkIndexBackendMetrics.java | 52 +++++++++++++
.../index/TestGlobalRecordLevelIndexBackend.java | 87 ++++++++++++++++++++++
4 files changed, 177 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
index 65fad78e93a7..4c8fc700c282 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkIndexBackendMetrics.java
@@ -20,19 +20,23 @@ package org.apache.hudi.metrics;
import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.hudi.common.util.VisibleForTesting;
/**
* Metrics for the {@link org.apache.hudi.sink.partitioner.index.GlobalRecordLevelIndexBackend}.
- * Tracks cache hit/miss counts and the latency of local (cache) vs. remote (metadata table) lookups.
+ * Tracks the latency of local (cache) vs. remote (metadata table) lookups, the per-lookup key
+ * counts, and the per-mini-batch in-memory cache hit ratio.
*/
public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
private static final int HISTOGRAM_WINDOW_SIZE = 100;
private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+ public static final String LOOKUP_CACHE_HIT_RATIO = "lookupCacheHitRatio";
+
/** Latency of the local (cache) phase of each index lookup, in milliseconds. */
private final Histogram localIndexLookupLatency;
@@ -45,6 +49,15 @@ public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
/** Number of keys that missed the local cache and were fetched remotely per lookup. */
private final Histogram remoteLookupKeysNum;
+ /**
+ * Latest per-mini-batch in-memory cache hit ratio (hits / (hits + misses)).
+ * Set on each {@link #updateLookupCacheHitRatio(long, long)} call; defaults to 0.0
+ * and is left unchanged when a lookup observes zero total keys.
+ * Marked {@code volatile} because Flink's reporter thread reads it while the
+ * bucket-assign task thread writes to it.
+ */
+ private volatile double lookupCacheHitRatio = 0.0D;
+
public FlinkIndexBackendMetrics(MetricGroup metricGroup) {
super(metricGroup);
this.localIndexLookupLatency = new DropwizardHistogramWrapper(
@@ -63,6 +76,21 @@ public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
metricGroup.histogram("remoteIndexLookupLatency",
remoteIndexLookupLatency);
metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+ metricGroup.gauge(LOOKUP_CACHE_HIT_RATIO, (Gauge<Double>) () ->
lookupCacheHitRatio);
+ }
+
+ /**
+ * Updates the per-mini-batch cache hit-ratio gauge from the hit/miss counts already
+ * fed into {@link #updateLocalLookupKeysCount(long)} and {@link #updateRemoteLookupKeysCount(long)}.
+ * When the lookup observed no keys, the previous value is preserved so dashboards
+ * don't oscillate back to zero on idle mini-batches.
+ */
+ public void updateLookupCacheHitRatio(long hitCount, long missCount) {
+ long total = hitCount + missCount;
+ if (total <= 0L) {
+ return;
+ }
+ lookupCacheHitRatio = (double) hitCount / total;
}
public void startLocalIndexLookup() {
@@ -108,4 +136,9 @@ public class FlinkIndexBackendMetrics extends HoodieFlinkMetrics {
public long getRemoteLookupKeysSampleCount() {
return remoteLookupKeysNum.getCount();
}
+
+ @VisibleForTesting
+ public double getLookupCacheHitRatio() {
+ return lookupCacheHitRatio;
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index 628e20e86b81..1eeef79af9b2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -18,7 +18,6 @@
package org.apache.hudi.sink.partitioner.index;
-import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieListData;
@@ -37,6 +36,7 @@ import org.apache.hudi.util.StreamerUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
import java.io.IOException;
import java.util.ArrayList;
@@ -106,8 +106,10 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
}
}
+ int hitCount = recordKeys.size() - missedKeys.size();
metrics.endLocalIndexLookup();
- metrics.updateLocalLookupKeysCount(recordKeys.size() - missedKeys.size());
+ metrics.updateLocalLookupKeysCount(hitCount);
+ metrics.updateLookupCacheHitRatio(hitCount, missedKeys.size());
if (!missedKeys.isEmpty()) {
metrics.startRemoteIndexLookup();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
index 3a11348e2431..a41e713a8496 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkIndexBackendMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metrics;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -54,6 +55,46 @@ class TestFlinkIndexBackendMetrics {
assertNotNull(metricGroup.getHistogram("remoteLookupKeysNum"));
}
+ @Test
+ void testRegisterMetricsRegistersHitRatioGauge() {
+ Gauge<?> gauge =
metricGroup.getGauge(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+ assertNotNull(gauge);
+ assertEquals(0.0D, ((Double) gauge.getValue()).doubleValue());
+ }
+
+ @Test
+ void testUpdateLookupCacheHitRatioTracksLatestMiniBatch() {
+ Gauge<?> gauge =
metricGroup.getGauge(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+
+ metrics.updateLookupCacheHitRatio(3L, 1L);
+ assertEquals(0.75D, ((Double) gauge.getValue()).doubleValue());
+ assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+
+ // gauge reflects the latest mini-batch, not a running average.
+ metrics.updateLookupCacheHitRatio(1L, 3L);
+ assertEquals(0.25D, ((Double) gauge.getValue()).doubleValue());
+
+ metrics.updateLookupCacheHitRatio(5L, 0L);
+ assertEquals(1.0D, ((Double) gauge.getValue()).doubleValue());
+
+ metrics.updateLookupCacheHitRatio(0L, 5L);
+ assertEquals(0.0D, ((Double) gauge.getValue()).doubleValue());
+ }
+
+ @Test
+ void testUpdateLookupCacheHitRatioPreservesPreviousValueOnEmptyBatch() {
+ metrics.updateLookupCacheHitRatio(3L, 1L);
+ assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+
+ // empty mini-batch (no keys looked up) must not reset the ratio to NaN or 0.
+ metrics.updateLookupCacheHitRatio(0L, 0L);
+ assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+
+ // negative counts are treated as an empty batch (defensive guard).
+ metrics.updateLookupCacheHitRatio(-1L, -2L);
+ assertEquals(0.75D, metrics.getLookupCacheHitRatio());
+ }
+
@Test
void testLocalIndexLookupUpdatesHistogramCount() {
Histogram hist = metricGroup.getHistogram("localIndexLookupLatency");
@@ -189,6 +230,7 @@ class TestFlinkIndexBackendMetrics {
private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
private final Map<String, Histogram> histograms = new HashMap<>();
+ private final Map<String, Gauge<?>> gauges = new HashMap<>();
@Override
public <H extends Histogram> H histogram(String name, H histogram) {
@@ -196,8 +238,18 @@ class TestFlinkIndexBackendMetrics {
return histogram;
}
+ @Override
+ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+ gauges.put(name, gauge);
+ return gauge;
+ }
+
Histogram getHistogram(String name) {
return histograms.get(name);
}
+
+ Gauge<?> getGauge(String name) {
+ return gauges.get(name);
+ }
}
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index 3f1a3b28af1b..edfad3481560 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.metrics.FlinkIndexBackendMetrics;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -31,6 +32,8 @@ import org.apache.hudi.utils.TestData;
import org.apache.hudi.utils.TestUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -48,8 +51,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -222,4 +229,84 @@ public class TestGlobalRecordLevelIndexBackend {
assertEquals(location,
backend.get(Collections.singletonList("idempotent_key")).get("idempotent_key"));
}
}
+
+ @Test
+ void testGetUpdatesCacheHitRatioGauge() throws Exception {
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ try (GlobalRecordLevelIndexBackend backend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+ Map<String, Gauge<?>> gauges = new HashMap<>();
+ backend.registerMetrics(captureGauges(gauges));
+
+ Gauge<?> hitRatioGauge =
gauges.get(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+ assertNotNull(hitRatioGauge);
+ // no lookups yet: gauge is at its initial value.
+ assertEquals(0.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+ // first lookup for two known + one unknown key: 0 / 3 hit ratio, then the two known keys
+ // are populated into the cache by the metadata table fallback.
+ Map<String, HoodieRecordGlobalLocation> firstLookup =
+ backend.get(Arrays.asList("id1", "id2", "missing_key"));
+ assertEquals(2, firstLookup.size());
+ assertEquals(0.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+ // second lookup: id1 and id2 are now cached so they should be hits; missing_key remains a miss.
+ // ratio = 2 hits / (2 hits + 1 miss).
+ backend.get(Arrays.asList("id1", "id2", "missing_key"));
+ assertEquals(2.0D / 3.0D, ((Double)
hitRatioGauge.getValue()).doubleValue());
+ }
+ }
+
+ @Test
+ void testRegisterMetricsRegistersHitRatioGauge() throws Exception {
+ try (GlobalRecordLevelIndexBackend backend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+ Map<String, Gauge<?>> gauges = new HashMap<>();
+ backend.registerMetrics(captureGauges(gauges));
+
+
assertTrue(gauges.containsKey(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO));
+ assertEquals(0.0D, ((Double)
gauges.get(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO).getValue()).doubleValue());
+ }
+ }
+
+ @Test
+ void testGetUpdatesCacheHitRatioGaugeForCachedLookup() throws Exception {
+ // Cache-only variant of testGetUpdatesCacheHitRatioGauge: pre-populates the cache
+ // so the metadata-table fallback is skipped, which keeps the test from depending on
+ // any writer/runtime setup while still exercising the get(List) -> gauge wire-up.
+ try (GlobalRecordLevelIndexBackend backend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+ Map<String, Gauge<?>> gauges = new HashMap<>();
+ backend.registerMetrics(captureGauges(gauges));
+
+ Gauge<?> hitRatioGauge =
gauges.get(FlinkIndexBackendMetrics.LOOKUP_CACHE_HIT_RATIO);
+ assertNotNull(hitRatioGauge);
+ // no lookups yet: gauge is at its initial value.
+ assertEquals(0.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+ HoodieRecordGlobalLocation location1 = new
HoodieRecordGlobalLocation("par1", "000000001", "file-id-1");
+ HoodieRecordGlobalLocation location2 = new
HoodieRecordGlobalLocation("par1", "000000001", "file-id-2");
+ backend.update("cached_key_1", location1);
+ backend.update("cached_key_2", location2);
+
+ // all keys served from the in-memory cache: ratio = 2 / 2 = 1.0.
+ Map<String, HoodieRecordGlobalLocation> result =
+ backend.get(Arrays.asList("cached_key_1", "cached_key_2"));
+ assertEquals(2, result.size());
+ assertEquals(1.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+
+ // a second, single-key cached lookup keeps the ratio at 1.0.
+ backend.get(Collections.singletonList("cached_key_1"));
+ assertEquals(1.0D, ((Double) hitRatioGauge.getValue()).doubleValue());
+ }
+ }
+
+ private static MetricGroup captureGauges(Map<String, Gauge<?>> sink) {
+ MetricGroup metricGroup = mock(MetricGroup.class);
+ doAnswer(invocation -> {
+ String name = invocation.getArgument(0);
+ Gauge<?> gauge = invocation.getArgument(1);
+ sink.put(name, gauge);
+ return gauge;
+ }).when(metricGroup).gauge(anyString(), any(Gauge.class));
+ return metricGroup;
+ }
}