This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fc85e3ea985c fix(flink): remove pre-seed in metrics in GlobalRecordLevelIndexBackend and RecordLevelIndexBackend (#18875)
fc85e3ea985c is described below
commit fc85e3ea985c899ab7120e21c3652c1cec816c13
Author: Shihuan Liu <[email protected]>
AuthorDate: Thu May 28 19:06:52 2026 -0700
fix(flink): remove pre-seed in metrics in GlobalRecordLevelIndexBackend and RecordLevelIndexBackend (#18875)
---
.../index/GlobalRecordLevelIndexBackend.java | 2 --
.../partitioner/index/RecordLevelIndexBackend.java | 6 ------
.../index/TestGlobalRecordLevelIndexBackend.java | 13 +-----------
.../index/TestRecordLevelIndexBackend.java | 23 ----------------------
4 files changed, 1 insertion(+), 43 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
index 1eeef79af9b2..4dea5fd49ba7 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java
@@ -18,7 +18,6 @@
package org.apache.hudi.sink.partitioner.index;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.data.HoodiePairData;
@@ -71,7 +70,6 @@ public class GlobalRecordLevelIndexBackend implements
MinibatchIndexBackend {
this.metaClient = StreamerUtil.createMetaClient(conf);
this.conf = conf;
this.recordIndexCache = new RecordIndexCache(conf, initCheckpointId);
- registerMetrics(new UnregisteredMetricsGroup());
reloadMetadataTable();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
index d1a5f0b05c7c..8128c625158d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RecordLevelIndexBackend.java
@@ -41,12 +41,10 @@ import org.apache.hudi.sink.utils.SamplingActionExecutor;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import java.io.Closeable;
import java.io.IOException;
@@ -78,7 +76,6 @@ public class RecordLevelIndexBackend implements
PartitionedIndexBackend {
private final long maxCacheSizeInBytes;
private final BootstrapFilter bootstrapFilter;
private HoodieTableMetadata metadataTable;
- @Getter(AccessLevel.PACKAGE)
private FlinkPartitionedIndexBackendMetrics metrics;
@Getter
@@ -102,9 +99,6 @@ public class RecordLevelIndexBackend implements
PartitionedIndexBackend {
this.metaClient = StreamerUtil.createMetaClient(conf);
this.maxCacheSizeInBytes = conf.get(FlinkOptions.INDEX_RLI_CACHE_SIZE) *
1024 * 1024;
this.bootstrapFilter = bootstrapFilter;
- // Pre-seed metrics with an unregistered group so the backend is safe to use before the
- // bucket assign operator wires the real metric group via registerMetrics(MetricGroup).
- registerMetrics(new UnregisteredMetricsGroup());
reloadMetadataTable();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
index edfad3481560..9e4d8f9eb0ec 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestGlobalRecordLevelIndexBackend.java
@@ -138,6 +138,7 @@ public class TestGlobalRecordLevelIndexBackend {
conf.set(FlinkOptions.INDEX_RLI_CACHE_SIZE, 1L);
try (GlobalRecordLevelIndexBackend globalRecordLevelIndexBackend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
+
globalRecordLevelIndexBackend.registerMetrics(TaskManagerMetricGroup.createTaskManagerMetricGroup(registry,
"localhost", ResourceID.generate()));
for (int i = 0; i < 1500; i++) {
globalRecordLevelIndexBackend.update("id1_" + i,
@@ -203,18 +204,6 @@ public class TestGlobalRecordLevelIndexBackend {
}
}
- @Test
- void testLookupWithoutMetricsRegistrationIsNullSafe() throws Exception {
- // Verifies that the if (metrics != null) guards in get(List) don't throw even when
- // registerMetrics was never called.
- try (GlobalRecordLevelIndexBackend backend = new
GlobalRecordLevelIndexBackend(conf, -1)) {
- HoodieRecordGlobalLocation location = new
HoodieRecordGlobalLocation("par1", "000000001", "file-id-1");
- backend.update("null_metrics_key", location);
- Map<String, HoodieRecordGlobalLocation> result =
backend.get(Collections.singletonList("null_metrics_key"));
- assertEquals(location, result.get("null_metrics_key"));
- }
- }
-
@Test
void testRegisterMetricsIsIdempotent() throws Exception {
// The second registerMetrics call must be a no-op and must not throw.
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
index f3431a4d1acd..cea4db07294a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/index/TestRecordLevelIndexBackend.java
@@ -20,7 +20,6 @@ package org.apache.hudi.sink.partitioner.index;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.metrics.FlinkPartitionedIndexBackendMetrics;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -41,7 +40,6 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@@ -157,15 +155,6 @@ public class TestRecordLevelIndexBackend {
}
}
- @Test
- public void testConstructorPreSeedsMetricsForNullSafety() throws Exception {
- // Without pre-seeding, bootstrapPartition would NPE before the bucket assign operator
- // wires the real MetricGroup via registerMetrics(MetricGroup).
- try (RecordLevelIndexBackend backend = createBackend()) {
- assertNotNull(backend.getMetrics());
- }
- }
-
@Test
public void testRegisterMetricsRegistersPartitionBootstrapHistograms()
throws Exception {
CapturingMetricGroup metricGroup = new CapturingMetricGroup();
@@ -177,18 +166,6 @@ public class TestRecordLevelIndexBackend {
}
}
- @Test
- public void testRegisterMetricsReplacesPreSeededMetrics() throws Exception {
- try (RecordLevelIndexBackend backend = createBackend()) {
- FlinkPartitionedIndexBackendMetrics preSeeded = backend.getMetrics();
- backend.registerMetrics(new CapturingMetricGroup());
-
- assertNotNull(backend.getMetrics());
- // The second call must rewire the field so the active metrics publish to the real metric group.
- assertNotSame(preSeeded, backend.getMetrics());
- }
- }
-
private RecordLevelIndexBackend createBackend() {
return new RecordLevelIndexBackend(conf, (partitionPath, recordKey,
fileId) -> true);
}