This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new af38b880de5c feat(flink): add shard assign metrics for bucket assign
function (#18790)
af38b880de5c is described below
commit af38b880de5c0bf15c35639fc39a7a94613d99a6
Author: Peter Huang <[email protected]>
AuthorDate: Mon Jun 1 21:11:00 2026 -0700
feat(flink): add shard assign metrics for bucket assign function (#18790)
---
.../hudi/metrics/FlinkBucketAssignMetrics.java | 25 ++++++-
.../sink/partitioner/BucketAssignFunction.java | 22 ++++++
.../partitioner/GlobalRecordIndexPartitioner.java | 22 ++++--
.../hudi/metrics/TestFlinkBucketAssignMetrics.java | 40 +++++++++++
.../TestGlobalRecordIndexPartitioner.java | 80 +++++++++++++++++++++-
.../TestMinibatchBucketAssignFunction.java | 28 ++++++++
6 files changed, 210 insertions(+), 7 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
index 9c08f2b36a5d..97ea86fcbc41 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
@@ -25,9 +25,11 @@ import
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Metrics for flink bucket assign functions (BucketAssignFunction,
MinibatchBucketAssignFunction,
- * DynamicBucketAssignFunction). Tracks record buffering time.
+ * DynamicBucketAssignFunction). Tracks record buffering time and RLI shard
assignment distribution.
*/
public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
private static final int HISTOGRAM_WINDOW_SIZE = 100;
@@ -39,6 +41,13 @@ public class FlinkBucketAssignMetrics extends
HoodieFlinkMetrics {
*/
private final Histogram recordBufferingTime;
+ /**
+ * Number of RLI file group shards assigned to this bucket assign task.
+ * Set once during open() when global RLI is active; remains -1 otherwise.
+ * Compare across task subtasks to detect skew in shard distribution.
+ */
+ private final AtomicInteger numShardsAssigned = new AtomicInteger(-1);
+
public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
super(metricGroup);
this.recordBufferingTime = new DropwizardHistogramWrapper(
@@ -48,6 +57,15 @@ public class FlinkBucketAssignMetrics extends
HoodieFlinkMetrics {
@Override
public void registerMetrics() {
metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+ metricGroup.gauge("numShardsAssigned", numShardsAssigned::get);
+ }
+
+ /**
+ * Sets the number of RLI shards owned by this task. Call once during {@code
open()} when global
+ * RLI is active; the gauge value changes from the sentinel -1 to {@code
count}.
+ */
+ public void setNumShardsAssigned(int count) {
+ numShardsAssigned.set(count);
}
public void startRecordBuffering() {
@@ -62,4 +80,9 @@ public class FlinkBucketAssignMetrics extends
HoodieFlinkMetrics {
public long getRecordBufferingCount() {
return recordBufferingTime.getCount();
}
+
+ @VisibleForTesting
+ public int getNumShardsAssigned() {
+ return numShardsAssigned.get();
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 6caacbbc3843..3ac55d4daa9b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -145,6 +145,28 @@ public class BucketAssignFunction
this.recordProcessor = initRecordProcessor();
this.metrics = new
FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
this.metrics.registerMetrics();
+ initRliShardAssignMetric();
+ }
+
+ /**
+ * Computes and registers the number of RLI shards assigned to this task
when global RLI is active.
+ * Each task owns the file groups whose index satisfies {@code fgIndex %
numPartitions == taskIndex}.
+ * No-ops when global RLI is not enabled or during index bootstrap (metadata
table file-group count
+ * may be unavailable at bootstrap time), leaving {@code numShardsAssigned}
at its sentinel -1.
+ */
+ private void initRliShardAssignMetric() {
+ if (!OptionsResolver.isGlobalRecordLevelIndex(conf) ||
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+ return;
+ }
+ try {
+ int numFileGroups =
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+ int taskIndex =
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+ int numPartitions =
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+ this.metrics.setNumShardsAssigned(
+ GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex,
numPartitions, numFileGroups));
+ } catch (Exception e) {
+ log.warn("Failed to compute RLI shard assignment count for metrics", e);
+ }
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
index 8b6f1ae22ad9..9dca405f5550 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
@@ -67,7 +67,7 @@ public class GlobalRecordIndexPartitioner implements
Partitioner<HoodieKey> {
public int partition(HoodieKey recordKey, int numPartitions) {
// initialize numFileGroupsForRecordIndexPartition lazily.
if (numFileGroupsForRecordIndexPartition < 0) {
- numFileGroupsForRecordIndexPartition =
getNumFileGroupsForRecordIndexPartition();
+ numFileGroupsForRecordIndexPartition =
getNumFileGroupsForRecordIndexPartition(conf);
}
int fgIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(
recordKey.getRecordKey(), numFileGroupsForRecordIndexPartition);
@@ -75,15 +75,29 @@ public class GlobalRecordIndexPartitioner implements
Partitioner<HoodieKey> {
}
/**
- * Get the number of file groups for record index partition in metadata
table.
+ * Returns the number of RLI shards (file group indices in [0,
numFileGroups)) assigned to the given task.
+ *
+ * <p>The assignment follows the same modulo logic used in {@link
#partition}: shard {@code fgIndex}
+ * is owned by task {@code fgIndex % numPartitions}. The count is {@code
numFileGroups / numPartitions},
+ * plus one for tasks whose index is less than {@code numFileGroups %
numPartitions}.
+ */
+ public static int computeNumShardsAssigned(int taskIndex, int numPartitions,
int numFileGroups) {
+ int base = numFileGroups / numPartitions;
+ int remainder = numFileGroups % numPartitions;
+ return taskIndex < remainder ? base + 1 : base;
+ }
+
+ /**
+ * Reads the file group count for the record index partition from the
metadata table.
*/
- private int getNumFileGroupsForRecordIndexPartition() {
+ static int getNumFileGroupsForRecordIndexPartition(Configuration conf) {
+ String tablePath = conf.get(FlinkOptions.PATH);
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
try (HoodieTableMetadata metadataTable =
metaClient.getTableFormat().getMetadataFactory().create(
HoodieFlinkEngineContext.DEFAULT,
metaClient.getStorage(),
StreamerUtil.metadataConfig(conf),
- conf.get(FlinkOptions.PATH))) {
+ tablePath)) {
return
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
} catch (Exception e) {
throw new HoodieException("Failed to get file group count for global
record index partition.", e);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
index e5d9fbd9f95b..c4df6d028f6f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metrics;
+import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.junit.jupiter.api.BeforeEach;
@@ -116,8 +117,37 @@ class TestFlinkBucketAssignMetrics {
assertEquals(110, metrics.getRecordBufferingCount());
}
+ @Test
+ void testNumShardsAssignedGaugeDefaultIsNegativeOne() {
+ Gauge<?> gauge = metricGroup.getGauge("numShardsAssigned");
+ assertNotNull(gauge, "numShardsAssigned gauge should be registered");
+ assertEquals(-1, gauge.getValue(), "Default numShardsAssigned must be -1
(unset sentinel)");
+ }
+
+ @Test
+ void testSetNumShardsAssignedUpdatesGetterAndGauge() {
+ metrics.setNumShardsAssigned(7);
+ assertEquals(7, metrics.getNumShardsAssigned());
+ assertEquals(7, metricGroup.getGauge("numShardsAssigned").getValue());
+ }
+
+ @Test
+ void testSetNumShardsAssignedOverwrite() {
+ metrics.setNumShardsAssigned(3);
+ metrics.setNumShardsAssigned(5);
+ assertEquals(5, metrics.getNumShardsAssigned());
+ assertEquals(5, metricGroup.getGauge("numShardsAssigned").getValue());
+ }
+
+ @Test
+ void testGetNumShardsAssignedDefaultIsNegativeOne() {
+ assertEquals(-1, metrics.getNumShardsAssigned(),
+ "numShardsAssigned must return -1 before setNumShardsAssigned is
called");
+ }
+
private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
private final Map<String, Histogram> histograms = new HashMap<>();
+ private final Map<String, Gauge<?>> gauges = new HashMap<>();
@Override
public <H extends Histogram> H histogram(String name, H histogram) {
@@ -125,8 +155,18 @@ class TestFlinkBucketAssignMetrics {
return histogram;
}
+ @Override
+ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+ gauges.put(name, gauge);
+ return gauge;
+ }
+
Histogram getHistogram(String name) {
return histograms.get(name);
}
+
+ Gauge<?> getGauge(String name) {
+ return gauges.get(name);
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
index 32163baf38d2..809e4b55da35 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestGlobalRecordIndexPartitioner {
private static GlobalRecordIndexPartitioner partitioner;
+ private static Configuration conf;
@TempDir
static File tempFile;
@@ -46,7 +47,7 @@ public class TestGlobalRecordIndexPartitioner {
@BeforeAll
public static void beforeAll() throws Exception {
final String basePath = tempFile.getAbsolutePath();
- Configuration conf = TestConfigurations.getDefaultConf(basePath);
+ conf = TestConfigurations.getDefaultConf(basePath);
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
TestData.writeData(TestData.DATA_SET_INSERT, conf);
partitioner = new GlobalRecordIndexPartitioner(conf);
@@ -100,8 +101,83 @@ public class TestGlobalRecordIndexPartitioner {
void testLargeNumberOfPartitions() {
HoodieKey key = new HoodieKey("test_key_for_large_partition",
"partition_path");
int numPartitions = 100; // Large number of partitions
-
+
int partition = partitioner.partition(key, numPartitions);
assertTrue(partition >= 0 && partition < numPartitions);
}
+
+ @Test
+ void testComputeNumShardsAssignedEvenDistribution() {
+ // 10 shards, 5 tasks: each task gets exactly 2 shards
+ for (int taskIndex = 0; taskIndex < 5; taskIndex++) {
+ assertEquals(2,
GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex, 5, 10));
+ }
+ }
+
+ @Test
+ void testComputeNumShardsAssignedUnevenDistribution() {
+ // 11 shards, 4 tasks: tasks 0-2 get 3 shards, task 3 gets 2
+ assertEquals(3, GlobalRecordIndexPartitioner.computeNumShardsAssigned(0,
4, 11));
+ assertEquals(3, GlobalRecordIndexPartitioner.computeNumShardsAssigned(1,
4, 11));
+ assertEquals(3, GlobalRecordIndexPartitioner.computeNumShardsAssigned(2,
4, 11));
+ assertEquals(2, GlobalRecordIndexPartitioner.computeNumShardsAssigned(3,
4, 11));
+ }
+
+ @Test
+ void testComputeNumShardsAssignedTotalMatchesNumFileGroups() {
+ // The sum across all tasks must equal numFileGroups
+ int numFileGroups = 13;
+ int numPartitions = 5;
+ int total = 0;
+ for (int t = 0; t < numPartitions; t++) {
+ total += GlobalRecordIndexPartitioner.computeNumShardsAssigned(t,
numPartitions, numFileGroups);
+ }
+ assertEquals(numFileGroups, total);
+ }
+
+ @Test
+ void testComputeNumShardsAssignedConsistentWithPartitionMethod() {
+ // Verify that computeNumShardsAssigned matches the actual partition()
routing
+ int numPartitions = 3;
+ int numFileGroups = 8;
+ int[] shardCountPerTask = new int[numPartitions];
+ for (int fgIndex = 0; fgIndex < numFileGroups; fgIndex++) {
+ shardCountPerTask[fgIndex % numPartitions]++;
+ }
+ for (int taskIndex = 0; taskIndex < numPartitions; taskIndex++) {
+ assertEquals(shardCountPerTask[taskIndex],
+ GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex,
numPartitions, numFileGroups));
+ }
+ }
+
+ @Test
+ void testComputeNumShardsAssignedFewerFileGroupsThanPartitions() {
+ // 2 file groups, 5 partitions: tasks 0 and 1 own 1 shard each; tasks 2-4
own 0.
+ assertEquals(1, GlobalRecordIndexPartitioner.computeNumShardsAssigned(0,
5, 2));
+ assertEquals(1, GlobalRecordIndexPartitioner.computeNumShardsAssigned(1,
5, 2));
+ assertEquals(0, GlobalRecordIndexPartitioner.computeNumShardsAssigned(2,
5, 2));
+ assertEquals(0, GlobalRecordIndexPartitioner.computeNumShardsAssigned(3,
5, 2));
+ assertEquals(0, GlobalRecordIndexPartitioner.computeNumShardsAssigned(4,
5, 2));
+ }
+
+ @Test
+ void testComputeNumShardsAssignedZeroFileGroups() {
+ // 0 file groups: every task gets 0 shards.
+ for (int taskIndex = 0; taskIndex < 4; taskIndex++) {
+ assertEquals(0,
GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex, 4, 0));
+ }
+ }
+
+ @Test
+ void testFetchNumFileGroupsForRecordIndexPartitionReturnsPositiveValue() {
+ int count =
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+ assertTrue(count > 0, "File group count must be positive for a table with
global RLI enabled");
+ }
+
+ @Test
+ void testFetchNumFileGroupsForRecordIndexPartitionIsConsistent() {
+ int first =
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+ int second =
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+ assertEquals(first, second, "Repeated calls must return the same result");
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index 0676aa89222d..8b69180a33eb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -368,4 +368,32 @@ public class TestMinibatchBucketAssignFunction {
assertEquals(1, function.getDelegateMetrics().getRecordBufferingCount(),
"One buffering cycle should be recorded after a checkpoint flush");
}
+
+ @Test
+ public void testNumShardsAssignedMetricIsSet() throws Exception {
+ // With global RLI enabled the numShardsAssigned gauge must be a
non-negative value after open().
+ // The test harness runs with parallelism 1, so this single task owns all
shards.
+ FlinkBucketAssignMetrics metrics = function.getDelegateMetrics();
+ assertTrue(metrics.getNumShardsAssigned() >= 0,
+ "numShardsAssigned must be set when global RLI is active");
+ }
+
+ @Test
+ public void testNumShardsAssignedIsNegativeOneWhenBootstrapEnabled() throws
Exception {
+ // During index bootstrap the metadata file-group count may be unavailable;
+ // numShardsAssigned must stay at its sentinel -1.
+ Configuration bootstrapConf = Configuration.fromMap(conf.toMap());
+ bootstrapConf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+
+ MinibatchBucketAssignFunction bootstrapFunction = new
MinibatchBucketAssignFunction(bootstrapConf);
+ OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow,
HoodieFlinkInternalRow> bootstrapHarness =
+ new OneInputStreamOperatorTestHarness<>(new
MiniBatchBucketAssignOperator(bootstrapFunction, new OperatorID()), 1, 1, 0);
+ bootstrapHarness.open();
+ try {
+ assertEquals(-1,
bootstrapFunction.getDelegateMetrics().getNumShardsAssigned(),
+ "numShardsAssigned must remain -1 when INDEX_BOOTSTRAP_ENABLED is
true");
+ } finally {
+ bootstrapHarness.close();
+ }
+ }
}