stream2000 commented on code in PR #9118:
URL: https://github.com/apache/hudi/pull/9118#discussion_r1354278561


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -385,16 +393,24 @@ private String getBucketID(HoodieRecord<?> record) {
    * @param value HoodieRecord
    */
   protected void bufferRecord(HoodieRecord<?> value) {
+    writeMetrics.markRecordIn();
     final String bucketID = getBucketID(value);
 
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
-        k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        k -> {
+          // create a new bucket and update metrics
+          writeMetrics.increaseNumOfOpenHandle();
+          return new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value);

Review Comment:
   Yes, for stream write the write handle is only created when the buckets are flushed. To avoid being misleading, remove the metric here; for stream write, the `numOfFilesWritten` metric is enough.
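   A minimal sketch of what I mean, keeping only the record-in marker (the surrounding fields and helpers are assumed from the diff above):
   ```java
   protected void bufferRecord(HoodieRecord<?> value) {
     // only count incoming records here; file-level metrics come from numOfFilesWritten at flush time
     writeMetrics.markRecordIn();
     final String bucketID = getBucketID(value);

     // no open-handle metric in the lambda: stream write only creates handles when flushing buckets
     DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
         k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
     // ... buffering of the record into the bucket stays as is
   }
   ```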



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -488,11 +505,24 @@ private void flushRemaining(boolean endInput) {
     this.writeStatuses.addAll(writeStatus);
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
+
+    writeMetrics.endFlushing();
+    writeMetrics.resetAfterCommit();
+  }
+
+  private void registerMetrics() {
+    MetricGroup metrics = getRuntimeContext().getMetricGroup();
+    writeMetrics = new FlinkStreamWriteMetrics(metrics);
+    writeMetrics.registerMetrics();
   }
 
   protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
     bucket.preWrite(records);
-    return writeFunction.apply(records, instant);
+    writeMetrics.startHandleClose();
+    List<WriteStatus> statuses = writeFunction.apply(records, instant);

Review Comment:
   This name comes from the append-write handle close, which is actually a file flush. Rename it to `singleFileFlush`, which makes sense for both stream write and append write.
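   For illustration only, the timer around the flush might then read like this; `startSingleFileFlush`/`endSingleFileFlush` are hypothetical names mirroring the existing start/end pattern, not the actual `FlinkStreamWriteMetrics` API:
   ```java
   protected List<WriteStatus> writeBucket(String instant, DataBucket bucket, List<HoodieRecord> records) {
     bucket.preWrite(records);
     // time the flush of a single file (meaningful for both stream write and append write)
     writeMetrics.startSingleFileFlush();   // hypothetical rename of startHandleClose
     List<WriteStatus> statuses = writeFunction.apply(records, instant);
     writeMetrics.endSingleFileFlush();     // hypothetical counterpart end call
     return statuses;
   }
   ```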


