This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2288c079833 [HUDI-8782] BulkInsertWriterHelper parallel close (#12518)
2288c079833 is described below
commit 2288c07983322017bfb63fa22c4e998a25dca2c5
Author: fhan <[email protected]>
AuthorDate: Tue Jan 7 15:42:16 2025 +0800
[HUDI-8782] BulkInsertWriterHelper parallel close (#12518)
* parallel close draft
* update awaitTermination to 10 minutes in close
* deal with empty handles in close
* hard code close to 10 max parallelism
---------
Co-authored-by: fhan <[email protected]>
---
.../hudi/sink/bulk/BulkInsertWriterHelper.java | 34 ++++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
index 9b9b85ba6e3..dd2368b1ff5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriterHelper.java
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
import org.apache.hudi.table.HoodieTable;
@@ -45,6 +46,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.FutureUtils.allOf;
/**
* Helper class for bulk insert used by Flink.
@@ -164,9 +172,29 @@ public class BulkInsertWriterHelper {
}
public void close() throws IOException {
- for (HoodieRowDataCreateHandle rowCreateHandle : handles.values()) {
- LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
- writeStatusList.add(closeWriteHandle(rowCreateHandle));
+ if (handles.isEmpty()) {
+ return;
+ }
+ int handsSize = Math.min(handles.size(), 10);
+ ExecutorService executorService = Executors.newFixedThreadPool(handsSize);
+ allOf(handles.values().stream()
+ .map(rowCreateHandle -> CompletableFuture.supplyAsync(() -> {
+ try {
+ LOG.info("Closing bulk insert file " +
rowCreateHandle.getFileName());
+ return rowCreateHandle.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("IOE during rowCreateHandle.close()",
e);
+ }
+ }, executorService))
+ .collect(Collectors.toList())
+ ).whenComplete((result, throwable) -> {
+ writeStatusList.addAll(result);
+ }).join();
+ try {
+ executorService.shutdown();
+ executorService.awaitTermination(10, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
handles.clear();
handle = null;