This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 83dfdf135 [connector/flink] Throw Exception If catalog default 
database is not existed (#1608)
83dfdf135 is described below

commit 83dfdf135eeb4c79821ed3c7ec7a886d9f7289f9
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Sep 10 13:51:40 2025 +0800

    [connector/flink] Throw Exception If catalog default database is not 
existed (#1608)
---
 .../org/apache/fluss/flink/catalog/FlinkCatalog.java     | 10 ++++++++--
 .../apache/fluss/flink/catalog/FlinkCatalogITCase.java   | 16 ++++++++++++++++
 .../org/apache/fluss/flink/catalog/FlinkCatalogTest.java | 15 +++++++--------
 3 files changed, 31 insertions(+), 10 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 3430d422e..82a58bd43 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -108,7 +108,7 @@ public class FlinkCatalog extends AbstractCatalog {
     protected final ClassLoader classLoader;
 
     protected final String catalogName;
-    protected final @Nullable String defaultDatabase;
+    protected final String defaultDatabase;
     protected final String bootstrapServers;
     private final Map<String, String> securityConfigs;
     protected Connection connection;
@@ -117,7 +117,7 @@ public class FlinkCatalog extends AbstractCatalog {
 
     public FlinkCatalog(
             String name,
-            @Nullable String defaultDatabase,
+            String defaultDatabase,
             String bootstrapServers,
             ClassLoader classLoader,
             Map<String, String> securityConfigs) {
@@ -142,6 +142,12 @@ public class FlinkCatalog extends AbstractCatalog {
 
         connection = 
ConnectionFactory.createConnection(Configuration.fromMap(flussConfigs));
         admin = connection.getAdmin();
+        if (!databaseExists(defaultDatabase)) {
+            throw new CatalogException(
+                    String.format(
+                            "The configured default-database '%s' does not 
exist in the Fluss cluster.",
+                            defaultDatabase));
+        }
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index df39185df..948ec3635 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
@@ -657,6 +658,21 @@ abstract class FlinkCatalogITCase {
         }
     }
 
+    @Test
+    void testCreateCatalogWithUnexistedDatabase() {
+        assertThatThrownBy(
+                        () ->
+                                tEnv.executeSql(
+                                        String.format(
+                                                "create catalog 
test_non_exist_database_catalog with ('type' = 'fluss', '%s' = '%s', 
'default-database' = 'non-exist')",
+                                                BOOTSTRAP_SERVERS.key(),
+                                                
FLUSS_CLUSTER_EXTENSION.getBootstrapServers())))
+                .rootCause()
+                .isExactlyInstanceOf(CatalogException.class)
+                .hasMessage(
+                        "The configured default-database 'non-exist' does not 
exist in the Fluss cluster.");
+    }
+
     private static void assertOptionsEqual(
             Map<String, String> actualOptions, Map<String, String> 
expectedOptions) {
         actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 46dfa8b23..dec2601a2 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -90,7 +90,7 @@ class FlinkCatalogTest {
                     .build();
 
     private static final String CATALOG_NAME = "test-catalog";
-    private static final String DEFAULT_DB = "default";
+    private static final String DEFAULT_DB = 
FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue();
     static Catalog catalog;
     private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, 
"t1");
 
@@ -440,8 +440,7 @@ class FlinkCatalogTest {
         CatalogTable expectedTable = addOptions(table, addedOptions);
         checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable);
         
assertThat(catalog.listTables("db1")).isEqualTo(Collections.singletonList("t1"));
-        assertThat(catalog.listDatabases())
-                .isEqualTo(Arrays.asList(DEFAULT_DB, "db1", "db2", "fluss"));
+        assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db1", 
"db2", DEFAULT_DB));
         // test drop db1;
         // should throw exception since db1 is not empty and we set cascade = 
false
         assertThatThrownBy(() -> catalog.dropDatabase("db1", false, false))
@@ -457,10 +456,10 @@ class FlinkCatalogTest {
         // should be ok since we set ignoreIfNotExists = true
         catalog.dropDatabase("db1", true, true);
         // test list db
-        
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList(DEFAULT_DB, "db2", 
"fluss"));
+        assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db2", 
DEFAULT_DB));
         catalog.dropDatabase("db2", false, true);
         // should be empty
-        
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList(DEFAULT_DB, 
"fluss"));
+        
assertThat(catalog.listDatabases()).isEqualTo(Collections.singletonList(DEFAULT_DB));
         // should throw exception since the db is not exist and we set 
ignoreIfNotExists = false
         assertThatThrownBy(() -> catalog.listTables("unknown"))
                 .isInstanceOf(DatabaseNotExistException.class)
@@ -498,7 +497,7 @@ class FlinkCatalogTest {
         catalog.createTable(path1, table, false);
         assertThatThrownBy(() -> catalog.listPartitions(path1))
                 .isInstanceOf(TableNotPartitionedException.class)
-                .hasMessage("Table default.t1 in catalog test-catalog is not 
partitioned.");
+                .hasMessage("Table fluss.t1 in catalog test-catalog is not 
partitioned.");
 
         // create partition table and list partitions.
         ObjectPath path2 = new ObjectPath(DEFAULT_DB, "partitioned_t1");
@@ -534,7 +533,7 @@ class FlinkCatalogTest {
         assertThatThrownBy(() -> catalog.listPartitions(path2, 
invalidTestSpec))
                 .isInstanceOf(CatalogException.class)
                 .hasMessage(
-                        "Failed to list partitions of table 
default.partitioned_t1 in test-catalog, by partitionSpec 
CatalogPartitionSpec{{second=}}");
+                        "Failed to list partitions of table 
fluss.partitioned_t1 in test-catalog, by partitionSpec 
CatalogPartitionSpec{{second=}}");
 
         // NEW: Test dropPartition functionality
         CatalogPartitionSpec firstPartSpec = catalogPartitionSpecs.get(0);
@@ -553,7 +552,7 @@ class FlinkCatalogTest {
                 .isInstanceOf(
                         
org.apache.flink.table.catalog.exceptions.PartitionNotExistException.class)
                 .hasMessage(
-                        "Partition CatalogPartitionSpec{{first=999}} of table 
default.partitioned_t1 in catalog test-catalog does not exist.");
+                        "Partition CatalogPartitionSpec{{first=999}} of table 
fluss.partitioned_t1 in catalog test-catalog does not exist.");
 
         // Should not throw with ignoreIfNotExists = true
         catalog.dropPartition(path2, nonExistentSpec, true);

Reply via email to