This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fc85e3ea985c fix(flink): remove pre-seed in metrics in GlobalRecordLevelIndexBackend and RecordLevelIndexBackend (#18875)
fc85e3ea985c is described below

commit fc85e3ea985c899ab7120e21c3652c1cec816c13
Author: Shihuan Liu <[email protected]>
AuthorDate: Thu May 28 19:06:52 2026 -0700

    fix(flink): remove pre-seed in metrics in GlobalRecordLevelIndexBackend and RecordLevelIndexBackend (#18875)
---
 .../index/GlobalRecordLevelIndexBackend.java       |  2 --
 .../partitioner/index/RecordLevelIndexBackend.java |  6 ------
 .../index/TestGlobalRecordLevelIndexBackend.java   | 13 +-----------
 .../index/TestRecordLevelIndexBackend.java         | 23 ----------------------
 4 files changed, 1 insertion(+), 43 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index 1eeef79af9b2..4dea5fd49ba7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.sink.partitioner.index;
 
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.data.HoodiePairData;
@@ -71,7 +70,6 @@ public class GlobalRecordLevelIndexBackend implements MinibatchIndexBackend {
     this.metaClient = StreamerUtil.createMetaClient(conf);
     this.conf = conf;
     this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
-    registerMetrics(new UnregisteredMetricsGroup());
     reloadMetadataTable();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index d1a5f0b05c7c..8128c625158d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -41,12 +41,10 @@ import org.apache.hudi.sink.utils.SamplingActionExecutor;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -78,7 +76,6 @@ public class RecordLevelIndexBackend implements PartitionedIndexBackend {
   private final long maxCacheSizeInBytes;
   private final BootstrapFilter bootstrapFilter;
   private HoodieTableMetadata metadataTable;
-  @Getter(AccessLevel.PACKAGE)
   private FlinkPartitionedIndexBackendMetrics metrics;
 
   @Getter
@@ -102,9 +99,6 @@ public class RecordLevelIndexBackend implements PartitionedIndexBackend {
     this.metaClient = StreamerUtil.createMetaClient(conf);
     this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) * 1024 * 1024;
     this.bootstrapFilter = bootstrapFilter;
-    // Pre-seed metrics with an unregistered group so the backend is safe to use before the
-    // bucket assign operator wires the real metric group via registerMetrics(MetricGroup).
-    registerMetrics(new UnregisteredMetricsGroup());
     reloadMetadataTable();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index edfad3481560..9e4d8f9eb0ec 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -138,6 +138,7 @@ public class TestGlobalRecordLevelIndexBackend {
     conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
 
     try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new GlobalRecordLevelIndexBackend(conf, -1)) {
+      globalRecordLevelIndexBackend.registerMetrics(TaskManagerMetricGroup.createTaskManagerMetricGroup(registry, "localhost", ResourceID.generate()));
 
       for (int i = 0; i < 1500; i++) {
         globalRecordLevelIndexBackend.update("id1_" + i,
@@ -203,18 +204,6 @@ public class TestGlobalRecordLevelIndexBackend {
     }
   }
 
-  @Test
-  void testLookupWithoutMetricsRegistrationIsNullSafe() throws Exception {
-    // Verifies that the if (metrics != null) guards in get(List) don't throw even when
-    // registerMetrics was never called.
-    try (GlobalRecordLevelIndexBackend backend = new GlobalRecordLevelIndexBackend(conf, -1)) {
-      HoodieRecordGlobalLocation location = new HoodieRecordGlobalLocation("par1", "000000001", "file-id-1");
-      backend.update("null_metrics_key", location);
-      Map<String, HoodieRecordGlobalLocation> result = backend.get(Collections.singletonList("null_metrics_key"));
-      assertEquals(location, result.get("null_metrics_key"));
-    }
-  }
-
   @Test
   void testRegisterMetricsIsIdempotent() throws Exception {
     // The second registerMetrics call must be a no-op and must not throw.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index f3431a4d1acd..cea4db07294a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.partitioner.index;
 
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.metrics.FlinkPartitionedIndexBackendMetrics;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -41,7 +40,6 @@ import java.util.Map;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
@@ -157,15 +155,6 @@ public class TestRecordLevelIndexBackend {
     }
   }
 
-  @Test
-  public void testConstructorPreSeedsMetricsForNullSafety() throws Exception {
-    // Without pre-seeding, bootstrapPartition would NPE before the bucket assign operator
-    // wires the real MetricGroup via registerMetrics(MetricGroup).
-    try (RecordLevelIndexBackend backend = createBackend()) {
-      assertNotNull(backend.getMetrics());
-    }
-  }
-
   @Test
   public void testRegisterMetricsRegistersPartitionBootstrapHistograms() throws Exception {
     CapturingMetricGroup metricGroup = new CapturingMetricGroup();
@@ -177,18 +166,6 @@ public class TestRecordLevelIndexBackend {
     }
   }
 
-  @Test
-  public void testRegisterMetricsReplacesPreSeededMetrics() throws Exception {
-    try (RecordLevelIndexBackend backend = createBackend()) {
-      FlinkPartitionedIndexBackendMetrics preSeeded = backend.getMetrics();
-      backend.registerMetrics(new CapturingMetricGroup());
-
-      assertNotNull(backend.getMetrics());
-      // The second call must rewire the field so the active metrics publish to the real metric group.
-      assertNotSame(preSeeded, backend.getMetrics());
-    }
-  }
-
   private RecordLevelIndexBackend createBackend() {
     return new RecordLevelIndexBackend(conf, (partitionPath, recordKey, fileId) -> true);
   }

Reply via email to