This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 780595b73c99 fix(flink): fix the write handle close for append write 
(#18756)
780595b73c99 is described below

commit 780595b73c99801c1903c74b1efcb79f9f46ee6e
Author: Danny Chan <[email protected]>
AuthorDate: Sat May 16 14:17:39 2026 +0800

    fix(flink): fix the write handle close for append write (#18756)
---
 .../hudi/sink/append/AppendWriteFunction.java      |  11 +++
 .../AppendWriteFunctionWithBIMBufferSort.java      |  12 ++-
 .../hudi/sink/append/TestAppendWriteFunction.java  |  17 ++++
 .../TestAppendWriteFunctionWithBIMBufferSort.java  | 106 +++++++++++++++++++++
 4 files changed, 144 insertions(+), 2 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 5a3047646715..ca5bf00dc46d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -166,6 +166,17 @@ public class AppendWriteFunction<I> extends 
AbstractStreamWriteFunction<I> {
     writeMetrics.registerMetrics();
   }
 
+  @Override
+  public void close() throws Exception {
+    try {
+      if (this.writerHelper != null) {
+        this.writerHelper.close();
+      }
+    } finally {
+      super.close();
+    }
+  }
+
   /**
    * Update metrics and log for errors in write status.
    *
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
index 89d4fd2c8f12..091f018c3c5f 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -228,8 +229,15 @@ public class AppendWriteFunctionWithBIMBufferSort<T> 
extends AppendWriteFunction
   @Override
   public void close() throws Exception {
     try {
-      if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
-        asyncWriteExecutor.shutdown();
+      if (asyncWriteExecutor != null) {
+        if (!asyncWriteExecutor.isShutdown()) {
+          asyncWriteExecutor.shutdown();
+        }
+        // Do not release the sort buffers while an already-submitted flush is 
still using them.
+        waitForAsyncWriteCompletion();
+        if (!asyncWriteExecutor.awaitTermination(10, TimeUnit.MINUTES)) {
+          log.warn("Timed out waiting for async write executor to terminate");
+        }
       }
       if (memorySegmentPools != null) {
         for (MemorySegmentPool memorySegmentPool : memorySegmentPools) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
index 97d8ab934e0a..3b9d4ae3d7cb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
@@ -23,8 +23,11 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
@@ -40,6 +43,8 @@ import org.junit.jupiter.api.BeforeEach;
 import java.util.Arrays;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 /**
  * Test cases for {@link AppendWriteFunction}.
@@ -121,4 +126,16 @@ public class TestAppendWriteFunction {
     // Should record total 5 failures across both write statuses
     assertEquals(5, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
   }
+
+  @Test
+  void testCloseClosesWriterHelper() throws Exception {
+    AppendWriteFunction<Object> function =
+        new AppendWriteFunction<>(new Configuration(), 
RowType.of(VarCharType.STRING_TYPE));
+    BulkInsertWriterHelper writerHelper = mock(BulkInsertWriterHelper.class);
+    function.writerHelper = writerHelper;
+
+    function.close();
+
+    verify(writerHelper).close();
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java
new file mode 100644
index 000000000000..c342e4062062
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.append;
+
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test cases for {@link AppendWriteFunctionWithBIMBufferSort}.
+ */
+public class TestAppendWriteFunctionWithBIMBufferSort {
+
+  @Test
+  @Timeout(10)
+  void testCloseWaitsForAsyncWriteBeforeClosingWriterHelper() throws Exception 
{
+    AppendWriteFunctionWithBIMBufferSort<Object> function =
+        new AppendWriteFunctionWithBIMBufferSort<>(new Configuration(), 
RowType.of(VarCharType.STRING_TYPE));
+    ExecutorService asyncWriteExecutor = Executors.newSingleThreadExecutor();
+    CountDownLatch asyncWriteStarted = new CountDownLatch(1);
+    CountDownLatch releaseAsyncWrite = new CountDownLatch(1);
+    AtomicBoolean writerHelperClosed = new AtomicBoolean(false);
+    BulkInsertWriterHelper writerHelper = mock(BulkInsertWriterHelper.class);
+    doAnswer(invocation -> {
+      writerHelperClosed.set(true);
+      return null;
+    }).when(writerHelper).close();
+    function.writerHelper = writerHelper;
+
+    CompletableFuture<Void> asyncWriteTask = CompletableFuture.runAsync(() -> {
+      asyncWriteStarted.countDown();
+      try {
+        releaseAsyncWrite.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }, asyncWriteExecutor);
+    assertTrue(asyncWriteStarted.await(1, TimeUnit.SECONDS));
+
+    setField(function, "asyncWriteExecutor", asyncWriteExecutor);
+    setField(function, "asyncWriteTask", new 
AtomicReference<>(asyncWriteTask));
+    setField(function, "isBackgroundBufferBeingProcessed", new 
AtomicBoolean(true));
+
+    CountDownLatch closeStarted = new CountDownLatch(1);
+    CompletableFuture<Void> closeTask = CompletableFuture.runAsync(() -> {
+      try {
+        closeStarted.countDown();
+        function.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    assertTrue(closeStarted.await(1, TimeUnit.SECONDS));
+    Thread.sleep(100);
+    assertFalse(closeTask.isDone());
+    assertFalse(writerHelperClosed.get());
+
+    releaseAsyncWrite.countDown();
+    closeTask.get(1, TimeUnit.SECONDS);
+
+    assertTrue(writerHelperClosed.get());
+    verify(writerHelper).close();
+  }
+
+  private static void setField(Object target, String fieldName, Object value) 
throws Exception {
+    Field field = 
AppendWriteFunctionWithBIMBufferSort.class.getDeclaredField(fieldName);
+    field.setAccessible(true);
+    field.set(target, value);
+  }
+}

Reply via email to