stream2000 commented on code in PR #9118: URL: https://github.com/apache/hudi/pull/9118#discussion_r1354278561
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java: ########## @@ -385,16 +393,24 @@ private String getBucketID(HoodieRecord<?> record) { * @param value HoodieRecord */ protected void bufferRecord(HoodieRecord<?> value) { + writeMetrics.markRecordIn(); final String bucketID = getBucketID(value); DataBucket bucket = this.buckets.computeIfAbsent(bucketID, - k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value)); + k -> { + // create a new bucket and update metrics + writeMetrics.increaseNumOfOpenHandle(); + return new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value); Review Comment: Yes, for stream write we only create the handle when flushing buckets. To avoid misleading readers, remove the metric here, since for stream write the `numOfFilesWritten` metric is enough. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java: ########## @@ -488,11 +505,24 @@ private void flushRemaining(boolean endInput) { this.writeStatuses.addAll(writeStatus); // blocks flushing until the coordinator starts a new instant this.confirming = true; + + writeMetrics.endFlushing(); + writeMetrics.resetAfterCommit(); + } + + private void registerMetrics() { + MetricGroup metrics = getRuntimeContext().getMetricGroup(); + writeMetrics = new FlinkStreamWriteMetrics(metrics); + writeMetrics.registerMetrics(); } protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) { bucket.preWrite(records); - return writeFunction.apply(records, instant); + writeMetrics.startHandleClose(); + List<WriteStatus> statuses = writeFunction.apply(records, instant); Review Comment: This name comes from the append-write handle close, which is actually a file flush. Rename it to `singleFileFlush`, which makes sense in both stream write and append write. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org