This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b203f53a128 Move FileIO close from RecordWriter to RecordWriterManager (#37782)
b203f53a128 is described below

commit b203f53a1283598448ce46f5343877c366a6ad03
Author: Deji Ibrahim <[email protected]>
AuthorDate: Mon Mar 9 19:14:58 2026 +0100

    Move FileIO close from RecordWriter to RecordWriterManager (#37782)
    
    * Move FileIO close from RecordWriter to RecordWriterManager
    
    * fix
    
    * clarify FileIO ownership comments and verify close
---
 .../apache/beam/sdk/io/iceberg/RecordWriter.java   |  25 +----
 .../beam/sdk/io/iceberg/RecordWriterManager.java   |  64 ++++++++-----
 .../sdk/io/iceberg/RecordWriterManagerTest.java    | 102 ++++++++++++++++++++-
 3 files changed, 145 insertions(+), 46 deletions(-)

diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index d233b0ac05b..82251c00e72 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -31,10 +31,8 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionKeyMetadata;
 import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
-import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +45,6 @@ class RecordWriter {
   private final Table table;
   private final String absoluteFilename;
   private final FileFormat fileFormat;
-  private @Nullable FileIO io;
 
   RecordWriter(
       Catalog catalog, IcebergDestination destination, String filename, 
PartitionKey partitionKey)
@@ -74,11 +71,9 @@ class RecordWriter {
     }
     OutputFile outputFile;
     EncryptionKeyMetadata keyMetadata;
-    // Keep FileIO open for the lifetime of this writer to avoid
-    // premature shutdown of underlying client pools (e.g., S3),
-    // which manifests as "Connection pool shut down" (Issue #36438).
-    this.io = table.io();
-    OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+    // table.io() may return a shared FileIO instance.
+    // FileIO lifecycle is managed by RecordWriterManager.close().
+    OutputFile tmpFile = table.io().newOutputFile(absoluteFilename);
     EncryptedOutputFile encryptedOutputFile = 
table.encryption().encrypt(tmpFile);
     outputFile = encryptedOutputFile.encryptingOutputFile();
     keyMetadata = encryptedOutputFile.keyMetadata();
@@ -135,20 +130,6 @@ class RecordWriter {
                   fileFormat, table.name(), absoluteFilename),
               e);
     } finally {
-      // Always attempt to close FileIO and decrement metrics
-      if (io != null) {
-        try {
-          io.close();
-        } catch (Exception ioCloseError) {
-          if (closeError != null) {
-            closeError.addSuppressed(ioCloseError);
-          } else {
-            closeError = new IOException("Failed to close FileIO", 
ioCloseError);
-          }
-        } finally {
-          io = null;
-        }
-      }
       activeIcebergWriters.dec();
     }
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index da62fb65884..eb79513df4f 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -29,8 +29,10 @@ import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.schemas.Schema;
@@ -58,6 +60,7 @@ import org.apache.iceberg.data.InternalRecordWrapper;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.transforms.Transforms;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -403,33 +406,50 @@ class RecordWriterManager implements AutoCloseable {
    */
   @Override
   public void close() throws IOException {
-    for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
-        windowedDestinationAndState : destinations.entrySet()) {
-      DestinationState state = windowedDestinationAndState.getValue();
+    try {
+      for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
+          windowedDestinationAndState : destinations.entrySet()) {
+        DestinationState state = windowedDestinationAndState.getValue();
 
-      // removing writers from the state's cache will trigger the logic to 
collect each writer's
-      // data file.
-      state.writers.invalidateAll();
-      // first check for any exceptions swallowed by the cache
-      if (!state.exceptions.isEmpty()) {
-        IllegalStateException exception =
-            new IllegalStateException(
-                String.format("Encountered %s failed writer(s).", 
state.exceptions.size()));
-        for (Exception e : state.exceptions) {
-          exception.addSuppressed(e);
+        // removing writers from the state's cache will trigger the logic to 
collect each writer's
+        // data file.
+        state.writers.invalidateAll();
+        // first check for any exceptions swallowed by the cache
+        if (!state.exceptions.isEmpty()) {
+          IllegalStateException exception =
+              new IllegalStateException(
+                  String.format("Encountered %s failed writer(s).", 
state.exceptions.size()));
+          for (Exception e : state.exceptions) {
+            exception.addSuppressed(e);
+          }
+          throw exception;
         }
-        throw exception;
-      }
 
-      if (state.dataFiles.isEmpty()) {
-        continue;
-      }
+        if (state.dataFiles.isEmpty()) {
+          continue;
+        }
 
-      totalSerializableDataFiles.put(
-          windowedDestinationAndState.getKey(), new 
ArrayList<>(state.dataFiles));
-      state.dataFiles.clear();
+        totalSerializableDataFiles.put(
+            windowedDestinationAndState.getKey(), new 
ArrayList<>(state.dataFiles));
+        state.dataFiles.clear();
+      }
+    } finally {
+      // Close unique FileIO instances now that all writers are done.
+      // table.io() may return a shared FileIO; we deduplicate by identity
+      // so we close each underlying connection pool exactly once.
+      Set<FileIO> closedIOs = new HashSet<>();
+      for (DestinationState state : destinations.values()) {
+        FileIO io = state.table.io();
+        if (io != null && closedIOs.add(io)) {
+          try {
+            io.close();
+          } catch (Exception e) {
+            LOG.warn("Failed to close FileIO for table '{}'", 
state.table.name(), e);
+          }
+        }
+      }
+      destinations.clear();
     }
-    destinations.clear();
     checkArgument(
         openWriters == 0,
         "Expected all data writers to be closed, but found %s data writer(s) 
still open",
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 375d9073711..2672ac70c08 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
@@ -957,7 +958,7 @@ public class RecordWriterManagerTest {
   }
 
   @Test
-  public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
+  public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
     TableIdentifier tableId =
         TableIdentifier.of(
             "default",
@@ -980,7 +981,104 @@ public class RecordWriterManagerTest {
     writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
     writer.close();
 
-    assertTrue("FileIO should be closed after writer close", 
trackingFileIO.closed);
+    // RecordWriter must NOT close FileIO — it may be a shared instance.
+    assertFalse("RecordWriter.close() must not close the shared FileIO", 
trackingFileIO.closed);
+  }
+
+  /**
+   * Verifies that when multiple writers share the same FileIO, closing any 
writer does not close
+   * the shared FileIO — that is the responsibility of 
RecordWriterManager.close().
+   */
+  @Test
+  public void testMultipleWritersSharingFileIOSurviveBatchClose() throws 
IOException {
+    // Create two tables that share the same FileIO (simulating dynamic 
destinations
+    // backed by the same catalog)
+    TableIdentifier tableId1 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_a_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6));
+    TableIdentifier tableId2 =
+        TableIdentifier.of(
+            "default",
+            "table_batch_close_b_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6));
+    Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+    Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+    // Both tables share the same CloseTrackingFileIO — mirrors how some 
catalogs
+    // return a shared FileIO instance across tables
+    CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io());
+    Table spyTable1 = Mockito.spy(table1);
+    Table spyTable2 = Mockito.spy(table2);
+    Mockito.doReturn(sharedFileIO).when(spyTable1).io();
+    Mockito.doReturn(sharedFileIO).when(spyTable2).io();
+
+    PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema());
+    PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema());
+
+    RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET, 
"file1.parquet", pk1);
+    RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET, 
"file2.parquet", pk2);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row);
+
+    writer1.write(record);
+    writer2.write(record);
+
+    writer1.close();
+    assertFalse("FileIO must remain open between batch writer closes", 
sharedFileIO.closed);
+
+    writer2.close();
+    assertFalse("FileIO must remain open after all writers close", 
sharedFileIO.closed);
+
+    // Both writers produced valid data files
+    assertNotNull(writer1.getDataFile());
+    assertNotNull(writer2.getDataFile());
+  }
+
+  /**
+   * Verifies that RecordWriterManager.close() flushes data files from 
multiple destinations and
+   * closes the shared FileIO.
+   */
+  @Test
+  public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws 
IOException {
+    String tableName1 =
+        "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6);
+    String tableName2 =
+        "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-", 
"").substring(0, 6);
+    TableIdentifier tableId1 = TableIdentifier.of("default", tableName1);
+    TableIdentifier tableId2 = TableIdentifier.of("default", tableName2);
+
+    Table realTable1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+    Table realTable2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedTrackingIO = new 
CloseTrackingFileIO(realTable1.io());
+    Table spyTable1 = Mockito.spy(realTable1);
+    Table spyTable2 = Mockito.spy(realTable2);
+    Mockito.doReturn(sharedTrackingIO).when(spyTable1).io();
+    Mockito.doReturn(sharedTrackingIO).when(spyTable2).io();
+
+    Catalog spyCatalog = Mockito.spy(catalog);
+    Mockito.doReturn(spyTable1).when(spyCatalog).loadTable(tableId1);
+    Mockito.doReturn(spyTable2).when(spyCatalog).loadTable(tableId2);
+
+    WindowedValue<IcebergDestination> dest1 = 
getWindowedDestination(tableName1, null);
+    WindowedValue<IcebergDestination> dest2 = 
getWindowedDestination(tableName2, null);
+
+    RecordWriterManager writerManager =
+        new RecordWriterManager(spyCatalog, "test_file_name", 1000, 3);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+    assertTrue(writerManager.write(dest1, row));
+    assertTrue(writerManager.write(dest2, row));
+    assertEquals(2, writerManager.openWriters);
+
+    writerManager.close();
+
+    Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>> 
dataFiles =
+        writerManager.getSerializableDataFiles();
+    assertTrue("Should have data files for dest1", 
dataFiles.containsKey(dest1));
+    assertTrue("Should have data files for dest2", 
dataFiles.containsKey(dest2));
+    assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
   }
 
   private static final class CloseTrackingFileIO implements FileIO {

Reply via email to