dejii commented on code in PR #38149:
URL: https://github.com/apache/beam/pull/38149#discussion_r3075097221


##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java:
##########
@@ -1201,4 +1244,57 @@ public void testGetOrCreateTable_refreshLogic() {
     // Verify that refresh() WAS called exactly once because the entry was stale.
     verify(mockTable, times(1)).refresh();
   }
+
+  /**
+   * Simulates the full DoFn lifecycle: multiple bundles using a shared Closeable catalog, then
+   * catalog.close() (as @Teardown would do). Verifies that: 1. FileIO survives across bundles
+   * (RecordWriterManager.close() does not close it) 2. Closing the catalog at the end properly
+   * closes the FileIO
+   */
+  @Test
+  public void testFullLifecycleBundlesThenCatalogClose() throws IOException {
+    String tableName =
+        "table_lifecycle_" + UUID.randomUUID().toString().replace("-", "").substring(0, 6);
+    TableIdentifier tableId = TableIdentifier.of("default", tableName);
+
+    Table realTable = warehouse.createTable(tableId, ICEBERG_SCHEMA);
+
+    CloseTrackingFileIO sharedIO = new CloseTrackingFileIO(realTable.io());
+    Table spyTable = Mockito.spy(realTable);
+    Mockito.doReturn(sharedIO).when(spyTable).io();
+
+    // Create a catalog spy that also implements Closeable, simulating RESTCatalog/GlueCatalog
+    Catalog spyCatalog =
+        mock(Catalog.class, Mockito.withSettings().extraInterfaces(Closeable.class));
+    Mockito.doReturn(spyTable).when(spyCatalog).loadTable(tableId);
+
+    // Wire Closeable.close() to close the shared FileIO (simulating what real catalogs do)
+    Mockito.doAnswer(
+            invocation -> {
+              sharedIO.close();
+              return null;
+            })
+        .when((Closeable) spyCatalog)
+        .close();
+
+    WindowedValue<IcebergDestination> dest = getWindowedDestination(tableName, null);
+    Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();
+
+    // Bundle 1
+    RecordWriterManager bundle1 = new RecordWriterManager(spyCatalog, "file_lc1", 1000, 3);
+    assertTrue(bundle1.write(dest, row));
+    bundle1.close();
+    assertFalse("FileIO must survive after bundle 1", sharedIO.closed);
+
+    // Bundle 2
+    RecordWriterManager bundle2 = new RecordWriterManager(spyCatalog, "file_lc2", 1000, 3);
+    assertTrue(bundle2.write(dest, row));
+    bundle2.close();
+    assertFalse("FileIO must survive after bundle 2", sharedIO.closed);
+
+    // Simulate @Teardown: close the catalog
+    ((Closeable) spyCatalog).close();
+    verify((Closeable) spyCatalog, times(1)).close();
+    assertTrue("FileIO should be closed after catalog.close()", sharedIO.closed);

Review Comment:
   I don't think this can happen in Beam's lifecycle. `@Teardown` only runs after all bundles on that `DoFn` instance have completed, as shown in the diagram, and once it is called the `DoFn` instance is decommissioned. Each `DoFn` instance also has its own isolated `cachedCatalog`, as [discussed earlier](https://github.com/apache/beam/pull/38149#discussion_r3075022050).
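To make the lifecycle argument concrete, here is a minimal, dependency-free Java model. It is a sketch under stated assumptions, not Beam or Iceberg code: `LifecycleModel` (hypothetical) plays one `DoFn` instance with its own lazily created `cachedCatalog`, and `FakeCatalog` (hypothetical) stands in for a `Closeable` catalog such as RESTCatalog. It assumes, as the comment above states, that teardown runs only after every bundle on that instance has finished and that each instance owns its catalog.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Beam or Iceberg API: LifecycleModel plays one DoFn instance,
// FakeCatalog plays a Closeable catalog (e.g. RESTCatalog) that owns a shared FileIO.
public class LifecycleModel {
  static final class FakeCatalog implements Closeable {
    boolean closed = false;

    @Override
    public void close() {
      closed = true; // a real catalog would also close its FileIO here
    }
  }

  // Per-instance cache, mirroring the isolated cachedCatalog each DoFn holds.
  FakeCatalog cachedCatalog;
  final List<String> events = new ArrayList<>();
  private boolean tornDown = false;

  private FakeCatalog catalog() {
    if (cachedCatalog == null) {
      cachedCatalog = new FakeCatalog(); // created lazily, reused by every bundle
    }
    return cachedCatalog;
  }

  // A bundle uses the shared catalog but never closes it.
  void runBundle(String name) {
    if (tornDown) {
      throw new IllegalStateException("no bundles may run after teardown");
    }
    if (catalog().closed) {
      throw new IllegalStateException("catalog closed while bundles still run");
    }
    events.add("bundle:" + name);
  }

  // Teardown runs after every bundle on this instance and closes only its own catalog.
  void teardown() {
    if (cachedCatalog != null) {
      cachedCatalog.close();
    }
    tornDown = true;
    events.add("teardown");
  }

  public static void main(String[] args) {
    LifecycleModel a = new LifecycleModel();
    LifecycleModel b = new LifecycleModel();
    a.runBundle("1");
    a.runBundle("2");
    b.runBundle("1");
    a.teardown(); // closes a's catalog only
    b.runBundle("2"); // b has its own catalog, so it is unaffected
    System.out.println(a.events + " " + b.events);
    System.out.println(a.cachedCatalog.closed + " " + b.cachedCatalog.closed);
  }
}
```

Running `main` prints `[bundle:1, bundle:2, teardown] [bundle:1, bundle:2]` followed by `true false`: tearing down one instance closes only that instance's catalog, and no bundle can observe a closed catalog on its own instance.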



-- 
This is an automated message from the Apache Git Service.