ahmedabu98 commented on code in PR #35228: URL: https://github.com/apache/beam/pull/35228#discussion_r2139839244
########## sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java: ########## @@ -851,6 +857,113 @@ public void testStreamToPartitionedDynamicDestinations() throws IOException { writeToDynamicDestinations(null, true, true); } + @Test + public void testWriteToDynamicNamespaces() throws IOException { + // run this test only on catalogs that support namespace management + assumeTrue(catalog instanceof SupportsNamespaces); + + long salt = System.currentTimeMillis(); + String tableIdentifierTemplate = + String.format("namespace_{modulo_5}_%s.table_{bool_field}", salt); + Map<String, Object> writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate)); + // override with table template + writeConfig.put("table", tableIdentifierTemplate); + + Namespace namespace0 = Namespace.of("namespace_0_" + salt); + Namespace namespace1 = Namespace.of("namespace_1_" + salt); + Namespace namespace2 = Namespace.of("namespace_2_" + salt); + Namespace namespace3 = Namespace.of("namespace_3_" + salt); + Namespace namespace4 = Namespace.of("namespace_4_" + salt); + + TableIdentifier tableId0true = TableIdentifier.of(namespace0, "table_true"); + TableIdentifier tableId0false = TableIdentifier.of(namespace0, "table_false"); + TableIdentifier tableId1true = TableIdentifier.of(namespace1, "table_true"); + TableIdentifier tableId1false = TableIdentifier.of(namespace1, "table_false"); + TableIdentifier tableId2true = TableIdentifier.of(namespace2, "table_true"); + TableIdentifier tableId2false = TableIdentifier.of(namespace2, "table_false"); + TableIdentifier tableId3true = TableIdentifier.of(namespace3, "table_true"); + TableIdentifier tableId3false = TableIdentifier.of(namespace3, "table_false"); + TableIdentifier tableId4true = TableIdentifier.of(namespace4, "table_true"); + TableIdentifier tableId4false = TableIdentifier.of(namespace4, "table_false"); + + List<Namespace> namespaces = + Arrays.asList(namespace0, namespace1, namespace2, 
namespace3, namespace4); + SupportsNamespaces sN = (SupportsNamespaces) catalog; + // assert namespace don't exist beforehand + namespaces.forEach(n -> assertFalse(sN.namespaceExists(n))); + + pipeline + .apply(Create.of(inputRows)) + .setRowSchema(BEAM_SCHEMA) + .apply(Managed.write(ICEBERG).withConfig(writeConfig)); + pipeline.run().waitUntilFinish(); + + // assert namespace were created + namespaces.forEach(n -> assertTrue(sN.namespaceExists(n))); + + Table table0true = catalog.loadTable(tableId0true); + Table table0false = catalog.loadTable(tableId0false); + Table table1true = catalog.loadTable(tableId1true); + Table table1false = catalog.loadTable(tableId1false); + Table table2true = catalog.loadTable(tableId2true); + Table table2false = catalog.loadTable(tableId2false); + Table table3true = catalog.loadTable(tableId3true); + Table table3false = catalog.loadTable(tableId3false); + Table table4true = catalog.loadTable(tableId4true); + Table table4false = catalog.loadTable(tableId4false); + + for (Table t : + Arrays.asList( + table0true, + table0false, + table1true, + table1false, + table2true, + table2false, + table3true, + table3false, + table4true, + table4false)) { + assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA)); + } + + // Read back and check records are correct + Map<KV<Long, Boolean>, List<Record>> results = + ImmutableMap.<KV<Long, Boolean>, List<Record>>builder() + .put(KV.of(0L, true), readRecords(table0true)) + .put(KV.of(0L, false), readRecords(table0false)) + .put(KV.of(1L, true), readRecords(table1true)) + .put(KV.of(1L, false), readRecords(table1false)) + .put(KV.of(2L, true), readRecords(table2true)) + .put(KV.of(2L, false), readRecords(table2false)) + .put(KV.of(3L, true), readRecords(table3true)) + .put(KV.of(3L, false), readRecords(table3false)) + .put(KV.of(4L, true), readRecords(table4true)) + .put(KV.of(4L, false), readRecords(table4false)) + .build(); + + for (Map.Entry<KV<Long, Boolean>, List<Record>> entry : results.entrySet()) { + 
long modulo = entry.getKey().getKey(); + boolean bool = entry.getKey().getValue(); + List<Record> records = entry.getValue(); + Stream<Record> expectedRecords = + inputRows.stream() + .filter( + rec -> + checkStateNotNull(rec.getInt64("modulo_5")) == modulo + && checkStateNotNull(rec.getBoolean("bool_field")) == bool) + .map(RECORD_FUNC::apply); + + assertThat(records, containsInAnyOrder(expectedRecords.toArray())); + } + + try { + namespaces.forEach(sN::dropNamespace); + } catch (Exception e) { + LOG.error("Test passed but threw an error when cleaning up namespaces.", e); Review Comment: The namespace cleanup fails pretty consistently with the BigQueryMetastore catalog. Agreed, we can probably do more to clean up fully; I can add a fallback that uses the BQ client to delete the datasets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org