This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 42ae8584a36c9b152a162832d987e6da79d7f89d Author: Danny Chan <[email protected]> AuthorDate: Sat May 16 14:17:39 2026 +0800 fix(flink): fix the write handle close for append write (#18756) --- .../hudi/sink/append/AppendWriteFunction.java | 11 +++ .../AppendWriteFunctionWithBIMBufferSort.java | 12 ++- .../hudi/sink/append/TestAppendWriteFunction.java | 17 ++++ .../TestAppendWriteFunctionWithBIMBufferSort.java | 106 +++++++++++++++++++++ 4 files changed, 144 insertions(+), 2 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java index 5a3047646715..ca5bf00dc46d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java @@ -166,6 +166,17 @@ public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> { writeMetrics.registerMetrics(); } + @Override + public void close() throws Exception { + try { + if (this.writerHelper != null) { + this.writerHelper.close(); + } + } finally { + super.close(); + } + } + /** * Update metrics and log for errors in write status. * diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java index 89d4fd2c8f12..091f018c3c5f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -228,8 +229,15 @@ public class AppendWriteFunctionWithBIMBufferSort<T> extends AppendWriteFunction @Override public void close() throws Exception { try { - if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) { - asyncWriteExecutor.shutdown(); + if (asyncWriteExecutor != null) { + if (!asyncWriteExecutor.isShutdown()) { + asyncWriteExecutor.shutdown(); + } + // Do not release the sort buffers while an already-submitted flush is still using them. + waitForAsyncWriteCompletion(); + if (!asyncWriteExecutor.awaitTermination(10, TimeUnit.MINUTES)) { + log.warn("Timed out waiting for async write executor to terminate"); + } } if (memorySegmentPools != null) { for (MemorySegmentPool memorySegmentPool : memorySegmentPools) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java index 97d8ab934e0a..3b9d4ae3d7cb 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunction.java @@ -23,8 +23,11 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; import org.apache.hudi.util.StreamerUtil; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -40,6 +43,8 @@ import org.junit.jupiter.api.BeforeEach; import java.util.Arrays; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; /** * Test cases for {@link AppendWriteFunction}. @@ -121,4 +126,16 @@ public class TestAppendWriteFunction { // Should record total 5 failures across both write statuses assertEquals(5, flinkStreamWriteMetrics.getNumOfRecordWriteFailures()); } + + @Test + void testCloseClosesWriterHelper() throws Exception { + AppendWriteFunction<Object> function = + new AppendWriteFunction<>(new Configuration(), RowType.of(VarCharType.STRING_TYPE)); + BulkInsertWriterHelper writerHelper = mock(BulkInsertWriterHelper.class); + function.writerHelper = writerHelper; + + function.close(); + + verify(writerHelper).close(); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java new file mode 100644 index 000000000000..c342e4062062 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/TestAppendWriteFunctionWithBIMBufferSort.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.append; + +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +/** + * Test cases for {@link AppendWriteFunctionWithBIMBufferSort}. + */ +public class TestAppendWriteFunctionWithBIMBufferSort { + + @Test + @Timeout(10) + void testCloseWaitsForAsyncWriteBeforeClosingWriterHelper() throws Exception { + AppendWriteFunctionWithBIMBufferSort<Object> function = + new AppendWriteFunctionWithBIMBufferSort<>(new Configuration(), RowType.of(VarCharType.STRING_TYPE)); + ExecutorService asyncWriteExecutor = Executors.newSingleThreadExecutor(); + CountDownLatch asyncWriteStarted = new CountDownLatch(1); + CountDownLatch releaseAsyncWrite = new CountDownLatch(1); + AtomicBoolean writerHelperClosed = new AtomicBoolean(false); + BulkInsertWriterHelper writerHelper = mock(BulkInsertWriterHelper.class); + doAnswer(invocation -> { + writerHelperClosed.set(true); + return null; + }).when(writerHelper).close(); + function.writerHelper = writerHelper; + + CompletableFuture<Void> asyncWriteTask = CompletableFuture.runAsync(() -> { + asyncWriteStarted.countDown(); + try { + releaseAsyncWrite.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, asyncWriteExecutor); + assertTrue(asyncWriteStarted.await(1, TimeUnit.SECONDS)); + + setField(function, "asyncWriteExecutor", asyncWriteExecutor); + setField(function, "asyncWriteTask", new AtomicReference<>(asyncWriteTask)); + setField(function, "isBackgroundBufferBeingProcessed", new AtomicBoolean(true)); + + CountDownLatch closeStarted = new CountDownLatch(1); + CompletableFuture<Void> closeTask = CompletableFuture.runAsync(() -> { + try { + closeStarted.countDown(); + function.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + assertTrue(closeStarted.await(1, TimeUnit.SECONDS)); + Thread.sleep(100); + assertFalse(closeTask.isDone()); + assertFalse(writerHelperClosed.get()); + + releaseAsyncWrite.countDown(); + closeTask.get(1, TimeUnit.SECONDS); + + assertTrue(writerHelperClosed.get()); + verify(writerHelper).close(); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field field = AppendWriteFunctionWithBIMBufferSort.class.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +}
