This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new af38b880de5c feat(flink): add shard assign metrics for bucket assign function (#18790)
af38b880de5c is described below

commit af38b880de5c0bf15c35639fc39a7a94613d99a6
Author: Peter Huang <[email protected]>
AuthorDate: Mon Jun 1 21:11:00 2026 -0700

    feat(flink): add shard assign metrics for bucket assign function (#18790)
---
 .../hudi/metrics/FlinkBucketAssignMetrics.java     | 25 ++++++-
 .../sink/partitioner/BucketAssignFunction.java     | 22 ++++++
 .../partitioner/GlobalRecordIndexPartitioner.java  | 22 ++++--
 .../hudi/metrics/TestFlinkBucketAssignMetrics.java | 40 +++++++++++
 .../TestGlobalRecordIndexPartitioner.java          | 80 +++++++++++++++++++++-
 .../TestMinibatchBucketAssignFunction.java         | 28 ++++++++
 6 files changed, 210 insertions(+), 7 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
index 9c08f2b36a5d..97ea86fcbc41 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java
@@ -25,9 +25,11 @@ import 
org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.MetricGroup;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Metrics for flink bucket assign functions (BucketAssignFunction, 
MinibatchBucketAssignFunction,
- * DynamicBucketAssignFunction). Tracks record buffering time.
+ * DynamicBucketAssignFunction). Tracks record buffering time and RLI shard 
assignment distribution.
  */
 public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
   private static final int HISTOGRAM_WINDOW_SIZE = 100;
@@ -39,6 +41,13 @@ public class FlinkBucketAssignMetrics extends 
HoodieFlinkMetrics {
    */
   private final Histogram recordBufferingTime;
 
+  /**
+   * Number of RLI file group shards assigned to this bucket assign task.
+   * Set once during open() when global RLI is active; remains -1 otherwise.
+   * Compare across task subtasks to detect skew in shard distribution.
+   */
+  private final AtomicInteger numShardsAssigned = new AtomicInteger(-1);
+
   public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
     super(metricGroup);
     this.recordBufferingTime = new DropwizardHistogramWrapper(
@@ -48,6 +57,15 @@ public class FlinkBucketAssignMetrics extends 
HoodieFlinkMetrics {
   @Override
   public void registerMetrics() {
     metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+    metricGroup.gauge("numShardsAssigned", numShardsAssigned::get);
+  }
+
+  /**
+   * Sets the number of RLI shards owned by this task. Call once during {@code 
open()} when global
+   * RLI is active; the gauge value changes from the sentinel -1 to {@code 
count}.
+   */
+  public void setNumShardsAssigned(int count) {
+    numShardsAssigned.set(count);
   }
 
   public void startRecordBuffering() {
@@ -62,4 +80,9 @@ public class FlinkBucketAssignMetrics extends 
HoodieFlinkMetrics {
   public long getRecordBufferingCount() {
     return recordBufferingTime.getCount();
   }
+
+  @VisibleForTesting
+  public int getNumShardsAssigned() {
+    return numShardsAssigned.get();
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 6caacbbc3843..3ac55d4daa9b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -145,6 +145,28 @@ public class BucketAssignFunction
     this.recordProcessor = initRecordProcessor();
     this.metrics = new 
FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
     this.metrics.registerMetrics();
+    initRliShardAssignMetric();
+  }
+
+  /**
+   * Computes and registers the number of RLI shards assigned to this task 
when global RLI is active.
+   * Each task owns the file groups whose index satisfies {@code fgIndex % 
numPartitions == taskIndex}.
+   * No-ops when global RLI is not enabled or during index bootstrap (metadata 
table file-group count
+   * may be unavailable at bootstrap time), leaving {@code numShardsAssigned} 
at its sentinel -1.
+   */
+  private void initRliShardAssignMetric() {
+    if (!OptionsResolver.isGlobalRecordLevelIndex(conf) || 
conf.get(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
+      return;
+    }
+    try {
+      int numFileGroups = 
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+      int taskIndex = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
+      int numPartitions = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
+      this.metrics.setNumShardsAssigned(
+          GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex, 
numPartitions, numFileGroups));
+    } catch (Exception e) {
+      log.warn("Failed to compute RLI shard assignment count for metrics", e);
+    }
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
index 8b6f1ae22ad9..9dca405f5550 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/GlobalRecordIndexPartitioner.java
@@ -67,7 +67,7 @@ public class GlobalRecordIndexPartitioner implements 
Partitioner<HoodieKey> {
   public int partition(HoodieKey recordKey, int numPartitions) {
     // initialize numFileGroupsForRecordIndexPartition lazily.
     if (numFileGroupsForRecordIndexPartition < 0) {
-      numFileGroupsForRecordIndexPartition = 
getNumFileGroupsForRecordIndexPartition();
+      numFileGroupsForRecordIndexPartition = 
getNumFileGroupsForRecordIndexPartition(conf);
     }
     int fgIndex = HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(
         recordKey.getRecordKey(), numFileGroupsForRecordIndexPartition);
@@ -75,15 +75,29 @@ public class GlobalRecordIndexPartitioner implements 
Partitioner<HoodieKey> {
   }
 
   /**
-   * Get the number of file groups for record index partition in metadata 
table.
+   * Returns the number of RLI shards (file group indices in [0, 
numFileGroups)) assigned to the given task.
+   *
+   * <p>The assignment follows the same modulo logic used in {@link 
#partition}: shard {@code fgIndex}
+   * is owned by task {@code fgIndex % numPartitions}. The count is {@code 
numFileGroups / numPartitions},
+   * plus one for tasks whose index is less than {@code numFileGroups % 
numPartitions}.
+   */
+  public static int computeNumShardsAssigned(int taskIndex, int numPartitions, 
int numFileGroups) {
+    int base = numFileGroups / numPartitions;
+    int remainder = numFileGroups % numPartitions;
+    return taskIndex < remainder ? base + 1 : base;
+  }
+
+  /**
+   * Reads the file group count for the record index partition from the 
metadata table.
    */
-  private int getNumFileGroupsForRecordIndexPartition() {
+  static int getNumFileGroupsForRecordIndexPartition(Configuration conf) {
+    String tablePath = conf.get(FlinkOptions.PATH);
     HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
     try (HoodieTableMetadata metadataTable = 
metaClient.getTableFormat().getMetadataFactory().create(
         HoodieFlinkEngineContext.DEFAULT,
         metaClient.getStorage(),
         StreamerUtil.metadataConfig(conf),
-        conf.get(FlinkOptions.PATH))) {
+        tablePath)) {
       return 
metadataTable.getNumFileGroupsForPartition(MetadataPartitionType.RECORD_INDEX);
     } catch (Exception e) {
       throw new HoodieException("Failed to get file group count for global 
record index partition.", e);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
index e5d9fbd9f95b..c4df6d028f6f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/metrics/TestFlinkBucketAssignMetrics.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metrics;
 
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.junit.jupiter.api.BeforeEach;
@@ -116,8 +117,37 @@ class TestFlinkBucketAssignMetrics {
     assertEquals(110, metrics.getRecordBufferingCount());
   }
 
+  @Test
+  void testNumShardsAssignedGaugeDefaultIsNegativeOne() {
+    Gauge<?> gauge = metricGroup.getGauge("numShardsAssigned");
+    assertNotNull(gauge, "numShardsAssigned gauge should be registered");
+    assertEquals(-1, gauge.getValue(), "Default numShardsAssigned must be -1 
(unset sentinel)");
+  }
+
+  @Test
+  void testSetNumShardsAssignedUpdatesGetterAndGauge() {
+    metrics.setNumShardsAssigned(7);
+    assertEquals(7, metrics.getNumShardsAssigned());
+    assertEquals(7, metricGroup.getGauge("numShardsAssigned").getValue());
+  }
+
+  @Test
+  void testSetNumShardsAssignedOverwrite() {
+    metrics.setNumShardsAssigned(3);
+    metrics.setNumShardsAssigned(5);
+    assertEquals(5, metrics.getNumShardsAssigned());
+    assertEquals(5, metricGroup.getGauge("numShardsAssigned").getValue());
+  }
+
+  @Test
+  void testGetNumShardsAssignedDefaultIsNegativeOne() {
+    assertEquals(-1, metrics.getNumShardsAssigned(),
+        "numShardsAssigned must return -1 before setNumShardsAssigned is 
called");
+  }
+
   private static class CapturingMetricGroup extends UnregisteredMetricsGroup {
     private final Map<String, Histogram> histograms = new HashMap<>();
+    private final Map<String, Gauge<?>> gauges = new HashMap<>();
 
     @Override
     public <H extends Histogram> H histogram(String name, H histogram) {
@@ -125,8 +155,18 @@ class TestFlinkBucketAssignMetrics {
       return histogram;
     }
 
+    @Override
+    public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
+      gauges.put(name, gauge);
+      return gauge;
+    }
+
     Histogram getHistogram(String name) {
       return histograms.get(name);
     }
+
+    Gauge<?> getGauge(String name) {
+      return gauges.get(name);
+    }
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
index 32163baf38d2..809e4b55da35 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestGlobalRecordIndexPartitioner.java
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class TestGlobalRecordIndexPartitioner {
 
   private static GlobalRecordIndexPartitioner partitioner;
+  private static Configuration conf;
 
   @TempDir
   static File tempFile;
@@ -46,7 +47,7 @@ public class TestGlobalRecordIndexPartitioner {
   @BeforeAll
   public static void beforeAll() throws Exception {
     final String basePath = tempFile.getAbsolutePath();
-    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf = TestConfigurations.getDefaultConf(basePath);
     
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
 "true");
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     partitioner = new GlobalRecordIndexPartitioner(conf);
@@ -100,8 +101,83 @@ public class TestGlobalRecordIndexPartitioner {
   void testLargeNumberOfPartitions() {
     HoodieKey key = new HoodieKey("test_key_for_large_partition", 
"partition_path");
     int numPartitions = 100; // Large number of partitions
-    
+
     int partition = partitioner.partition(key, numPartitions);
     assertTrue(partition >= 0 && partition < numPartitions);
   }
+
+  @Test
+  void testComputeNumShardsAssignedEvenDistribution() {
+    // 10 shards, 5 tasks: each task gets exactly 2 shards
+    for (int taskIndex = 0; taskIndex < 5; taskIndex++) {
+      assertEquals(2, 
GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex, 5, 10));
+    }
+  }
+
+  @Test
+  void testComputeNumShardsAssignedUnevenDistribution() {
+    // 11 shards, 4 tasks: tasks 0-2 get 3 shards, task 3 gets 2
+    assertEquals(3, GlobalRecordIndexPartitioner.computeNumShardsAssigned(0, 
4, 11));
+    assertEquals(3, GlobalRecordIndexPartitioner.computeNumShardsAssigned(1, 
4, 11));
+    assertEquals(3, GlobalRecordIndexPartitioner.computeNumShardsAssigned(2, 
4, 11));
+    assertEquals(2, GlobalRecordIndexPartitioner.computeNumShardsAssigned(3, 
4, 11));
+  }
+
+  @Test
+  void testComputeNumShardsAssignedTotalMatchesNumFileGroups() {
+    // The sum across all tasks must equal numFileGroups
+    int numFileGroups = 13;
+    int numPartitions = 5;
+    int total = 0;
+    for (int t = 0; t < numPartitions; t++) {
+      total += GlobalRecordIndexPartitioner.computeNumShardsAssigned(t, 
numPartitions, numFileGroups);
+    }
+    assertEquals(numFileGroups, total);
+  }
+
+  @Test
+  void testComputeNumShardsAssignedConsistentWithPartitionMethod() {
+    // Verify that computeNumShardsAssigned matches the actual partition() 
routing
+    int numPartitions = 3;
+    int numFileGroups = 8;
+    int[] shardCountPerTask = new int[numPartitions];
+    for (int fgIndex = 0; fgIndex < numFileGroups; fgIndex++) {
+      shardCountPerTask[fgIndex % numPartitions]++;
+    }
+    for (int taskIndex = 0; taskIndex < numPartitions; taskIndex++) {
+      assertEquals(shardCountPerTask[taskIndex],
+          GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex, 
numPartitions, numFileGroups));
+    }
+  }
+
+  @Test
+  void testComputeNumShardsAssignedFewerFileGroupsThanPartitions() {
+    // 2 file groups, 5 partitions: tasks 0 and 1 own 1 shard each; tasks 2-4 
own 0.
+    assertEquals(1, GlobalRecordIndexPartitioner.computeNumShardsAssigned(0, 
5, 2));
+    assertEquals(1, GlobalRecordIndexPartitioner.computeNumShardsAssigned(1, 
5, 2));
+    assertEquals(0, GlobalRecordIndexPartitioner.computeNumShardsAssigned(2, 
5, 2));
+    assertEquals(0, GlobalRecordIndexPartitioner.computeNumShardsAssigned(3, 
5, 2));
+    assertEquals(0, GlobalRecordIndexPartitioner.computeNumShardsAssigned(4, 
5, 2));
+  }
+
+  @Test
+  void testComputeNumShardsAssignedZeroFileGroups() {
+    // 0 file groups: every task gets 0 shards.
+    for (int taskIndex = 0; taskIndex < 4; taskIndex++) {
+      assertEquals(0, 
GlobalRecordIndexPartitioner.computeNumShardsAssigned(taskIndex, 4, 0));
+    }
+  }
+
+  @Test
+  void testFetchNumFileGroupsForRecordIndexPartitionReturnsPositiveValue() {
+    int count = 
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+    assertTrue(count > 0, "File group count must be positive for a table with 
global RLI enabled");
+  }
+
+  @Test
+  void testFetchNumFileGroupsForRecordIndexPartitionIsConsistent() {
+    int first = 
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+    int second = 
GlobalRecordIndexPartitioner.getNumFileGroupsForRecordIndexPartition(conf);
+    assertEquals(first, second, "Repeated calls must return the same result");
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
index 0676aa89222d..8b69180a33eb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestMinibatchBucketAssignFunction.java
@@ -368,4 +368,32 @@ public class TestMinibatchBucketAssignFunction {
     assertEquals(1, function.getDelegateMetrics().getRecordBufferingCount(),
         "One buffering cycle should be recorded after a checkpoint flush");
   }
+
+  @Test
+  public void testNumShardsAssignedMetricIsSet() throws Exception {
+    // With global RLI enabled the numShardsAssigned gauge must be a 
non-negative value after open().
+    // The test harness runs with parallelism 1, so this single task owns all 
shards.
+    FlinkBucketAssignMetrics metrics = function.getDelegateMetrics();
+    assertTrue(metrics.getNumShardsAssigned() >= 0,
+            "numShardsAssigned must be set when global RLI is active");
+  }
+
+  @Test
+  public void testNumShardsAssignedIsNegativeOneWhenBootstrapEnabled() throws 
Exception {
+    // During index bootstrap the metadata file-group count may be unavailable;
+    // numShardsAssigned must stay at its sentinel -1.
+    Configuration bootstrapConf = Configuration.fromMap(conf.toMap());
+    bootstrapConf.set(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, true);
+
+    MinibatchBucketAssignFunction bootstrapFunction = new 
MinibatchBucketAssignFunction(bootstrapConf);
+    OneInputStreamOperatorTestHarness<HoodieFlinkInternalRow, 
HoodieFlinkInternalRow> bootstrapHarness =
+        new OneInputStreamOperatorTestHarness<>(new 
MiniBatchBucketAssignOperator(bootstrapFunction, new OperatorID()), 1, 1, 0);
+    bootstrapHarness.open();
+    try {
+      assertEquals(-1, 
bootstrapFunction.getDelegateMetrics().getNumShardsAssigned(),
+          "numShardsAssigned must remain -1 when INDEX_BOOTSTRAP_ENABLED is 
true");
+    } finally {
+      bootstrapHarness.close();
+    }
+  }
 }

Reply via email to