This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ca27526d759 Iceberg fileio close (#37168)
ca27526d759 is described below

commit ca27526d759ea617166958940671019dea6547e2
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Mon Dec 22 17:23:06 2025 -0600

    Iceberg fileio close (#37168)
    
    * fix(iceberg): prevent premature FileIO closure in RecordWriter
    
    Keep FileIO open for writer lifetime to avoid connection pool shutdown issues
    Add test to verify FileIO remains open until writer close
    
    * trigger ITs
    
    * also apply in AppendFilesToTables
    
    ---------
    
    Co-authored-by: liferoad <[email protected]>
---
 .../IO_Iceberg_Integration_Tests.json              |   2 +-
 .../beam/sdk/io/iceberg/AppendFilesToTables.java   |  14 +--
 .../apache/beam/sdk/io/iceberg/RecordWriter.java   |  50 +++++++---
 .../sdk/io/iceberg/RecordWriterManagerTest.java    | 105 +++++++++++++++++++++
 4 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index 34a6e02150e..b73af5e61a4 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to run.",
-    "modification": 4
+    "modification": 1
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
index 12888b4e4e0..1789932d69a 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java
@@ -189,14 +189,14 @@ class AppendFilesToTables
         ManifestWriter<DataFile> writer;
         try (FileIO io = table.io()) {
           writer = createManifestWriter(table.location(), uuid, spec, io);
+          for (DataFile file : files) {
+            writer.add(file);
+            committedDataFileByteSize.update(file.fileSizeInBytes());
+            committedDataFileRecordCount.update(file.recordCount());
+          }
+          writer.close();
+          update.appendManifest(writer.toManifestFile());
         }
-        for (DataFile file : files) {
-          writer.add(file);
-          committedDataFileByteSize.update(file.fileSizeInBytes());
-          committedDataFileRecordCount.update(file.recordCount());
-        }
-        writer.close();
-        update.appendManifest(writer.toManifestFile());
       }
       update.commit();
     }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index d4a61c6d3e1..d233b0ac05b 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +47,7 @@ class RecordWriter {
   private final Table table;
   private final String absoluteFilename;
   private final FileFormat fileFormat;
+  private @Nullable FileIO io;
 
   RecordWriter(
       Catalog catalog, IcebergDestination destination, String filename, PartitionKey partitionKey)
@@ -72,12 +74,14 @@ class RecordWriter {
     }
     OutputFile outputFile;
     EncryptionKeyMetadata keyMetadata;
-    try (FileIO io = table.io()) {
-      OutputFile tmpFile = io.newOutputFile(absoluteFilename);
-      EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
-      outputFile = encryptedOutputFile.encryptingOutputFile();
-      keyMetadata = encryptedOutputFile.keyMetadata();
-    }
+    // Keep FileIO open for the lifetime of this writer to avoid
+    // premature shutdown of underlying client pools (e.g., S3),
+    // which manifests as "Connection pool shut down" (Issue #36438).
+    this.io = table.io();
+    OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+    EncryptedOutputFile encryptedOutputFile = table.encryption().encrypt(tmpFile);
+    outputFile = encryptedOutputFile.encryptingOutputFile();
+    keyMetadata = encryptedOutputFile.keyMetadata();
 
     switch (fileFormat) {
       case AVRO:
@@ -120,16 +124,38 @@ class RecordWriter {
   }
 
   public void close() throws IOException {
+    IOException closeError = null;
     try {
       icebergDataWriter.close();
     } catch (IOException e) {
-      throw new IOException(
-          String.format(
-              "Failed to close %s writer for table %s, path: %s",
-              fileFormat, table.name(), absoluteFilename),
-          e);
+      closeError =
+          new IOException(
+              String.format(
+                  "Failed to close %s writer for table %s, path: %s",
+                  fileFormat, table.name(), absoluteFilename),
+              e);
+    } finally {
+      // Always attempt to close FileIO and decrement metrics
+      if (io != null) {
+        try {
+          io.close();
+        } catch (Exception ioCloseError) {
+          if (closeError != null) {
+            closeError.addSuppressed(ioCloseError);
+          } else {
+            closeError = new IOException("Failed to close FileIO", ioCloseError);
+          }
+        } finally {
+          io = null;
+        }
+      }
+      activeIcebergWriters.dec();
+    }
+
+    if (closeError != null) {
+      throw closeError;
     }
-    activeIcebergWriters.dec();
+
     DataFile dataFile = icebergDataWriter.toDataFile();
     LOG.info(
         "Closed {} writer for table '{}' ({} records, {} bytes), path: {}",
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 7bce0b16cb1..375d9073711 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.values.Row;
@@ -65,6 +66,10 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -83,6 +88,7 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
 
 /** Test class for {@link RecordWriterManager}. */
 @RunWith(JUnit4.class)
@@ -950,6 +956,105 @@ public class RecordWriterManagerTest {
     }
   }
 
+  @Test
+  public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
+    TableIdentifier tableId =
+        TableIdentifier.of(
+            "default",
+            "table_"
+                + testName.getMethodName()
+                + "_"
+                + UUID.randomUUID().toString().replace("-", "").substring(0, 6));
+    Table table = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO trackingFileIO = new CloseTrackingFileIO(table.io());
+    Table spyTable = Mockito.spy(table);
+    Mockito.doReturn(trackingFileIO).when(spyTable).io();
+
+    PartitionKey partitionKey = new PartitionKey(spyTable.spec(), spyTable.schema());
+    RecordWriter writer =
+        new RecordWriter(spyTable, FileFormat.PARQUET, "file.parquet", partitionKey);
+
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
+    writer.close();
+
+    assertTrue("FileIO should be closed after writer close", trackingFileIO.closed);
+  }
+
+  private static final class CloseTrackingFileIO implements FileIO {
+    private final FileIO delegate;
+    volatile boolean closed = false;
+
+    CloseTrackingFileIO(FileIO delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public InputFile newInputFile(String path) {
+      return delegate.newInputFile(path);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String path) {
+      OutputFile underlying = delegate.newOutputFile(path);
+      return new CloseAwareOutputFile(underlying, this);
+    }
+
+    @Override
+    public void deleteFile(String path) {
+      delegate.deleteFile(path);
+    }
+
+    @Override
+    public Map<String, String> properties() {
+      return delegate.properties();
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+      delegate.close();
+    }
+  }
+
+  private static final class CloseAwareOutputFile implements OutputFile {
+    private final OutputFile delegate;
+    private final CloseTrackingFileIO io;
+
+    CloseAwareOutputFile(OutputFile delegate, CloseTrackingFileIO io) {
+      this.delegate = delegate;
+      this.io = io;
+    }
+
+    @Override
+    public PositionOutputStream create() {
+      if (io.closed) {
+        throw new IllegalStateException("Connection pool shut down");
+      }
+      return delegate.create();
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite() {
+      if (io.closed) {
+        throw new IllegalStateException("Connection pool shut down");
+      }
+      return delegate.createOrOverwrite();
+    }
+
+    @Override
+    public String location() {
+      return delegate.location();
+    }
+
+    @Override
+    public InputFile toInputFile() {
+      return delegate.toInputFile();
+    }
+  }
+
   @Test
   public void testGetOrCreateTable_refreshLogic() {
     Table mockTable = mock(Table.class);

Reply via email to