stankiewicz commented on code in PR #38149:
URL: https://github.com/apache/beam/pull/38149#discussion_r3074218949


##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -1201,4 +1244,57 @@ public void testGetOrCreateTable_refreshLogic() {
     // Verify that refresh() WAS called exactly once because the entry was stale.
     verify(mockTable, times(1)).refresh();
   }
+
+  /**
+   * Simulates the full DoFn lifecycle: multiple bundles using a shared Closeable catalog, then
+   * catalog.close() (as @Teardown would do). Verifies that: 1. FileIO survives across bundles
+   * (RecordWriterManager.close() does not close it) 2. Closing the catalog at the end properly
+   * closes the FileIO
+   */
+  @Test
+  public void testFullLifecycleBundlesThenCatalogClose() throws IOException {
+    String tableName =
+        "table_lifecycle_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    TableIdentifier tableId = TableIdentifier.of("default", tableName);
+
+    Table realTable = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedIO = new CloseTrackingFileIO(realTable.io());
+    Table spyTable = Mockito.spy(realTable);
+    Mockito.doReturn(sharedIO).when(spyTable).io();
+
+    // Create a catalog spy that also implements Closeable, simulating RESTCatalog/GlueCatalog
+    Catalog spyCatalog =
+        mock(Catalog.class, Mockito.withSettings().extraInterfaces(Closeable.class));
+    Mockito.doReturn(spyTable).when(spyCatalog).loadTable(tableId);
+
+    // Wire Closeable.close() to close the shared FileIO (simulating what real catalogs do)
+    Mockito.doAnswer(
+            invocation -> {
+              sharedIO.close();
+              return null;
+            })
+        .when((Closeable) spyCatalog)
+        .close();
+
+    WindowedValue<IcebergDestination> dest = getWindowedDestination(tableName, null);
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    // Bundle 1
+    RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_lc1", 1000, 3);
+    assertTrue(bundle1.write(dest, row));
+    bundle1.close();
+    assertFalse("FileIO must survive after bundle 1", sharedIO.closed);
+
+    // Bundle 2
+    RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_lc2", 1000, 3);
+    assertTrue(bundle2.write(dest, row));
+    bundle2.close();
+    assertFalse("FileIO must survive after bundle 2", sharedIO.closed);
+
+    // Simulate @Teardown: close the catalog
+    ((Closeable) spyCatalog).close();
+    verify((Closeable) spyCatalog, times(1)).close();
+    assertTrue("FileIO should be closed after catalog.close()", sharedIO.closed);

Review Comment:
   Also try this sequence, where an exception interrupts bundle 1 and teardown runs before bundle 2 starts:
   ```
       // Bundle 1
       RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_lc1", 1000, 3);
       assertTrue(bundle1.write(dest, row));

       // An exception occurs here, so bundle1.close() is never reached.
       // Simulate @Teardown: close the catalog
       ((Closeable) spyCatalog).close();
       verify((Closeable) spyCatalog, times(1)).close();

       // Bundle 2
       RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_lc2", 1000, 3);
       assertTrue(bundle2.write(dest, row));
       bundle2.close();
       assertFalse("FileIO must survive after bundle 2", sharedIO.closed);
   ```



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteDirectRowsToFiles.java:
##########
@@ -137,5 +139,12 @@ public void finishBundle(FinishBundleContext context) throws Exception {
       }
       recordWriterManager = null;
     }
+
+    @Teardown
+    public void teardown() throws IOException {
+      if (catalog instanceof Closeable) {
+        ((Closeable) catalog).close();

Review Comment:
   Because the catalog is shared, calling close() here is not thread-safe.
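
To illustrate one way a shared close() could be guarded, here is a minimal sketch. `CloseOnce` and `closeFromTwoThreads` are hypothetical names, not part of Beam or Iceberg, and the delegate stands in for the shared catalog. The sketch only makes close() idempotent under concurrent callers; it does not stop another thread from using the resource after it has been closed.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wrapper, not a Beam or Iceberg class: closes its delegate at most once. */
final class CloseOnce implements Closeable {
  private final Closeable delegate;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  CloseOnce(Closeable delegate) {
    this.delegate = delegate;
  }

  @Override
  public void close() throws IOException {
    // compareAndSet lets exactly one caller proceed, even if several threads race here.
    if (closed.compareAndSet(false, true)) {
      delegate.close();
    }
  }

  /** Usage: two threads call close(); returns how often the delegate was actually closed. */
  static int closeFromTwoThreads() {
    AtomicInteger delegateCloses = new AtomicInteger();
    CloseOnce shared = new CloseOnce(delegateCloses::incrementAndGet);
    Runnable teardown =
        () -> {
          try {
            shared.close();
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        };
    Thread t1 = new Thread(teardown);
    Thread t2 = new Thread(teardown);
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return delegateCloses.get();
  }
}
```

In this sketch both threads may call close(), but the delegate sees a single close call.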



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########


Review Comment:
   This returns `cachedCatalog`. It is also not synchronized internally, so we may 
end up creating multiple catalogs while only one of them is cached. 
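
The race described here is the usual unsynchronized check-then-create pattern. A minimal sketch of a synchronized variant follows; `CachedSupplier` and `factoryCallsForThreeGets` are hypothetical names, the cached `Object` stands in for a catalog, and this is not the code under review.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper, not the class under review: creates the cached value at most once. */
final class CachedSupplier<T> implements Supplier<T> {
  private final Supplier<T> factory;
  private T cached; // guarded by "this"

  CachedSupplier(Supplier<T> factory) {
    this.factory = factory;
  }

  @Override
  public synchronized T get() {
    // The null check and the assignment happen under one lock, so two threads
    // cannot both observe null and both invoke the factory.
    if (cached == null) {
      cached = factory.get();
    }
    return cached;
  }

  /** Usage: three get() calls share one instance; returns the number of factory calls. */
  static int factoryCallsForThreeGets() {
    AtomicInteger created = new AtomicInteger();
    CachedSupplier<Object> catalog =
        new CachedSupplier<>(
            () -> {
              created.incrementAndGet();
              return new Object(); // stands in for an expensive catalog
            });
    Object first = catalog.get();
    boolean sameInstance = first == catalog.get() && first == catalog.get();
    return sameInstance ? created.get() : -1;
  }
}
```

Synchronizing the whole method is the simplest option; whether the extra locking matters depends on how often the accessor is called.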



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AppendFilesToTables.java:
##########
@@ -251,5 +252,12 @@ private boolean shouldSkip(Table table, Iterable<FileWriteResult> fileWriteResul
       }
       return false;
     }
+
+    @Teardown
+    public void teardown() throws IOException {
+      if (catalog instanceof Closeable) {
+        ((Closeable) catalog).close();

Review Comment:
   This catalog is set via `catalog = catalogConfig.catalog();`.
   `catalogConfig` is a single object that is passed to multiple transforms, and it 
returns `cachedCatalog` if one exists. 
   
   What happens to the `catalog` instance in another transform if `cachedCatalog` is 
closed?
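
One common way to avoid closing a resource that other users still hold is reference counting. The sketch below is only an illustration under assumptions: `RefCountedResource`, `acquire`, and `release` are hypothetical names, not a Beam or Iceberg API, and the wrapped `Closeable` stands in for the shared catalog.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical reference-counted handle, not a Beam or Iceberg API. */
final class RefCountedResource {
  private final Closeable resource;
  private int users; // guarded by "this"
  private boolean closed; // guarded by "this"

  RefCountedResource(Closeable resource) {
    this.resource = resource;
  }

  /** Called by each user before it starts using the resource. */
  synchronized void acquire() {
    if (closed) {
      throw new IllegalStateException("resource already closed");
    }
    users++;
  }

  /** Called by each user when it is done; only the last release closes the resource. */
  synchronized void release() throws IOException {
    if (users == 0) {
      throw new IllegalStateException("release without matching acquire");
    }
    users--;
    if (users == 0) {
      closed = true;
      resource.close();
    }
  }

  /** Usage: two users share one resource; reports the closed state after each release. */
  static boolean[] closedAfterEachRelease() {
    AtomicBoolean underlyingClosed = new AtomicBoolean(false);
    RefCountedResource shared = new RefCountedResource(() -> underlyingClosed.set(true));
    shared.acquire(); // first transform
    shared.acquire(); // second transform
    try {
      shared.release(); // first transform tears down, second still holds a reference
      boolean afterFirst = underlyingClosed.get();
      shared.release(); // last user tears down, resource is closed now
      return new boolean[] {afterFirst, underlyingClosed.get()};
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A limitation of this sketch is that acquire() fails once the resource has been closed, so a real design would also need a way to recreate the resource for later users.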



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java:
##########
@@ -434,20 +431,6 @@ public void close() throws IOException {
         state.dataFiles.clear();
       }
     } finally {
-      // Close unique FileIO instances now that all writers are done.
-      // table.io() may return a shared FileIO; we deduplicate by identity
-      // so we close each underlying connection pool exactly once.
-      Set<FileIO> closedIOs = new HashSet<>();
-      for (DestinationState state : destinations.values()) {
-        FileIO io = state.table.io();
-        if (io != null && closedIOs.add(io)) {
-          try {
-            io.close();

Review Comment:
   Are there any issues if the FileIO is not closed? Dataflow invokes teardown only 
when the DoFn needs to be recycled due to an exception. 


