lrsb commented on code in PR #16015:
URL: https://github.com/apache/iceberg/pull/16015#discussion_r3169154730


##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java:
##########
@@ -172,34 +182,65 @@ public String toString() {
 
   @Override
   public Collection<DynamicWriteResult> prepareCommit() throws IOException {
-    List<DynamicWriteResult> result = Lists.newArrayList();
-    for (Map.Entry<WriteTarget, TaskWriter<RowData>> entry : 
writers.entrySet()) {
-      long startNano = System.nanoTime();
-      WriteResult writeResult = entry.getValue().complete();
-      WriteTarget writeTarget = entry.getKey();
-      metrics.updateFlushResult(writeTarget.tableName(), writeResult);
-      metrics.flushDuration(
-          writeTarget.tableName(), 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
-      LOG.debug(
-          "Iceberg writer for table {} subtask {} attempt {} flushed {} data 
files and {} delete files",
-          writeTarget.tableName(),
-          subTaskId,
-          attemptId,
-          writeResult.dataFiles().length,
-          writeResult.deleteFiles().length);
+    List<CompletableFuture<FlushOutcome>> futures =
+        writers.entrySet().stream().map(e -> flushAsync(e.getKey(), 
e.getValue())).toList();
+    writers.clear();
+
+    try {
+      
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
+    } catch (CompletionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof UncheckedIOException uncheckedIOException) {
+        throw uncheckedIOException.getCause();
+      }
+      throw new IOException(cause);
+    }
 
+    List<DynamicWriteResult> result = Lists.newArrayList();
+    for (CompletableFuture<FlushOutcome> future : futures) {
+      FlushOutcome flush = future.join();
+      recordFlushMetrics(flush.writeTarget(), flush.writeResult(), 
flush.durationNanos());
       result.add(
           new DynamicWriteResult(
-              new TableKey(writeTarget.tableName(), writeTarget.branch()),
-              writeTarget.specId(),
-              writeResult));
+              new TableKey(flush.writeTarget().tableName(), 
flush.writeTarget().branch()),
+              flush.writeTarget().specId(),
+              flush.writeResult()));
     }
 
-    writers.clear();
-
     return result;
   }
 
+  private CompletableFuture<FlushOutcome> flushAsync(
+      WriteTarget writeTarget, TaskWriter<RowData> writer) {
+    return CompletableFuture.supplyAsync(
+        () -> {
+          try {
+            long startNano = System.nanoTime();
+            WriteResult writeResult = writer.complete();
+            return new FlushOutcome(writeTarget, writeResult, 
System.nanoTime() - startNano);

Review Comment:
   Moved to a concurrent HM



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to