This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7c9a1ddc1b2 [HUDI-9303] Fix timer is not reset properly after
RowDataLogWriteHandle flushing a data block (#13132)
7c9a1ddc1b2 is described below
commit 7c9a1ddc1b2b39b5a3a634691a83054b1b876f0b
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Apr 11 10:43:34 2025 +0800
[HUDI-9303] Fix timer is not reset properly after RowDataLogWriteHandle
flushing a data block (#13132)
---
.../org/apache/hudi/io/v2/RowDataLogWriteHandle.java | 1 +
.../org/apache/hudi/sink/TestWriteMergeOnRead.java | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
index 281b197171b..f89169e75fd 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
@@ -149,6 +149,7 @@ public class RowDataLogWriteHandle<T, I, K, O>
assert stat.getRuntimeStats() != null;
LOG.info("WriteHandle for partitionPath {} filePath {}, took {} ms.",
partitionPath, stat.getPath(),
stat.getRuntimeStats().getTotalUpsertTime());
+ timer.startTimer();
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 94214661ea8..ce0581f3300 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,6 +18,7 @@
package org.apache.hudi.sink;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieClusteringConfig;
@@ -201,6 +202,25 @@ public class TestWriteMergeOnRead extends TestWriteCopyOnWrite {
.end();
}
+ @Test
+ void testWriteMorWithSmallLogBlock() throws Exception {
+ // 5 records in total, with an average record size of 48;
+ // set the max block size to 128 to trigger a flush while writing log data blocks
+ conf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(),
"128");
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
+
+ preparePipeline()
+ .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+ .assertEmptyDataFiles()
+ .checkpoint(1)
+ .assertNextEvent()
+ .checkpointComplete(1)
+ .checkWrittenData(expected, 1)
+ .end();
+ }
+
@Override
protected Map<String, String> getExpectedBeforeCheckpointComplete() {
return EXPECTED1;