hudi-agent commented on code in PR #18790:
URL: https://github.com/apache/hudi/pull/18790#discussion_r3277564312
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -145,6 +145,26 @@ public void open(Configuration parameters) throws Exception {
this.recordProcessor = initRecordProcessor();
this.metrics = new FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
this.metrics.registerMetrics();
+ initRliShardAssignMetric();
+ }
+
+ /**
+ * Computes and registers the number of RLI shards assigned to this task when global RLI is active.
Review Comment:
🤖 nit: "registers" is a bit misleading here — the gauge is already
registered in `registerMetrics()` which runs just before this call. Could you
change it to something like "Computes and sets the value of the
numShardsAssigned gauge" so a reader doesn't go hunting for a second
registration path?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -47,6 +56,11 @@ public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
@Override
public void registerMetrics() {
metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+ metricGroup.gauge("numShardsAssigned", numShardsAssigned::get);
+ }
+
+ public void setNumShardsAssigned(int count) {
Review Comment:
🤖 nit: the parameter `count` is a bit generic given the method is
`setNumShardsAssigned` and the field is `numShardsAssigned` — could you rename
it to `numShards` to keep the naming consistent throughout?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
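
To illustrate the two nits above, here is a minimal, self-contained sketch of the "register once, set later" pattern with the parameter renamed to `numShards`. It is not the PR's code: `FakeMetricGroup` is a hypothetical stand-in for Flink's `MetricGroup`, `BucketAssignMetricsSketch` is a hypothetical stand-in for `FlinkBucketAssignMetrics`, and the use of `AtomicInteger` for the field is an assumption inferred from the `numShardsAssigned::get` reference in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class GaugeSketch {

  /** Hypothetical stand-in for a metric group: maps a gauge name to a value supplier. */
  static final class FakeMetricGroup {
    private final Map<String, Supplier<Integer>> gauges = new HashMap<>();

    void gauge(String name, Supplier<Integer> supplier) {
      // Fail loudly on a second registration under the same name.
      if (gauges.putIfAbsent(name, supplier) != null) {
        throw new IllegalStateException("duplicate gauge: " + name);
      }
    }

    int read(String name) {
      return gauges.get(name).get();
    }
  }

  /** Hypothetical stand-in for the metrics holder discussed in the review. */
  static final class BucketAssignMetricsSketch {
    private final FakeMetricGroup metricGroup;
    // Assumption: the backing field is an AtomicInteger.
    private final AtomicInteger numShardsAssigned = new AtomicInteger(0);

    BucketAssignMetricsSketch(FakeMetricGroup metricGroup) {
      this.metricGroup = metricGroup;
    }

    /** Registers the gauge exactly once; the gauge reads the field lazily. */
    void registerMetrics() {
      metricGroup.gauge("numShardsAssigned", numShardsAssigned::get);
    }

    /** Only updates the value behind the already-registered gauge. */
    void setNumShardsAssigned(int numShards) {
      numShardsAssigned.set(numShards);
    }
  }

  public static void main(String[] args) {
    FakeMetricGroup group = new FakeMetricGroup();
    BucketAssignMetricsSketch metrics = new BucketAssignMetricsSketch(group);
    metrics.registerMetrics();
    metrics.setNumShardsAssigned(3);
    System.out.println(group.read("numShardsAssigned"));
  }
}
```

Because the gauge holds a supplier rather than a snapshot, a later `setNumShardsAssigned` call changes what the gauge reports without any second registration, which is why "sets the value" describes the init method more accurately than "registers".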
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java:
##########
@@ -145,6 +145,26 @@ public void open(Configuration parameters) throws Exception {
this.recordProcessor = initRecordProcessor();
this.metrics = new FlinkBucketAssignMetrics(getRuntimeContext().getMetricGroup());
this.metrics.registerMetrics();
+ initRliShardAssignMetric();
+ }
+
+ /**
+ * Computes and registers the number of RLI shards assigned to this task when global RLI is active.
+ * Each task owns the file groups whose index satisfies {@code fgIndex % numPartitions == taskIndex}.
+ */
+ private void initRliShardAssignMetric() {
+ if (!OptionsResolver.isGlobalRecordLevelIndex(conf)) {
Review Comment:
🤖 In `Pipelines.createBucketAssignStream` (around line 464),
`GlobalRecordIndexPartitioner` is only wired upstream when
`isGlobalRecordLevelIndex(conf) && !INDEX_BOOTSTRAP_ENABLED`; the
bootstrap-with-global-RLI case falls through to the `else` branch and uses
plain `BucketAssignFunction` with `keyBy(recordKey)` instead. Should this guard
also exclude `OptionsResolver.isRLIWithBootstrap(conf)` so the gauge only
reports a value when the partitioner is actually routing records?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
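
For reference, a minimal sketch of the ownership rule from the Javadoc (`fgIndex % numPartitions == taskIndex`) together with the tightened guard suggested above. This is an illustration under stated assumptions, not the PR's implementation: `countShardsForTask` and `shouldReportShardGauge` are hypothetical helpers, the total file group count is assumed to be known up front, and the two booleans stand in for the results of `OptionsResolver.isGlobalRecordLevelIndex(conf)` and `OptionsResolver.isRLIWithBootstrap(conf)`.

```java
final class RliShardCountSketch {

  private RliShardCountSketch() {
  }

  /**
   * Counts the indices fgIndex in [0, numFileGroups) for which
   * fgIndex % numPartitions == taskIndex, without iterating.
   */
  static int countShardsForTask(int numFileGroups, int numPartitions, int taskIndex) {
    if (numFileGroups < 0 || numPartitions <= 0 || taskIndex < 0 || taskIndex >= numPartitions) {
      throw new IllegalArgumentException("invalid shard layout");
    }
    if (numFileGroups <= taskIndex) {
      return 0; // even the first candidate index (taskIndex itself) is out of range
    }
    // Owned indices are taskIndex, taskIndex + numPartitions, ... below numFileGroups.
    return (numFileGroups - taskIndex - 1) / numPartitions + 1;
  }

  /**
   * Hypothetical guard: report the gauge only when global RLI is active and the
   * bootstrap path (which keys by record key instead) is not in use.
   */
  static boolean shouldReportShardGauge(boolean globalRli, boolean rliWithBootstrap) {
    return globalRli && !rliWithBootstrap;
  }

  public static void main(String[] args) {
    // 10 file groups over 4 tasks: tasks 0 and 1 own 3 each, tasks 2 and 3 own 2 each.
    for (int task = 0; task < 4; task++) {
      System.out.println("task " + task + " -> " + countShardsForTask(10, 4, task));
    }
    System.out.println(shouldReportShardGauge(true, true));
  }
}
```

With this guard, a job running global RLI with index bootstrap enabled would leave the gauge at its default rather than reporting a shard count for a routing scheme that is not in effect.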