hudi-agent commented on code in PR #18762:
URL: https://github.com/apache/hudi/pull/18762#discussion_r3262672288


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -84,21 +90,37 @@ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLoca
   public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws IOException {
     Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
     List<String> missedKeys = new ArrayList<>();
-    for (String key: recordKeys) {
+    if (metrics != null) {

Review Comment:
   🤖 nit: there are four `if (metrics != null)` guards woven through this 
method, which makes the core cache-then-remote-fetch logic harder to follow. 
Have you considered initializing `metrics` to a lightweight no-op stub instead 
of `null`, so the guards disappear entirely and the happy-path logic is 
uninterrupted?
   
   - AI-generated; verify before applying. React 👍/👎 to flag quality.
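A minimal sketch of the no-op stub suggested above, in the spirit of the Null Object pattern. `IndexLookupMetrics`, `CacheThenRemoteLookup`, the `String` locations and the in-memory `remote` map are hypothetical stand-ins (the real backend returns `HoodieRecordGlobalLocation` and reads the metadata table). The hook names mirror the `FlinkBucketAssignMetrics` methods visible in this PR; the remote start/end pair is assumed. The idea is that `metrics` is normalized once in the constructor, so `get` carries no null checks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical hook interface modeled on FlinkBucketAssignMetrics.
// Every hook defaults to doing nothing, so the no-op stub needs no boilerplate.
interface IndexLookupMetrics {
  default void startLocalIndexLookup() {}
  default void endLocalIndexLookup() {}
  default void startRemoteIndexLookup() {}
  default void endRemoteIndexLookup() {}
  default void updateLocalLookupKeysNum(long keyCount) {}
  default void updateRemoteLookupKeysNum(long keyCount) {}

  // Shared stub used when metrics are disabled, instead of a null field.
  IndexLookupMetrics NO_OP = new IndexLookupMetrics() {};
}

// Simplified stand-in for the cache-then-remote lookup in GlobalRecordLevelIndexBackend#get.
class CacheThenRemoteLookup {
  private final Map<String, String> cache = new HashMap<>();
  private final Map<String, String> remote; // assumption: in-memory stand-in for the metadata table
  private final IndexLookupMetrics metrics;

  CacheThenRemoteLookup(Map<String, String> remote, IndexLookupMetrics metrics) {
    this.remote = remote;
    // Normalize once here so get() never has to check for null.
    this.metrics = metrics != null ? metrics : IndexLookupMetrics.NO_OP;
  }

  Map<String, String> get(List<String> recordKeys) {
    Map<String, String> keysAndLocations = new HashMap<>();
    List<String> missedKeys = new ArrayList<>();

    // Local phase: resolve what we can from the cache.
    metrics.startLocalIndexLookup();
    for (String key : recordKeys) {
      String location = cache.get(key);
      if (location != null) {
        keysAndLocations.put(key, location);
      } else {
        missedKeys.add(key);
      }
    }
    metrics.endLocalIndexLookup();
    metrics.updateLocalLookupKeysNum(keysAndLocations.size());

    // Remote phase: fetch only the keys that missed the cache, and cache any hits.
    if (!missedKeys.isEmpty()) {
      metrics.startRemoteIndexLookup();
      for (String key : missedKeys) {
        String location = remote.get(key);
        if (location != null) {
          keysAndLocations.put(key, location);
          cache.put(key, location);
        }
      }
      metrics.endRemoteIndexLookup();
      metrics.updateRemoteLookupKeysNum(missedKeys.size());
    }
    return keysAndLocations;
  }
}
```

With this shape, the cost of disabled metrics is a few empty calls per lookup, which is likely negligible next to the remote fetch, and a test can pass an anonymous `IndexLookupMetrics` that overrides only the hooks it cares about.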



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for flink bucket assign functions (BucketAssignFunction, MinibatchBucketAssignFunction,
+ * DynamicBucketAssignFunction). Tracks index access latency and record buffering time.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+
+  private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+  private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+  private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+  /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+  private final Histogram localIndexLookupLatency;
+
+  /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+  private final Histogram remoteIndexLookupLatency;
+
+  /** Number of keys resolved from the local cache per lookup. */
+  private final Histogram localLookupKeysNum;
+
+  /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+  private final Histogram remoteLookupKeysNum;
+
+  /**
+   * Time records spend buffered before being processed, in milliseconds.
+   * Only populated by MinibatchBucketAssignFunction.
+   */
+  private final Histogram recordBufferingTime;
+
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.localLookupKeysNum = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteLookupKeysNum = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.recordBufferingTime = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.histogram("localIndexLookupLatency", localIndexLookupLatency);
+    metricGroup.histogram("remoteIndexLookupLatency", remoteIndexLookupLatency);
+    metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
+    metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+    metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+  }
+
+  public void startLocalIndexLookup() {
+    startTimer(LOCAL_INDEX_LOOKUP_KEY);
+  }
+
+  public void endLocalIndexLookup() {
+    localIndexLookupLatency.update(stopTimer(LOCAL_INDEX_LOOKUP_KEY));
+  }
+
+  public void updateLocalLookupKeysNum(long n) {
+    localLookupKeysNum.update(n);

Review Comment:
   🤖 nit: `n` is pretty terse here (and again in `updateRemoteLookupKeysNum`). 
`count` or `keyCount` would make both call-sites and usages inside the method 
read more clearly.
   
   - AI-generated; verify before applying. React 👍/👎 to flag quality.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -0,0 +1,134 @@
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteIndexLookupLatency = new DropwizardHistogramWrapper(

Review Comment:
   🤖 nit: the `100` in `SlidingWindowReservoir(100)` is repeated five times 
with no explanation of why 100. Could you extract it as a named constant like 
`private static final int RESERVOIR_SIZE = 100;` so it's easy to change and 
readers know it's intentional?
   
   - AI-generated; verify before applying. React 👍/👎 to flag quality.
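A sketch of the extracted constant, assuming a single private factory method so the reservoir size lives in exactly one place. To stay dependency-free, `SlidingWindowHistogram` is a hypothetical stand-in for `new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new SlidingWindowReservoir(...)))`; like Dropwizard's `SlidingWindowReservoir`, it retains only the most recent N samples. `BucketAssignMetricsSketch` and `newHistogram` are illustrative names, not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, dependency-free stand-in for a histogram backed by a sliding-window reservoir:
// it keeps only the most recent `windowSize` samples but counts every update.
class SlidingWindowHistogram {
  private final int windowSize;
  private final Deque<Long> window = new ArrayDeque<>();
  private long count; // total number of updates ever recorded

  SlidingWindowHistogram(int windowSize) {
    if (windowSize <= 0) {
      throw new IllegalArgumentException("windowSize must be positive");
    }
    this.windowSize = windowSize;
  }

  void update(long value) {
    count++;
    if (window.size() == windowSize) {
      window.removeFirst(); // evict the oldest sample
    }
    window.addLast(value);
  }

  long getCount() {
    return count;
  }

  int retainedSamples() {
    return window.size();
  }
}

class BucketAssignMetricsSketch {
  // Number of most recent samples each histogram retains; 100 matches the value
  // currently repeated in the FlinkBucketAssignMetrics constructor.
  static final int RESERVOIR_SIZE = 100;

  final SlidingWindowHistogram localIndexLookupLatency = newHistogram();
  final SlidingWindowHistogram remoteIndexLookupLatency = newHistogram();
  final SlidingWindowHistogram localLookupKeysNum = newHistogram();
  final SlidingWindowHistogram remoteLookupKeysNum = newHistogram();
  final SlidingWindowHistogram recordBufferingTime = newHistogram();

  // Single place that applies the reservoir size. In the real class this would return
  // new DropwizardHistogramWrapper(new com.codahale.metrics.Histogram(new SlidingWindowReservoir(RESERVOIR_SIZE))).
  private static SlidingWindowHistogram newHistogram() {
    return new SlidingWindowHistogram(RESERVOIR_SIZE);
  }
}
```

Routing all five fields through one helper also shortens each initialization to a single line; if a metric ever needs a different window, the helper could take the size as a parameter.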



-- 
This is an automated message from the Apache Git Service.