ahmedabu98 commented on code in PR #35228:
URL: https://github.com/apache/beam/pull/35228#discussion_r2139839244


##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java:
##########
@@ -851,6 +857,113 @@ public void testStreamToPartitionedDynamicDestinations() 
throws IOException {
     writeToDynamicDestinations(null, true, true);
   }
 
+  @Test
+  public void testWriteToDynamicNamespaces() throws IOException {
+    // run this test only on catalogs that support namespace management
+    assumeTrue(catalog instanceof SupportsNamespaces);
+
+    long salt = System.currentTimeMillis();
+    String tableIdentifierTemplate =
+        String.format("namespace_{modulo_5}_%s.table_{bool_field}", salt);
+    Map<String, Object> writeConfig = new 
HashMap<>(managedIcebergConfig(tableIdentifierTemplate));
+    // override with table template
+    writeConfig.put("table", tableIdentifierTemplate);
+
+    Namespace namespace0 = Namespace.of("namespace_0_" + salt);
+    Namespace namespace1 = Namespace.of("namespace_1_" + salt);
+    Namespace namespace2 = Namespace.of("namespace_2_" + salt);
+    Namespace namespace3 = Namespace.of("namespace_3_" + salt);
+    Namespace namespace4 = Namespace.of("namespace_4_" + salt);
+
+    TableIdentifier tableId0true = TableIdentifier.of(namespace0, 
"table_true");
+    TableIdentifier tableId0false = TableIdentifier.of(namespace0, 
"table_false");
+    TableIdentifier tableId1true = TableIdentifier.of(namespace1, 
"table_true");
+    TableIdentifier tableId1false = TableIdentifier.of(namespace1, 
"table_false");
+    TableIdentifier tableId2true = TableIdentifier.of(namespace2, 
"table_true");
+    TableIdentifier tableId2false = TableIdentifier.of(namespace2, 
"table_false");
+    TableIdentifier tableId3true = TableIdentifier.of(namespace3, 
"table_true");
+    TableIdentifier tableId3false = TableIdentifier.of(namespace3, 
"table_false");
+    TableIdentifier tableId4true = TableIdentifier.of(namespace4, 
"table_true");
+    TableIdentifier tableId4false = TableIdentifier.of(namespace4, 
"table_false");
+
+    List<Namespace> namespaces =
+        Arrays.asList(namespace0, namespace1, namespace2, namespace3, 
namespace4);
+    SupportsNamespaces sN = (SupportsNamespaces) catalog;
+    // assert namespace don't exist beforehand
+    namespaces.forEach(n -> assertFalse(sN.namespaceExists(n)));
+
+    pipeline
+        .apply(Create.of(inputRows))
+        .setRowSchema(BEAM_SCHEMA)
+        .apply(Managed.write(ICEBERG).withConfig(writeConfig));
+    pipeline.run().waitUntilFinish();
+
+    // assert namespace were created
+    namespaces.forEach(n -> assertTrue(sN.namespaceExists(n)));
+
+    Table table0true = catalog.loadTable(tableId0true);
+    Table table0false = catalog.loadTable(tableId0false);
+    Table table1true = catalog.loadTable(tableId1true);
+    Table table1false = catalog.loadTable(tableId1false);
+    Table table2true = catalog.loadTable(tableId2true);
+    Table table2false = catalog.loadTable(tableId2false);
+    Table table3true = catalog.loadTable(tableId3true);
+    Table table3false = catalog.loadTable(tableId3false);
+    Table table4true = catalog.loadTable(tableId4true);
+    Table table4false = catalog.loadTable(tableId4false);
+
+    for (Table t :
+        Arrays.asList(
+            table0true,
+            table0false,
+            table1true,
+            table1false,
+            table2true,
+            table2false,
+            table3true,
+            table3false,
+            table4true,
+            table4false)) {
+      assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA));
+    }
+
+    // Read back and check records are correct
+    Map<KV<Long, Boolean>, List<Record>> results =
+        ImmutableMap.<KV<Long, Boolean>, List<Record>>builder()
+            .put(KV.of(0L, true), readRecords(table0true))
+            .put(KV.of(0L, false), readRecords(table0false))
+            .put(KV.of(1L, true), readRecords(table1true))
+            .put(KV.of(1L, false), readRecords(table1false))
+            .put(KV.of(2L, true), readRecords(table2true))
+            .put(KV.of(2L, false), readRecords(table2false))
+            .put(KV.of(3L, true), readRecords(table3true))
+            .put(KV.of(3L, false), readRecords(table3false))
+            .put(KV.of(4L, true), readRecords(table4true))
+            .put(KV.of(4L, false), readRecords(table4false))
+            .build();
+
+    for (Map.Entry<KV<Long, Boolean>, List<Record>> entry : 
results.entrySet()) {
+      long modulo = entry.getKey().getKey();
+      boolean bool = entry.getKey().getValue();
+      List<Record> records = entry.getValue();
+      Stream<Record> expectedRecords =
+          inputRows.stream()
+              .filter(
+                  rec ->
+                      checkStateNotNull(rec.getInt64("modulo_5")) == modulo
+                          && checkStateNotNull(rec.getBoolean("bool_field")) 
== bool)
+              .map(RECORD_FUNC::apply);
+
+      assertThat(records, containsInAnyOrder(expectedRecords.toArray()));
+    }
+
+    try {
+      namespaces.forEach(sN::dropNamespace);
+    } catch (Exception e) {
+      LOG.error("Test passed but threw an error when cleaning up namespaces.", 
e);

Review Comment:
   It fails pretty consistently with the BigQueryMetastore catalog. Agreed, we 
can probably do more to fully clean up; I can add a fallback that uses the BQ 
client to delete the datasets.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to