This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 780595b73c99 fix(flink): fix the write handle close for append write (#18756)
780595b73c99 is described below
commit 780595b73c99801c1903c74b1efcb79f9f46ee6e
Author: Danny Chan <[email protected]>
AuthorDate: Sat May 16 14:17:39 2026 +0800
fix(flink): fix the write handle close for append write (#18756)
---
.../hudi/sink/append/AppendWriteFunction.java | 11 +++
.../AppendWriteFunctionWithBIMBufferSort.java | 12 ++-
.../hudi/sink/append/TestAppendWriteFunction.java | 17 ++++
.../TestAppendWriteFunctionWithBIMBufferSort.java | 106 +++++++++++++++++++++
4 files changed, 144 insertions(+), 2 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 5a3047646715..ca5bf00dc46d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -166,6 +166,17 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
writeMetrics.registerMetrics();
}
+ @Override
+ public void close() throws Exception {
+ try {
+ if (this.writerHelper != null) {
+ this.writerHelper.close();
+ }
+ } finally {
+ super.close();
+ }
+ }
+
/**
* Update metrics and log for errors in write status.
*
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
index 89d4fd2c8f12..091f018c3c5f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
@@ -48,6 +48,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -228,8 +229,15 @@ public class AppendWriteFunctionWithBIMBufferSort<T> extends AppendWriteFunction
@Override
public void close() throws Exception {
try {
- if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
- asyncWriteExecutor.shutdown();
+ if (asyncWriteExecutor != null) {
+ if (!asyncWriteExecutor.isShutdown()) {
+ asyncWriteExecutor.shutdown();
+ }
+ // Do not release the sort buffers while an already-submitted flush is still using them.
+ waitForAsyncWriteCompletion();
+ if (!asyncWriteExecutor.awaitTermination(10, TimeUnit.MINUTES)) {
+ log.warn("Timed out waiting for async write executor to terminate");
+ }
}
if (memorySegmentPools != null) {
for (MemorySegmentPool memorySegmentPool : memorySegmentPools) {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
index 97d8ab934e0a..3b9d4ae3d7cb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java
@@ -23,8 +23,11 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
import org.junit.jupiter.api.Test;
import java.util.Collections;
@@ -40,6 +43,8 @@ import org.junit.jupiter.api.BeforeEach;
import java.util.Arrays;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
/**
* Test cases for {@link AppendWriteFunction}.
@@ -121,4 +126,16 @@ public class TestAppendWriteFunction {
// Should record total 5 failures across both write statuses
assertEquals(5, flinkStreamWriteMetrics.getNumOfRecordWriteFailures());
}
+
+ @Test
+ void testCloseClosesWriterHelper() throws Exception {
+ AppendWriteFunction<Object> function =
+ new AppendWriteFunction<>(new Configuration(), RowType.of(VarCharType.STRING_TYPE));
+ BulkInsertWriterHelper writerHelper = mock(BulkInsertWriterHelper.class);
+ function.writerHelper = writerHelper;
+
+ function.close();
+
+ verify(writerHelper).close();
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java
new file mode 100644
index 000000000000..c342e4062062
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.append;
+
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test cases for {@link AppendWriteFunctionWithBIMBufferSort}.
+ */
+public class TestAppendWriteFunctionWithBIMBufferSort {
+
+ @Test
+ @Timeout(10)
+ void testCloseWaitsForAsyncWriteBeforeClosingWriterHelper() throws Exception {
+ AppendWriteFunctionWithBIMBufferSort<Object> function =
+ new AppendWriteFunctionWithBIMBufferSort<>(new Configuration(), RowType.of(VarCharType.STRING_TYPE));
+ ExecutorService asyncWriteExecutor = Executors.newSingleThreadExecutor();
+ CountDownLatch asyncWriteStarted = new CountDownLatch(1);
+ CountDownLatch releaseAsyncWrite = new CountDownLatch(1);
+ AtomicBoolean writerHelperClosed = new AtomicBoolean(false);
+ BulkInsertWriterHelper writerHelper = mock(BulkInsertWriterHelper.class);
+ doAnswer(invocation -> {
+ writerHelperClosed.set(true);
+ return null;
+ }).when(writerHelper).close();
+ function.writerHelper = writerHelper;
+
+ CompletableFuture<Void> asyncWriteTask = CompletableFuture.runAsync(() -> {
+ asyncWriteStarted.countDown();
+ try {
+ releaseAsyncWrite.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }, asyncWriteExecutor);
+ assertTrue(asyncWriteStarted.await(1, TimeUnit.SECONDS));
+
+ setField(function, "asyncWriteExecutor", asyncWriteExecutor);
+ setField(function, "asyncWriteTask", new AtomicReference<>(asyncWriteTask));
+ setField(function, "isBackgroundBufferBeingProcessed", new AtomicBoolean(true));
+
+ CountDownLatch closeStarted = new CountDownLatch(1);
+ CompletableFuture<Void> closeTask = CompletableFuture.runAsync(() -> {
+ try {
+ closeStarted.countDown();
+ function.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertTrue(closeStarted.await(1, TimeUnit.SECONDS));
+ Thread.sleep(100);
+ assertFalse(closeTask.isDone());
+ assertFalse(writerHelperClosed.get());
+
+ releaseAsyncWrite.countDown();
+ closeTask.get(1, TimeUnit.SECONDS);
+
+ assertTrue(writerHelperClosed.get());
+ verify(writerHelper).close();
+ }
+
+ private static void setField(Object target, String fieldName, Object value) throws Exception {
+ Field field = AppendWriteFunctionWithBIMBufferSort.class.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}