hudi-agent commented on code in PR #18762:
URL: https://github.com/apache/hudi/pull/18762#discussion_r3260370283
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for flink bucket assign functions (BucketAssignFunction, MinibatchBucketAssignFunction,
+ * DynamicBucketAssignFunction). Tracks index access latency and record buffering time.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+
+  private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+  private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+  private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+  /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+  private final Histogram localIndexLookupLatency;
+
+  /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+  private final Histogram remoteIndexLookupLatency;
+
+  /** Number of keys resolved from the local cache per lookup. */
+  private final Histogram localLookupKeysNum;
+
+  /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+  private final Histogram remoteLookupKeysNum;
+
+  /**
+   * Time records spend buffered before being processed, in milliseconds.
+   * Only populated by MinibatchBucketAssignFunction.
+   */
+  private final Histogram recordBufferingTime;
+
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));

Review Comment:
🤖 nit: the window size `100` appears five times with no explanation. Could you pull it out as a named constant (e.g. `HISTOGRAM_WINDOW_SIZE`)? That makes the intent clearer and keeps future tuning in one place.

- AI-generated; verify before applying.

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/MinibatchBucketAssignFunction.java:
##########
@@ -120,10 +121,16 @@ public void processElement(HoodieFlinkInternalRow record, Context ctx, Collector
       // handle index records immediately, do not need buffering
       delegateFunction.processIndexRecord(record, record.getRecordKey());
     } else {
+      // Start buffering timer when first record enters an empty buffer
+      if (recordBuffer.isEmpty()) {
+        delegateFunction.getMetrics().startRecordBuffering();
+      }
       // Add data records to the buffer
       recordBuffer.add(record);
       // Process the buffer if it reaches the configured size
       if (recordBuffer.size() >= miniBatchSize) {
+        // Record how long the oldest record in the batch was buffered
+        delegateFunction.getMetrics().endRecordBuffering();

Review Comment:
🤖 `endRecordBuffering()` is only called when the buffer fills to `miniBatchSize`, but `processBufferedRecords()` is also invoked on partial buffers from `prepareSnapshotPreBarrier()` (line 202) and `endInput()` (line 223). On those paths the timer is never stopped, so no buffering sample is recorded for partial batches, and the next `startRecordBuffering()` hits the `Restarting timer for name: record_buffering, overriding the existing value` warning in `HoodieFlinkMetrics#startTimer` after every checkpoint that flushes a partial buffer. Could you move the `endRecordBuffering()` call into `processBufferedRecords()` (after the `recordBuffer.isEmpty()` guard) so that all three flush paths are covered?

- AI-generated; verify before applying.

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkBucketAssignMetrics.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import com.codahale.metrics.SlidingWindowReservoir;
+import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+/**
+ * Metrics for flink bucket assign functions (BucketAssignFunction, MinibatchBucketAssignFunction,
+ * DynamicBucketAssignFunction). Tracks index access latency and record buffering time.
+ */
+public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
+
+  private static final String LOCAL_INDEX_LOOKUP_KEY = "local_index_lookup";
+  private static final String REMOTE_INDEX_LOOKUP_KEY = "remote_index_lookup";
+  private static final String RECORD_BUFFERING_KEY = "record_buffering";
+
+  /** Latency of the local (cache) phase of each index lookup, in milliseconds. */
+  private final Histogram localIndexLookupLatency;
+
+  /** Latency of the remote (metadata table) phase of each index lookup, in milliseconds. */
+  private final Histogram remoteIndexLookupLatency;
+
+  /** Number of keys resolved from the local cache per lookup. */
+  private final Histogram localLookupKeysNum;
+
+  /** Number of keys that missed the local cache and were fetched remotely per lookup. */
+  private final Histogram remoteLookupKeysNum;
+
+  /**
+   * Time records spend buffered before being processed, in milliseconds.
+   * Only populated by MinibatchBucketAssignFunction.
+   */
+  private final Histogram recordBufferingTime;
+
+  public FlinkBucketAssignMetrics(MetricGroup metricGroup) {
+    super(metricGroup);
+    this.localIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteIndexLookupLatency = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.localLookupKeysNum = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.remoteLookupKeysNum = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+    this.recordBufferingTime = new DropwizardHistogramWrapper(
+        new com.codahale.metrics.Histogram(new SlidingWindowReservoir(100)));
+  }
+
+  @Override
+  public void registerMetrics() {
+    metricGroup.histogram("localIndexLookupLatency", localIndexLookupLatency);
+    metricGroup.histogram("remoteIndexLookupLatency", remoteIndexLookupLatency);
+    metricGroup.histogram("localLookupKeysNum", localLookupKeysNum);
+    metricGroup.histogram("remoteLookupKeysNum", remoteLookupKeysNum);
+    metricGroup.histogram("recordBufferingTime", recordBufferingTime);
+  }
+
+  public void startLocalIndexLookup() {
+    startTimer(LOCAL_INDEX_LOOKUP_KEY);
+  }
+
+  public void endLocalIndexLookup() {
+    localIndexLookupLatency.update(stopTimer(LOCAL_INDEX_LOOKUP_KEY));
+  }
+
+  public void updateLocalLookupKeysNum(long n) {
+    localLookupKeysNum.update(n);
+  }
+
+  public void startRemoteIndexLookup() {
+    startTimer(REMOTE_INDEX_LOOKUP_KEY);
+  }
+
+  public void endRemoteIndexLookup() {
+    remoteIndexLookupLatency.update(stopTimer(REMOTE_INDEX_LOOKUP_KEY));
+  }
+
+  public void updateRemoteLookupKeysNum(long n) {
+    remoteLookupKeysNum.update(n);
+  }
+
+  public void startRecordBuffering() {
+    startTimer(RECORD_BUFFERING_KEY);
+  }
+
+  public void endRecordBuffering() {
+    recordBufferingTime.update(stopTimer(RECORD_BUFFERING_KEY));
+  }
+
+  @VisibleForTesting
+  public long getLocalIndexLookupCount() {
+    return localIndexLookupLatency.getCount();
+  }
+
+  @VisibleForTesting
+  public long getRemoteIndexLookupCount() {
+    return remoteIndexLookupLatency.getCount();
+  }
+
+  @VisibleForTesting
+  public long getLocalLookupKeysNumCount() {

Review Comment:
🤖 nit: `getLocalLookupKeysNumCount()` is easy to misread. 'Num' (the keys-per-lookup distribution) combined with 'Count' (the histogram sample count) reads as 'total key count', which is not what this method returns. Something like `getLocalLookupKeysSampleCount()` would be less ambiguous (the same concern applies to `getRemoteLookupKeysNumCount()`).

- AI-generated; verify before applying.

##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/GlobalRecordLevelIndexBackend.java:
##########
@@ -84,21 +90,37 @@ public void update(String recordKey, HoodieRecordGlobalLocation recordGlobalLoca
   public Map<String, HoodieRecordGlobalLocation> get(List<String> recordKeys) throws IOException {
     Map<String, HoodieRecordGlobalLocation> keysAndLocations = new HashMap<>();
     List<String> missedKeys = new ArrayList<>();
-    for (String key: recordKeys) {
+    if (metrics != null) {
+      metrics.startLocalIndexLookup();

Review Comment:
🤖 nit: the four scattered `if (metrics != null)` guards fragment the lookup logic and make it easy to omit one when adding a new metric. Have you considered initializing `metrics` to a no-op instance so that the guards can be dropped entirely?

- AI-generated; verify before applying.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
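Regarding the `MinibatchBucketAssignFunction` review comment (moving `endRecordBuffering()` into `processBufferedRecords()`), here is a minimal, stdlib-only sketch of why a single stop point covers all three flush paths. It assumes that `processBufferedRecords()` is the one method every flush path calls and that it returns early on an empty buffer. `BufferingTimerSketch`, `SimpleTimers`, and `MiniBatchBuffer` are hypothetical stand-ins, not Hudi or Flink classes; the restart counter only mimics the warning logged by `HoodieFlinkMetrics#startTimer`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the mini-batch buffering timer; not the Hudi implementation. */
public class BufferingTimerSketch {

  /** Minimal named-timer registry that counts restarts of a still-running timer. */
  static final class SimpleTimers {
    private final Map<String, Long> started = new HashMap<>();
    int restartWarnings = 0;

    void start(String name) {
      if (started.containsKey(name)) {
        restartWarnings++; // stands in for the "Restarting timer ..." warning
      }
      started.put(name, System.nanoTime());
    }

    /** Removes the timer and returns elapsed milliseconds (0 if it was never started). */
    long stop(String name) {
      Long begin = started.remove(name);
      return begin == null ? 0L : (System.nanoTime() - begin) / 1_000_000L;
    }
  }

  /** Hypothetical buffer in which every flush path goes through processBufferedRecords(). */
  static final class MiniBatchBuffer {
    private final List<String> buffer = new ArrayList<>();
    private final int miniBatchSize;
    final SimpleTimers timers = new SimpleTimers();
    final List<Long> bufferingSamples = new ArrayList<>();

    MiniBatchBuffer(int miniBatchSize) {
      this.miniBatchSize = miniBatchSize;
    }

    void processElement(String record) {
      if (buffer.isEmpty()) {
        timers.start("record_buffering"); // first record into an empty buffer
      }
      buffer.add(record);
      if (buffer.size() >= miniBatchSize) {
        processBufferedRecords(); // full-batch flush
      }
    }

    void processBufferedRecords() {
      if (buffer.isEmpty()) {
        return; // nothing buffered, so no timer is running
      }
      // Stopping the timer here covers full batches, checkpoints and end of input alike.
      bufferingSamples.add(timers.stop("record_buffering"));
      buffer.clear();
    }

    void prepareSnapshotPreBarrier() {
      processBufferedRecords(); // partial-batch flush on checkpoint
    }

    void endInput() {
      processBufferedRecords(); // partial-batch flush at end of input
    }
  }

  public static void main(String[] args) {
    MiniBatchBuffer fn = new MiniBatchBuffer(3);
    fn.processElement("a");
    fn.processElement("b");
    fn.prepareSnapshotPreBarrier(); // partial batch
    fn.processElement("c");
    fn.processElement("d");
    fn.processElement("e");         // full batch
    fn.processElement("f");
    fn.endInput();                  // partial batch
    System.out.println("samples=" + fn.bufferingSamples.size()
        + " restartWarnings=" + fn.timers.restartWarnings);
  }
}
```

With the stop call inside the shared flush method, each of the three flushes records one sample and the timer is never restarted while still running. If the stop call stayed in the full-batch branch only, the checkpoint flush would leave the timer running and the next `processElement` would trigger a restart.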

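Regarding the `GlobalRecordLevelIndexBackend` review comment (replacing the `if (metrics != null)` guards with a no-op instance), here is a minimal sketch of the null-object pattern under simplified assumptions. `NoOpMetricsSketch`, `LookupMetrics`, `RecordingMetrics`, and `CacheBackend` are hypothetical names; the three hook methods only mirror the naming of `FlinkBucketAssignMetrics` and are not its real API or type hierarchy. The backend substitutes a shared do-nothing instance once, in its constructor, so the lookup path can call the hooks unconditionally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the null-object pattern for optional lookup metrics. */
public class NoOpMetricsSketch {

  /** Hypothetical subset of lookup hooks; not the real FlinkBucketAssignMetrics API. */
  interface LookupMetrics {
    void startLocalIndexLookup();

    void endLocalIndexLookup();

    void updateLocalLookupKeysNum(long n);

    /** Shared instance whose methods do nothing, used when metrics are disabled. */
    LookupMetrics NO_OP = new LookupMetrics() {
      @Override public void startLocalIndexLookup() { }
      @Override public void endLocalIndexLookup() { }
      @Override public void updateLocalLookupKeysNum(long n) { }
    };
  }

  /** Records each hook call so the example can show the hooks firing. */
  static final class RecordingMetrics implements LookupMetrics {
    final List<String> events = new ArrayList<>();
    @Override public void startLocalIndexLookup() { events.add("start"); }
    @Override public void endLocalIndexLookup() { events.add("end"); }
    @Override public void updateLocalLookupKeysNum(long n) { events.add("keys=" + n); }
  }

  /** Hypothetical cache-backed lookup: metrics is never null, so no guards are needed. */
  static final class CacheBackend {
    private final Map<String, String> cache = new HashMap<>();
    private final LookupMetrics metrics;

    CacheBackend(LookupMetrics metrics) {
      // Resolve "metrics disabled" once here instead of at every call site.
      this.metrics = metrics == null ? LookupMetrics.NO_OP : metrics;
    }

    void put(String key, String location) {
      cache.put(key, location);
    }

    /** Returns cached locations; keys not in the cache are appended to missedKeys. */
    Map<String, String> get(List<String> keys, List<String> missedKeys) {
      Map<String, String> found = new HashMap<>();
      metrics.startLocalIndexLookup();
      for (String key : keys) {
        String location = cache.get(key);
        if (location != null) {
          found.put(key, location);
        } else {
          missedKeys.add(key);
        }
      }
      metrics.endLocalIndexLookup();
      metrics.updateLocalLookupKeysNum(found.size());
      return found;
    }
  }

  public static void main(String[] args) {
    RecordingMetrics recording = new RecordingMetrics();
    CacheBackend withMetrics = new CacheBackend(recording);
    withMetrics.put("k1", "p1");
    List<String> missed = new ArrayList<>();
    Map<String, String> hits = withMetrics.get(List.of("k1", "k2"), missed);
    System.out.println(hits + " missed=" + missed + " events=" + recording.events);

    CacheBackend withoutMetrics = new CacheBackend(null); // falls back to NO_OP
    System.out.println(withoutMetrics.get(List.of("k1"), new ArrayList<>()));
  }
}
```

A side benefit of this design is that adding a new hook only requires one more empty method on the no-op instance rather than another guard at each call site.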