This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 76554d186ad [FLINK-32747][table] Fix ddl operations for catalog which is loaded from CatalogStore (#23133) 76554d186ad is described below commit 76554d186ad198384ff783e3e3f040dd738b7571 Author: Shammon FY <zjur...@gmail.com> AuthorDate: Thu Aug 10 11:57:34 2023 +0800 [FLINK-32747][table] Fix ddl operations for catalog which is loaded from CatalogStore (#23133) --- .../apache/flink/table/catalog/CatalogManager.java | 57 ++++++++++++---------- .../flink/table/catalog/CatalogManagerTest.java | 36 +++++++++++++- 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index 6868e8c4671..ac010e2bdde 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -399,11 +399,13 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { // Get catalog from the CatalogStore. 
Optional<CatalogDescriptor> optionalDescriptor = catalogStoreHolder.catalogStore().getCatalog(catalogName); - if (optionalDescriptor.isPresent()) { - return Optional.of(initCatalog(catalogName, optionalDescriptor.get())); - } - - return Optional.empty(); + return optionalDescriptor.map( + descriptor -> { + Catalog catalog = initCatalog(catalogName, descriptor); + catalog.open(); + catalogs.put(catalogName, catalog); + return catalog; + }); } public Catalog getCatalogOrThrowException(String catalogName) { @@ -452,12 +454,14 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { checkArgument( !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog name cannot be empty."); - Catalog potentialCurrentCatalog = catalogs.get(catalogName); - if (potentialCurrentCatalog == null) { - throw new CatalogException( - format("A catalog with name [%s] does not exist.", catalogName)); - } - + Catalog potentialCurrentCatalog = + getCatalog(catalogName) + .orElseThrow( + () -> + new CatalogException( + format( + "A catalog with name [%s] does not exist.", + catalogName))); if (!catalogName.equals(currentCatalogName)) { currentCatalogName = catalogName; currentDatabaseName = potentialCurrentCatalog.getDefaultDatabase(); @@ -502,7 +506,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { throw new CatalogException("Current catalog has not been set."); } - if (!catalogs.get(currentCatalogName).databaseExists(databaseName)) { + if (!getCatalogOrThrowException(currentCatalogName).databaseExists(databaseName)) { throw new CatalogException( format( "A database with name [%s] does not exist in the catalog: [%s].", @@ -537,7 +541,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { */ public String getBuiltInDatabaseName() { // The default database of the built-in catalog is also the built-in database. 
- return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase(); + return getCatalogOrThrowException(getBuiltInCatalogName()).getDefaultDatabase(); } /** @@ -630,11 +634,13 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { */ public Optional<CatalogPartition> getPartition( ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec) { - Catalog catalog = catalogs.get(tableIdentifier.getCatalogName()); - if (catalog != null) { + Optional<Catalog> catalogOptional = getCatalog(tableIdentifier.getCatalogName()); + if (catalogOptional.isPresent()) { try { return Optional.of( - catalog.getPartition(tableIdentifier.toObjectPath(), partitionSpec)); + catalogOptional + .get() + .getPartition(tableIdentifier.toObjectPath(), partitionSpec)); } catch (PartitionNotExistException ignored) { } } @@ -643,9 +649,10 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { private Optional<ContextResolvedTable> getPermanentTable( ObjectIdentifier objectIdentifier, @Nullable Long timestamp) { - Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName()); + Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName()); ObjectPath objectPath = objectIdentifier.toObjectPath(); - if (currentCatalog != null) { + if (catalogOptional.isPresent()) { + Catalog currentCatalog = catalogOptional.get(); try { final CatalogBaseTable table; if (timestamp != null) { @@ -671,11 +678,11 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { } private Optional<CatalogBaseTable> getUnresolvedTable(ObjectIdentifier objectIdentifier) { - Catalog currentCatalog = catalogs.get(objectIdentifier.getCatalogName()); + Optional<Catalog> currentCatalog = getCatalog(objectIdentifier.getCatalogName()); ObjectPath objectPath = objectIdentifier.toObjectPath(); - if (currentCatalog != null) { + if (currentCatalog.isPresent()) { try { - final CatalogBaseTable table = 
currentCatalog.getTable(objectPath); + final CatalogBaseTable table = currentCatalog.get().getTable(objectPath); return Optional.of(table); } catch (TableNotExistException e) { // Ignore. @@ -715,7 +722,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { * @return names of all registered tables */ public Set<String> listTables(String catalogName, String databaseName) { - Catalog catalog = catalogs.get(catalogName); + Catalog catalog = getCatalogOrThrowException(catalogName); if (catalog == null) { throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); } @@ -784,7 +791,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { * @return names of registered views */ public Set<String> listViews(String catalogName, String databaseName) { - Catalog catalog = catalogs.get(catalogName); + Catalog catalog = getCatalogOrThrowException(catalogName); if (catalog == null) { throw new ValidationException(String.format("Catalog %s does not exist", catalogName)); } @@ -834,7 +841,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { */ public Set<String> listSchemas(String catalogName) { return Stream.concat( - Optional.ofNullable(catalogs.get(catalogName)).map(Catalog::listDatabases) + getCatalog(catalogName).map(Catalog::listDatabases) .orElse(Collections.emptyList()).stream(), temporaryTables.keySet().stream() .filter(i -> i.getCatalogName().equals(catalogName)) @@ -1085,7 +1092,7 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { getTemporaryOperationListener(objectIdentifier) .ifPresent(l -> l.onDropTemporaryTable(objectIdentifier.toObjectPath())); - Catalog catalog = catalogs.get(objectIdentifier.getCatalogName()); + Catalog catalog = getCatalog(objectIdentifier.getCatalogName()).orElse(null); ResolvedCatalogBaseTable<?> resolvedTable = resolveCatalogBaseTable(catalogBaseTable); managedTableListener.notifyTableDrop( catalog, 
objectIdentifier, resolvedTable, true, ignoreIfNotExists); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java index a3e3b50f48e..b22a9957c40 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java @@ -34,7 +34,9 @@ import org.apache.flink.table.utils.ExpressionResolverMocks; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -336,7 +338,7 @@ class CatalogManagerTest { } @Test - void testCatalogStore() { + void testCatalogStore() throws Exception { CatalogStore catalogStore = new GenericInMemoryCatalogStore(); Configuration configuration = new Configuration(); @@ -350,6 +352,7 @@ class CatalogManagerTest { .hasMessageContaining("CatalogStore is not opened yet."); CatalogManager catalogManager = CatalogManagerMocks.createCatalogManager(catalogStore); + catalogStore.storeCatalog("exist_cat", CatalogDescriptor.of("exist_cat", configuration)); catalogManager.createCatalog("cat1", CatalogDescriptor.of("cat1", configuration)); catalogManager.createCatalog("cat2", CatalogDescriptor.of("cat2", configuration)); @@ -379,6 +382,37 @@ class CatalogManagerTest { .isInstanceOf(CatalogException.class) .hasMessageContaining("Catalog cat4 already exists in initialized catalogs."); + catalogManager.createDatabase( + "exist_cat", + "cat_db", + new CatalogDatabaseImpl(Collections.emptyMap(), "database for exist_cat"), + false); + catalogManager.createTable( + CatalogTable.of( + Schema.newBuilder().build(), + null, + Collections.emptyList(), + Collections.emptyMap()), + ObjectIdentifier.of("exist_cat", 
"cat_db", "test_table"), + false); + assertThat(catalogManager.listSchemas("exist_cat")) + .isEqualTo(new HashSet<>(Arrays.asList("default", "cat_db"))); + assertThat(catalogManager.listTables("exist_cat", "cat_db")) + .isEqualTo(Collections.singleton("test_table")); + catalogManager.setCurrentCatalog("exist_cat"); + assertThat(catalogManager.listSchemas()) + .isEqualTo( + new HashSet<>( + Arrays.asList( + "cat1", + "cat2", + "cat3", + "cat4", + "default_catalog", + "exist_cat"))); + catalogManager.setCurrentDatabase("cat_db"); + assertThat(catalogManager.listTables()).isEqualTo(Collections.singleton("test_table")); + catalogManager.unregisterCatalog("cat1", false); catalogManager.unregisterCatalog("cat2", false); catalogManager.unregisterCatalog("cat3", false);