hudi-agent commented on code in PR #18762:
URL: https://github.com/apache/hudi/pull/18762#discussion_r3260370283


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for flink bucket assign functions (BucketAssignFunction, MinibatchBucketAssignFunction,
+ * DynamicBucketAssignFunction). Tracks index access latency and record buffering time.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+
+  private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+  private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+  private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+  /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+  private final Histogram localIndexLookupLatency;
+
+  /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+  private final Histogram remoteIndexLookupLatency;
+
+  /** Number of keys resolved from the local cache per lookup. */
+  private final Histogram localLookupKeysNum;
+
+  /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+  private final Histogram remoteLookupKeysNum;
+
+  /**
+   * Time records spend buffered before being processed, in milliseconds.
+   * Only populated by MinibatchBucketAssignFunction.
+   */
+  private final Histogram recordBufferingTime;
+
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));

Review Comment:
   🤖 nit: the window size `100` appears five times with no explanation — could 
you pull it out as a named constant (e.g. `HISTOGRAM_WINDOW_SIZE`)? Makes the 
intent clearer and keeps future tuning to one place.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
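
One possible shape for that refactor, sketched with only the JDK so it runs standalone. `HISTOGRAM_WINDOW_SIZE` is the name suggested in the comment; `WindowedHistogram` and `newHistogram()` are hypothetical stand-ins for the Dropwizard and Flink wrapper types, not classes from the PR.

```java
/** Sketch: one named constant and one factory replace five repeated literals. */
class HistogramWindowSketch {

  /** Number of most recent samples each histogram keeps. */
  static final int HISTOGRAM_WINDOW_SIZE = 100;

  /** Hypothetical stand-in for a sliding-window histogram (not the Dropwizard class). */
  static final class WindowedHistogram {
    private final long[] window;
    private long count;

    WindowedHistogram(int windowSize) {
      this.window = new long[windowSize];
    }

    void update(long value) {
      // Once the window is full, the oldest slot is overwritten.
      window[(int) (count % window.length)] = value;
      count++;
    }

    long getCount() {
      return count;
    }

    int windowSize() {
      return window.length;
    }
  }

  /** The only place where the window size is applied. */
  static WindowedHistogram newHistogram() {
    return new WindowedHistogram(HISTOGRAM_WINDOW_SIZE);
  }

  public static void main(String[] args) {
    WindowedHistogram localIndexLookupLatency = newHistogram();
    WindowedHistogram remoteIndexLookupLatency = newHistogram();
    localIndexLookupLatency.update(12L);
    System.out.println(localIndexLookupLatency.windowSize()
        + " " + remoteIndexLookupLatency.windowSize());
  }
}
```

With a factory like this, each field initializer in the constructor shrinks to a single call, and tuning the window touches one line.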



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -120,10 +121,16 @@ public void processElement(HoodieFlinkInternalRow record, Context ctx, Collector
       // handle index records immediately, do not need buffering
       delegateFunction.processIndexRecord(record, record.getRecordKey());
     } else {
+      // Start buffering timer when first record enters an empty buffer
+      if (recordBuffer.isEmpty()) {
+        delegateFunction.getMetrics().startRecordBuffering();
+      }
       // Add data records to the buffer
       recordBuffer.add(record);
       // Process the buffer if it reaches the configured size
       if (recordBuffer.size() >= miniBatchSize) {
+        // Record how long the oldest record in the batch was buffered
+        delegateFunction.getMetrics().endRecordBuffering();

Review Comment:
   🤖 `endRecordBuffering()` is only called when the buffer fills to
`miniBatchSize`, but `processBufferedRecords()` is also invoked from
`prepareSnapshotPreBarrier()` (line 202) and `endInput()` (line 223) on partial
buffers. In those paths the timer is never stopped, so the buffering metric is
dropped for partial batches. The next `startRecordBuffering()` then hits the
`Restarting timer for name: record_buffering, overriding the existing value`
warning in `HoodieFlinkMetrics#startTimer` after each partial flush, which in
practice means on every checkpoint. Could you move the `endRecordBuffering()`
call into `processBufferedRecords()` (after the `recordBuffer.isEmpty()` guard)
so all three flush paths are covered?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
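
A minimal, JDK-only sketch of the suggested placement, under the assumption that `processBufferedRecords()` returns early on an empty buffer. `MiniBatcher` and `BufferingMetrics` are hypothetical stand-ins for `MinibatchBucketAssignFunction` and the metrics class; the `restarts` counter models the "Restarting timer" warning rather than reproducing `HoodieFlinkMetrics#startTimer`.

```java
import java.util.ArrayList;
import java.util.List;

class BufferingTimerSketch {

  /** Hypothetical stand-in for the metrics class. */
  static final class BufferingMetrics {
    private boolean running;
    int samples;   // recorded buffering durations
    int restarts;  // starts issued while the timer was still running

    void startRecordBuffering() {
      if (running) {
        restarts++; // the real class would log a warning here
      }
      running = true;
    }

    void endRecordBuffering() {
      if (running) {
        samples++;
        running = false;
      }
    }
  }

  /** Hypothetical stand-in for the mini-batch function. */
  static final class MiniBatcher {
    private final List<String> recordBuffer = new ArrayList<>();
    private final int miniBatchSize;
    final BufferingMetrics metrics = new BufferingMetrics();
    int flushedRecords;

    MiniBatcher(int miniBatchSize) {
      this.miniBatchSize = miniBatchSize;
    }

    void processElement(String record) {
      if (recordBuffer.isEmpty()) {
        metrics.startRecordBuffering();
      }
      recordBuffer.add(record);
      if (recordBuffer.size() >= miniBatchSize) {
        processBufferedRecords();
      }
    }

    void prepareSnapshotPreBarrier() {
      processBufferedRecords();
    }

    void endInput() {
      processBufferedRecords();
    }

    private void processBufferedRecords() {
      if (recordBuffer.isEmpty()) {
        return;
      }
      // Stopping the timer here covers the full-batch, checkpoint and end-of-input paths.
      metrics.endRecordBuffering();
      flushedRecords += recordBuffer.size();
      recordBuffer.clear();
    }
  }

  public static void main(String[] args) {
    MiniBatcher batcher = new MiniBatcher(3);
    batcher.processElement("a");
    batcher.prepareSnapshotPreBarrier(); // partial flush
    batcher.processElement("b");
    batcher.endInput();                  // partial flush
    System.out.println(batcher.metrics.samples + " samples, "
        + batcher.metrics.restarts + " restarts");
  }
}
```

Because the stop lives next to the flush, a partial flush both records a sample and leaves the timer idle for the next `startRecordBuffering()`.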



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for flink bucket assign functions (BucketAssignFunction, MinibatchBucketAssignFunction,
+ * DynamicBucketAssignFunction). Tracks index access latency and record buffering time.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+
+  private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+  private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+  private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+  /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+  private final Histogram localIndexLookupLatency;
+
+  /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+  private final Histogram remoteIndexLookupLatency;
+
+  /** Number of keys resolved from the local cache per lookup. */
+  private final Histogram localLookupKeysNum;
+
+  /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+  private final Histogram remoteLookupKeysNum;
+
+  /**
+   * Time records spend buffered before being processed, in milliseconds.
+   * Only populated by MinibatchBucketAssignFunction.
+   */
+  private final Histogram recordBufferingTime;
+
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.localLookupKeysNum = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteLookupKeysNum = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.recordBufferingTime = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.histogram("localIndexLookupLatency", localIndexLookupLatency);
+    metricGroup.histogram("remoteIndexLookupLatency", remoteIndexLookupLatency);
+    metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
+    metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+    metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+  }
+
+  public void startLocalIndexLookup() {
+    startTimer(LOCAL_INDEX_LOOKUP_KEY);
+  }
+
+  public void endLocalIndexLookup() {
+    localIndexLookupLatency.update(stopTimer(LOCAL_INDEX_LOOKUP_KEY));
+  }
+
+  public void updateLocalLookupKeysNum(long n) {
+    localLookupKeysNum.update(n);
+  }
+
+  public void startRemoteIndexLookup() {
+    startTimer(REMOTE_INDEX_LOOKUP_KEY);
+  }
+
+  public void endRemoteIndexLookup() {
+    remoteIndexLookupLatency.update(stopTimer(REMOTE_INDEX_LOOKUP_KEY));
+  }
+
+  public void updateRemoteLookupKeysNum(long n) {
+    remoteLookupKeysNum.update(n);
+  }
+
+  public void startRecordBuffering() {
+    startTimer(RECORD_BUFFERING_KEY);
+  }
+
+  public void endRecordBuffering() {
+    recordBufferingTime.update(stopTimer(RECORD_BUFFERING_KEY));
+  }
+
+  @VisibleForTesting
+  public long getLocalIndexLookupCount() {
+    return localIndexLookupLatency.getCount();
+  }
+
+  @VisibleForTesting
+  public long getRemoteIndexLookupCount() {
+    return remoteIndexLookupLatency.getCount();
+  }
+
+  @VisibleForTesting
+  public long getLocalLookupKeysNumCount() {

Review Comment:
   🤖 nit: `getLocalLookupKeysNumCount()` is easy to misread — 'Num' 
(keys-per-lookup distribution) plus 'Count' (histogram sample count) together 
read as 'total key count', which is not what this returns. Something like 
`getLocalLookupKeysSampleCount()` would be less ambiguous (same concern for 
`getRemoteLookupKeysNumCount()`).
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
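
To make the distinction concrete, here is a small JDK-only sketch. `KeysPerLookupHistogram` is a hypothetical stand-in for the histogram, and `totalKeys()` exists only for contrast; it is not claimed to be part of the Flink `Histogram` interface.

```java
class SampleCountSketch {

  /** Hypothetical stand-in: one sample is recorded per lookup. */
  static final class KeysPerLookupHistogram {
    private long sampleCount;
    private long keySum;

    void update(long keysInThisLookup) {
      sampleCount++;
      keySum += keysInThisLookup;
    }

    /** Number of update() calls, i.e. number of lookups sampled. */
    long getCount() {
      return sampleCount;
    }

    /** Sum of all recorded values; shown only to contrast with getCount(). */
    long totalKeys() {
      return keySum;
    }
  }

  public static void main(String[] args) {
    KeysPerLookupHistogram localLookupKeysNum = new KeysPerLookupHistogram();
    localLookupKeysNum.update(10); // first lookup resolved 10 keys locally
    localLookupKeysNum.update(30); // second lookup resolved 30 keys locally
    System.out.println("samples=" + localLookupKeysNum.getCount()
        + " totalKeys=" + localLookupKeysNum.totalKeys());
  }
}
```

A getter named `...SampleCount` describes the first number, which is what the test accessor returns.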



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -84,21 +90,37 @@ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLoca
   public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws IOException {
     Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
     List<String> missedKeys = new ArrayList<>();
-    for (String key: recordKeys) {
+    if (metrics != null) {
+      metrics.startLocalIndexLookup();

Review Comment:
   🤖 nit: the four scattered `if (metrics != null)` guards fragment the lookup 
logic and make it easy to accidentally omit one when adding a new metric. Have 
you considered initializing `metrics` to a no-op instance so the guards can be 
dropped entirely?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
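
A rough sketch of the no-op approach, using only the JDK. `LookupMetrics`, `NO_OP` and `IndexBackend` are hypothetical names introduced for illustration; only the method names are taken from the diff. Whether `FlinkBucketAssignMetrics` can be given a no-op variant cheaply depends on its constructor, which this sketch does not model.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NoOpMetricsSketch {

  /** Hypothetical metrics contract; method names follow the diff. */
  interface LookupMetrics {
    void startLocalIndexLookup();

    void endLocalIndexLookup();

    void updateLocalLookupKeysNum(long n);

    /** Shared do-nothing instance used when metrics are disabled. */
    LookupMetrics NO_OP = new LookupMetrics() {
      @Override
      public void startLocalIndexLookup() {
      }

      @Override
      public void endLocalIndexLookup() {
      }

      @Override
      public void updateLocalLookupKeysNum(long n) {
      }
    };
  }

  /** Hypothetical stand-in for the index backend. */
  static final class IndexBackend {
    private final Map<String, String> cache = new HashMap<>();
    private final LookupMetrics metrics;

    IndexBackend(LookupMetrics metrics) {
      // Normalize once here so no call site needs a null check.
      this.metrics = metrics == null ? LookupMetrics.NO_OP : metrics;
    }

    void update(String recordKey, String location) {
      cache.put(recordKey, location);
    }

    Map<String, String> get(List<String> recordKeys) {
      Map<String, String> keysAndLocations = new HashMap<>();
      metrics.startLocalIndexLookup();
      for (String key : recordKeys) {
        String location = cache.get(key);
        if (location != null) {
          keysAndLocations.put(key, location);
        }
      }
      metrics.endLocalIndexLookup();
      metrics.updateLocalLookupKeysNum(keysAndLocations.size());
      return keysAndLocations;
    }
  }

  public static void main(String[] args) {
    IndexBackend backend = new IndexBackend(null); // metrics disabled
    backend.update("k1", "file-1");
    System.out.println(backend.get(java.util.Arrays.asList("k1", "k2")));
  }
}
```

The trade-off is one extra virtual call per metric update when metrics are off, in exchange for lookup code that reads straight through.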



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
