This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b203f53a128 Move FileIO close from RecordWriter to RecordWriterManager (#37782)
b203f53a128 is described below
commit b203f53a1283598448ce46f5343877c366a6ad03
Author: Deji Ibrahim <[email protected]>
AuthorDate: Mon Mar 9 19:14:58 2026 +0100
Move FileIO close from RecordWriter to RecordWriterManager (#37782)
* Move FileIO close from RecordWriter to RecordWriterManager
* fix
* clarify FileIO ownership comments and verify close
---
.../apache/beam/sdk/io/iceberg/RecordWriter.java | 25 +----
.../beam/sdk/io/iceberg/RecordWriterManager.java | 64 ++++++++-----
.../sdk/io/iceberg/RecordWriterManagerTest.java | 102 ++++++++++++++++++++-
3 files changed, 145 insertions(+), 46 deletions(-)
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
index d233b0ac05b..82251c00e72 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriter.java
@@ -31,10 +31,8 @@ import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +45,6 @@ class RecordWriter {
private final Table table;
private final String absoluteFilename;
private final FileFormat fileFormat;
- private @Nullable FileIO io;
RecordWriter(
Catalog catalog, IcebergDestination destination, String filename,
PartitionKey partitionKey)
@@ -74,11 +71,9 @@ class RecordWriter {
}
OutputFile outputFile;
EncryptionKeyMetadata keyMetadata;
- // Keep FileIO open for the lifetime of this writer to avoid
- // premature shutdown of underlying client pools (e.g., S3),
- // which manifests as "Connection pool shut down" (Issue #36438).
- this.io = table.io();
- OutputFile tmpFile = io.newOutputFile(absoluteFilename);
+ // table.io() may return a shared FileIO instance.
+ // FileIO lifecycle is managed by RecordWriterManager.close().
+ OutputFile tmpFile = table.io().newOutputFile(absoluteFilename);
EncryptedOutputFile encryptedOutputFile =
table.encryption().encrypt(tmpFile);
outputFile = encryptedOutputFile.encryptingOutputFile();
keyMetadata = encryptedOutputFile.keyMetadata();
@@ -135,20 +130,6 @@ class RecordWriter {
fileFormat, table.name(), absoluteFilename),
e);
} finally {
- // Always attempt to close FileIO and decrement metrics
- if (io != null) {
- try {
- io.close();
- } catch (Exception ioCloseError) {
- if (closeError != null) {
- closeError.addSuppressed(ioCloseError);
- } else {
- closeError = new IOException("Failed to close FileIO", ioCloseError);
- }
- } finally {
- io = null;
- }
- }
activeIcebergWriters.dec();
}
diff --git
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index da62fb65884..eb79513df4f 100644
---
a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++
b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -29,8 +29,10 @@ import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.schemas.Schema;
@@ -58,6 +60,7 @@ import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.transforms.Transforms;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
@@ -403,33 +406,50 @@ class RecordWriterManager implements AutoCloseable {
*/
@Override
public void close() throws IOException {
- for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
- windowedDestinationAndState : destinations.entrySet()) {
- DestinationState state = windowedDestinationAndState.getValue();
+ try {
+ for (Map.Entry<WindowedValue<IcebergDestination>, DestinationState>
+ windowedDestinationAndState : destinations.entrySet()) {
+ DestinationState state = windowedDestinationAndState.getValue();
- // removing writers from the state's cache will trigger the logic to
collect each writer's
- // data file.
- state.writers.invalidateAll();
- // first check for any exceptions swallowed by the cache
- if (!state.exceptions.isEmpty()) {
- IllegalStateException exception =
- new IllegalStateException(
- String.format("Encountered %s failed writer(s).",
state.exceptions.size()));
- for (Exception e : state.exceptions) {
- exception.addSuppressed(e);
+ // removing writers from the state's cache will trigger the logic to
collect each writer's
+ // data file.
+ state.writers.invalidateAll();
+ // first check for any exceptions swallowed by the cache
+ if (!state.exceptions.isEmpty()) {
+ IllegalStateException exception =
+ new IllegalStateException(
+ String.format("Encountered %s failed writer(s).",
state.exceptions.size()));
+ for (Exception e : state.exceptions) {
+ exception.addSuppressed(e);
+ }
+ throw exception;
}
- throw exception;
- }
- if (state.dataFiles.isEmpty()) {
- continue;
- }
+ if (state.dataFiles.isEmpty()) {
+ continue;
+ }
- totalSerializableDataFiles.put(
- windowedDestinationAndState.getKey(), new
ArrayList<>(state.dataFiles));
- state.dataFiles.clear();
+ totalSerializableDataFiles.put(
+ windowedDestinationAndState.getKey(), new
ArrayList<>(state.dataFiles));
+ state.dataFiles.clear();
+ }
+ } finally {
+ // Close unique FileIO instances now that all writers are done.
+ // table.io() may return a shared FileIO; we deduplicate by identity
+ // so we close each underlying connection pool exactly once.
+ Set<FileIO> closedIOs = new HashSet<>();
+ for (DestinationState state : destinations.values()) {
+ FileIO io = state.table.io();
+ if (io != null && closedIOs.add(io)) {
+ try {
+ io.close();
+ } catch (Exception e) {
+ LOG.warn("Failed to close FileIO for table '{}'", state.table.name(), e);
+ }
+ }
+ }
+ destinations.clear();
}
- destinations.clear();
checkArgument(
openWriters == 0,
"Expected all data writers to be closed, but found %s data writer(s)
still open",
diff --git
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 375d9073711..2672ac70c08 100644
---
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++
b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
@@ -957,7 +958,7 @@ public class RecordWriterManagerTest {
}
@Test
- public void testRecordWriterKeepsFileIOOpenUntilClose() throws IOException {
+ public void testRecordWriterDoesNotCloseSharedFileIO() throws IOException {
TableIdentifier tableId =
TableIdentifier.of(
"default",
@@ -980,7 +981,104 @@ public class RecordWriterManagerTest {
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
writer.close();
- assertTrue("FileIO should be closed after writer close",
trackingFileIO.closed);
+ // RecordWriter must NOT close FileIO — it may be a shared instance.
+ assertFalse("RecordWriter.close() must not close the shared FileIO",
trackingFileIO.closed);
+ }
+
+ /**
+ * Verifies that when multiple writers share the same FileIO, closing any
writer does not close
+ * the shared FileIO — that is the responsibility of
RecordWriterManager.close().
+ */
+ @Test
+ public void testMultipleWritersSharingFileIOSurviveBatchClose() throws IOException {
+ // Create two tables that share the same FileIO (simulating dynamic
destinations
+ // backed by the same catalog)
+ TableIdentifier tableId1 =
+ TableIdentifier.of(
+ "default",
+ "table_batch_close_a_" + UUID.randomUUID().toString().replace("-",
"").substring(0, 6));
+ TableIdentifier tableId2 =
+ TableIdentifier.of(
+ "default",
+ "table_batch_close_b_" + UUID.randomUUID().toString().replace("-",
"").substring(0, 6));
+ Table table1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+ Table table2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+ // Both tables share the same CloseTrackingFileIO — mirrors how some
catalogs
+ // return a shared FileIO instance across tables
+ CloseTrackingFileIO sharedFileIO = new CloseTrackingFileIO(table1.io());
+ Table spyTable1 = Mockito.spy(table1);
+ Table spyTable2 = Mockito.spy(table2);
+ Mockito.doReturn(sharedFileIO).when(spyTable1).io();
+ Mockito.doReturn(sharedFileIO).when(spyTable2).io();
+
+ PartitionKey pk1 = new PartitionKey(spyTable1.spec(), spyTable1.schema());
+ PartitionKey pk2 = new PartitionKey(spyTable2.spec(), spyTable2.schema());
+
+ RecordWriter writer1 = new RecordWriter(spyTable1, FileFormat.PARQUET,
"file1.parquet", pk1);
+ RecordWriter writer2 = new RecordWriter(spyTable2, FileFormat.PARQUET,
"file2.parquet", pk2);
+
+ Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+ Record record = IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row);
+
+ writer1.write(record);
+ writer2.write(record);
+
+ writer1.close();
+ assertFalse("FileIO must remain open between batch writer closes",
sharedFileIO.closed);
+
+ writer2.close();
+ assertFalse("FileIO must remain open after all writers close",
sharedFileIO.closed);
+
+ // Both writers produced valid data files
+ assertNotNull(writer1.getDataFile());
+ assertNotNull(writer2.getDataFile());
+ }
+
+ /**
+ * Verifies that RecordWriterManager.close() flushes data files from
multiple destinations and
+ * closes the shared FileIO.
+ */
+ @Test
+ public void testRecordWriterManagerClosesSharedFileIOAfterFlush() throws
IOException {
+ String tableName1 =
+ "table_mgr_io_a_" + UUID.randomUUID().toString().replace("-",
"").substring(0, 6);
+ String tableName2 =
+ "table_mgr_io_b_" + UUID.randomUUID().toString().replace("-",
"").substring(0, 6);
+ TableIdentifier tableId1 = TableIdentifier.of("default", tableName1);
+ TableIdentifier tableId2 = TableIdentifier.of("default", tableName2);
+
+ Table realTable1 = warehouse.createTable(tableId1, ICEBERG_SCHEMA);
+ Table realTable2 = warehouse.createTable(tableId2, ICEBERG_SCHEMA);
+
+ CloseTrackingFileIO sharedTrackingIO = new
CloseTrackingFileIO(realTable1.io());
+ Table spyTable1 = Mockito.spy(realTable1);
+ Table spyTable2 = Mockito.spy(realTable2);
+ Mockito.doReturn(sharedTrackingIO).when(spyTable1).io();
+ Mockito.doReturn(sharedTrackingIO).when(spyTable2).io();
+
+ Catalog spyCatalog = Mockito.spy(catalog);
+ Mockito.doReturn(spyTable1).when(spyCatalog).loadTable(tableId1);
+ Mockito.doReturn(spyTable2).when(spyCatalog).loadTable(tableId2);
+
+ WindowedValue<IcebergDestination> dest1 =
getWindowedDestination(tableName1, null);
+ WindowedValue<IcebergDestination> dest2 =
getWindowedDestination(tableName2, null);
+
+ RecordWriterManager writerManager =
+ new RecordWriterManager(spyCatalog, "test_file_name", 1000, 3);
+
+ Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+ assertTrue(writerManager.write(dest1, row));
+ assertTrue(writerManager.write(dest2, row));
+ assertEquals(2, writerManager.openWriters);
+
+ writerManager.close();
+
+ Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
dataFiles =
+ writerManager.getSerializableDataFiles();
+ assertTrue("Should have data files for dest1",
dataFiles.containsKey(dest1));
+ assertTrue("Should have data files for dest2",
dataFiles.containsKey(dest2));
+ assertTrue("Shared FileIO should be closed", sharedTrackingIO.closed);
}
private static final class CloseTrackingFileIO implements FileIO {