danny0405 commented on code in PR #18843:
URL: https://github.com/apache/hudi/pull/18843#discussion_r3303487238


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -672,17 +702,28 @@ private void bufferDelete(HoodieRecord<T> hoodieRecord) {
 
   /**
    * Checks if the number of records have reached the set threshold and then flushes the records to disk.
+   *
+   * <p>{@code bufferedRecord} is the record that was just appended to {@link #recordList} by
+   * {@link #writeToBuffer} (or {@code null} for delete/ignored windows where {@code recordList}
+   * did not grow). Sizing this object — rather than the incoming pre-{@code prepareRecord}
+   * record — keeps {@link #averageRecordSize} aligned with what is actually retained in heap,
+   * which matters on Spark engines where the incoming {@code HoodieSparkRecord}/{@code UnsafeRow}
+   * is many times smaller than the buffered {@code HoodieAvroIndexedRecord}.
    */
-  protected void flushToDiskIfRequired(HoodieRecord record, boolean appendDeleteBlocks) {
-    if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)
-        || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0) {
-      averageRecordSize = (long) (averageRecordSize * 0.8 + sizeEstimator.sizeEstimate(record) * 0.2);
+  protected void flushToDiskIfRequired(HoodieRecord bufferedRecord, boolean appendDeleteBlocks) {
+    if (bufferedRecord != null
+        && (averageRecordSize == 0
+            || numberOfRecords >= (int) (maxBlockSize / Math.max(averageRecordSize, 1))
+            || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0)) {
+      long sampled = sizeEstimator.sizeEstimate(bufferedRecord);
+      averageRecordSize = averageRecordSize == 0
+          ? sampled
+          : (long) (averageRecordSize * 0.8 + sampled * 0.2);
     }
 
-    // Append if max number of records reached to achieve block size
-    if (numberOfRecords >= (maxBlockSize / averageRecordSize)) {
-      // Recompute averageRecordSize before writing a new block and update existing value with
-      // avg of new and old
+    // Append if max number of records reached to achieve block size.
+    // Skip when averageRecordSize is still 0 (delete-only prefix before any insert/update).
+    if (averageRecordSize > 0 && numberOfRecords >= (maxBlockSize / averageRecordSize)) {

Review Comment:
   Is `averageRecordSize > 0` right here? Shouldn't we allow a pure delete block to flush? The delete record buffer also takes a small amount of memory.
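To make the question concrete, here is a simplified sketch of the flush decision in this hunk, under stated assumptions. The caller passes the already-estimated size of the buffered record instead of calling `sizeEstimator.sizeEstimate`, and `null` stands for a delete/ignored record as in the new Javadoc. `numberOfRecords` is assumed to count deletes too, `NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE` is assumed to be 100, and a flush is modeled as resetting `numberOfRecords`. `FlushPolicySketch` and `assumedDeleteRecordSize` are hypothetical names, not Hudi code; passing `assumedDeleteRecordSize = 0` reproduces the `averageRecordSize > 0` guard from the diff.

```java
/**
 * Simplified, hypothetical model of the flush decision in the hunk above.
 * The real handle calls sizeEstimator.sizeEstimate(bufferedRecord); here the caller
 * passes that estimate directly, or null for a delete/ignored record.
 */
class FlushPolicySketch {
  // Assumed value; the hunk does not show what the real constant is set to.
  static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;

  final long maxBlockSize;
  // Hypothetical per-delete size, used only while averageRecordSize is still 0.
  // Passing 0 reproduces the `averageRecordSize > 0` guard from the diff exactly.
  final long assumedDeleteRecordSize;
  long averageRecordSize = 0;
  int numberOfRecords = 0;
  int blocksFlushed = 0;

  FlushPolicySketch(long maxBlockSize, long assumedDeleteRecordSize) {
    this.maxBlockSize = maxBlockSize;
    this.assumedDeleteRecordSize = assumedDeleteRecordSize;
  }

  /** Buffers one record and returns true if that triggered a block flush. */
  boolean append(Long bufferedRecordSize) {
    numberOfRecords++; // assumption: deletes are counted as buffered records too

    // Same sampling rule as the diff: only a non-null buffered record is sized,
    // the first sample seeds the average, later samples use a 0.8/0.2 moving average.
    if (bufferedRecordSize != null
        && (averageRecordSize == 0
            || numberOfRecords >= (int) (maxBlockSize / Math.max(averageRecordSize, 1))
            || numberOfRecords % NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE == 0)) {
      long sampled = bufferedRecordSize;
      averageRecordSize = averageRecordSize == 0
          ? sampled
          : (long) (averageRecordSize * 0.8 + sampled * 0.2);
    }

    // Flush gate: with assumedDeleteRecordSize == 0 this is the diff's
    // `averageRecordSize > 0 && numberOfRecords >= maxBlockSize / averageRecordSize`.
    long effectiveSize = averageRecordSize > 0 ? averageRecordSize : assumedDeleteRecordSize;
    if (effectiveSize > 0 && numberOfRecords >= maxBlockSize / effectiveSize) {
      numberOfRecords = 0; // stands in for appending the block and clearing the buffers
      blocksFlushed++;
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    long oneMiB = 1024 * 1024;
    FlushPolicySketch asInDiff = new FlushPolicySketch(oneMiB, 0);
    FlushPolicySketch withFallback = new FlushPolicySketch(oneMiB, 64);
    // A delete-only window: 100,000 deletes, no inserts or updates.
    for (int i = 0; i < 100_000; i++) {
      asInDiff.append(null);
      withFallback.append(null);
    }
    System.out.println("guard as in diff: " + asInDiff.blocksFlushed + " flushes");
    System.out.println("64-byte fallback: " + withFallback.blocksFlushed + " flushes");
  }
}
```

Under these assumptions, `main` reports 0 flushes for the guard as written, because `averageRecordSize` is only seeded from a non-null `bufferedRecord` and therefore stays 0 for the whole delete-only window. With the hypothetical 64-byte fallback it reports 6 flushes (1 MiB / 64 B = 16,384 deletes per block). A fixed fallback is only one illustrative option; a separate size estimate for buffered deletes could address the same concern.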



-- 
This is an automated message from the Apache Git Service.