This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 76554d186ad [FLINK-32747][table] Fix ddl operations for catalog which is loaded from CatalogStore (#23133)
76554d186ad is described below

commit 76554d186ad198384ff783e3e3f040dd738b7571
Author: Shammon FY <zjur...@gmail.com>
AuthorDate: Thu Aug 10 11:57:34 2023 +0800

    [FLINK-32747][table] Fix ddl operations for catalog which is loaded from CatalogStore (#23133)
---
 .../apache/flink/table/catalog/CatalogManager.java | 57 ++++++++++++----------
 .../flink/table/catalog/CatalogManagerTest.java    | 36 +++++++++++++-
 2 files changed, 67 insertions(+), 26 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index 6868e8c4671..ac010e2bdde 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -399,11 +399,13 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         // Get catalog from the CatalogStore.
         Optional<CatalogDescriptor> optionalDescriptor =
                 catalogStoreHolder.catalogStore().getCatalog(catalogName);
-        if (optionalDescriptor.isPresent()) {
-            return Optional.of(initCatalog(catalogName, 
optionalDescriptor.get()));
-        }
-
-        return Optional.empty();
+        return optionalDescriptor.map(
+                descriptor -> {
+                    Catalog catalog = initCatalog(catalogName, descriptor);
+                    catalog.open();
+                    catalogs.put(catalogName, catalog);
+                    return catalog;
+                });
     }
 
     public Catalog getCatalogOrThrowException(String catalogName) {
@@ -452,12 +454,14 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(catalogName), "Catalog 
name cannot be empty.");
 
-        Catalog potentialCurrentCatalog = catalogs.get(catalogName);
-        if (potentialCurrentCatalog == null) {
-            throw new CatalogException(
-                    format("A catalog with name [%s] does not exist.", 
catalogName));
-        }
-
+        Catalog potentialCurrentCatalog =
+                getCatalog(catalogName)
+                        .orElseThrow(
+                                () ->
+                                        new CatalogException(
+                                                format(
+                                                        "A catalog with name 
[%s] does not exist.",
+                                                        catalogName)));
         if (!catalogName.equals(currentCatalogName)) {
             currentCatalogName = catalogName;
             currentDatabaseName = potentialCurrentCatalog.getDefaultDatabase();
@@ -502,7 +506,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             throw new CatalogException("Current catalog has not been set.");
         }
 
-        if (!catalogs.get(currentCatalogName).databaseExists(databaseName)) {
+        if 
(!getCatalogOrThrowException(currentCatalogName).databaseExists(databaseName)) {
             throw new CatalogException(
                     format(
                             "A database with name [%s] does not exist in the 
catalog: [%s].",
@@ -537,7 +541,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      */
     public String getBuiltInDatabaseName() {
         // The default database of the built-in catalog is also the built-in 
database.
-        return catalogs.get(getBuiltInCatalogName()).getDefaultDatabase();
+        return 
getCatalogOrThrowException(getBuiltInCatalogName()).getDefaultDatabase();
     }
 
     /**
@@ -630,11 +634,13 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      */
     public Optional<CatalogPartition> getPartition(
             ObjectIdentifier tableIdentifier, CatalogPartitionSpec 
partitionSpec) {
-        Catalog catalog = catalogs.get(tableIdentifier.getCatalogName());
-        if (catalog != null) {
+        Optional<Catalog> catalogOptional = 
getCatalog(tableIdentifier.getCatalogName());
+        if (catalogOptional.isPresent()) {
             try {
                 return Optional.of(
-                        catalog.getPartition(tableIdentifier.toObjectPath(), 
partitionSpec));
+                        catalogOptional
+                                .get()
+                                .getPartition(tableIdentifier.toObjectPath(), 
partitionSpec));
             } catch (PartitionNotExistException ignored) {
             }
         }
@@ -643,9 +649,10 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
 
     private Optional<ContextResolvedTable> getPermanentTable(
             ObjectIdentifier objectIdentifier, @Nullable Long timestamp) {
-        Catalog currentCatalog = 
catalogs.get(objectIdentifier.getCatalogName());
+        Optional<Catalog> catalogOptional = 
getCatalog(objectIdentifier.getCatalogName());
         ObjectPath objectPath = objectIdentifier.toObjectPath();
-        if (currentCatalog != null) {
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
             try {
                 final CatalogBaseTable table;
                 if (timestamp != null) {
@@ -671,11 +678,11 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
     }
 
     private Optional<CatalogBaseTable> getUnresolvedTable(ObjectIdentifier 
objectIdentifier) {
-        Catalog currentCatalog = 
catalogs.get(objectIdentifier.getCatalogName());
+        Optional<Catalog> currentCatalog = 
getCatalog(objectIdentifier.getCatalogName());
         ObjectPath objectPath = objectIdentifier.toObjectPath();
-        if (currentCatalog != null) {
+        if (currentCatalog.isPresent()) {
             try {
-                final CatalogBaseTable table = 
currentCatalog.getTable(objectPath);
+                final CatalogBaseTable table = 
currentCatalog.get().getTable(objectPath);
                 return Optional.of(table);
             } catch (TableNotExistException e) {
                 // Ignore.
@@ -715,7 +722,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      * @return names of all registered tables
      */
     public Set<String> listTables(String catalogName, String databaseName) {
-        Catalog catalog = catalogs.get(catalogName);
+        Catalog catalog = getCatalogOrThrowException(catalogName);
         if (catalog == null) {
             throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
         }
@@ -784,7 +791,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      * @return names of registered views
      */
     public Set<String> listViews(String catalogName, String databaseName) {
-        Catalog catalog = catalogs.get(catalogName);
+        Catalog catalog = getCatalogOrThrowException(catalogName);
         if (catalog == null) {
             throw new ValidationException(String.format("Catalog %s does not 
exist", catalogName));
         }
@@ -834,7 +841,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
      */
     public Set<String> listSchemas(String catalogName) {
         return Stream.concat(
-                        
Optional.ofNullable(catalogs.get(catalogName)).map(Catalog::listDatabases)
+                        getCatalog(catalogName).map(Catalog::listDatabases)
                                 .orElse(Collections.emptyList()).stream(),
                         temporaryTables.keySet().stream()
                                 .filter(i -> 
i.getCatalogName().equals(catalogName))
@@ -1085,7 +1092,7 @@ public final class CatalogManager implements 
CatalogRegistry, AutoCloseable {
             getTemporaryOperationListener(objectIdentifier)
                     .ifPresent(l -> 
l.onDropTemporaryTable(objectIdentifier.toObjectPath()));
 
-            Catalog catalog = catalogs.get(objectIdentifier.getCatalogName());
+            Catalog catalog = 
getCatalog(objectIdentifier.getCatalogName()).orElse(null);
             ResolvedCatalogBaseTable<?> resolvedTable = 
resolveCatalogBaseTable(catalogBaseTable);
             managedTableListener.notifyTableDrop(
                     catalog, objectIdentifier, resolvedTable, true, 
ignoreIfNotExists);
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
index a3e3b50f48e..b22a9957c40 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogManagerTest.java
@@ -34,7 +34,9 @@ import org.apache.flink.table.utils.ExpressionResolverMocks;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -336,7 +338,7 @@ class CatalogManagerTest {
     }
 
     @Test
-    void testCatalogStore() {
+    void testCatalogStore() throws Exception {
         CatalogStore catalogStore = new GenericInMemoryCatalogStore();
 
         Configuration configuration = new Configuration();
@@ -350,6 +352,7 @@ class CatalogManagerTest {
                 .hasMessageContaining("CatalogStore is not opened yet.");
 
         CatalogManager catalogManager = 
CatalogManagerMocks.createCatalogManager(catalogStore);
+        catalogStore.storeCatalog("exist_cat", 
CatalogDescriptor.of("exist_cat", configuration));
 
         catalogManager.createCatalog("cat1", CatalogDescriptor.of("cat1", 
configuration));
         catalogManager.createCatalog("cat2", CatalogDescriptor.of("cat2", 
configuration));
@@ -379,6 +382,37 @@ class CatalogManagerTest {
                 .isInstanceOf(CatalogException.class)
                 .hasMessageContaining("Catalog cat4 already exists in 
initialized catalogs.");
 
+        catalogManager.createDatabase(
+                "exist_cat",
+                "cat_db",
+                new CatalogDatabaseImpl(Collections.emptyMap(), "database for 
exist_cat"),
+                false);
+        catalogManager.createTable(
+                CatalogTable.of(
+                        Schema.newBuilder().build(),
+                        null,
+                        Collections.emptyList(),
+                        Collections.emptyMap()),
+                ObjectIdentifier.of("exist_cat", "cat_db", "test_table"),
+                false);
+        assertThat(catalogManager.listSchemas("exist_cat"))
+                .isEqualTo(new HashSet<>(Arrays.asList("default", "cat_db")));
+        assertThat(catalogManager.listTables("exist_cat", "cat_db"))
+                .isEqualTo(Collections.singleton("test_table"));
+        catalogManager.setCurrentCatalog("exist_cat");
+        assertThat(catalogManager.listSchemas())
+                .isEqualTo(
+                        new HashSet<>(
+                                Arrays.asList(
+                                        "cat1",
+                                        "cat2",
+                                        "cat3",
+                                        "cat4",
+                                        "default_catalog",
+                                        "exist_cat")));
+        catalogManager.setCurrentDatabase("cat_db");
+        
assertThat(catalogManager.listTables()).isEqualTo(Collections.singleton("test_table"));
+
         catalogManager.unregisterCatalog("cat1", false);
         catalogManager.unregisterCatalog("cat2", false);
         catalogManager.unregisterCatalog("cat3", false);

Reply via email to