This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 83dfdf135 [connector/flink] Throw Exception If catalog default
database is not existed (#1608)
83dfdf135 is described below
commit 83dfdf135eeb4c79821ed3c7ec7a886d9f7289f9
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Sep 10 13:51:40 2025 +0800
[connector/flink] Throw Exception If catalog default database is not
existed (#1608)
---
.../org/apache/fluss/flink/catalog/FlinkCatalog.java | 10 ++++++++--
.../apache/fluss/flink/catalog/FlinkCatalogITCase.java | 16 ++++++++++++++++
.../org/apache/fluss/flink/catalog/FlinkCatalogTest.java | 15 +++++++--------
3 files changed, 31 insertions(+), 10 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 3430d422e..82a58bd43 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -108,7 +108,7 @@ public class FlinkCatalog extends AbstractCatalog {
protected final ClassLoader classLoader;
protected final String catalogName;
- protected final @Nullable String defaultDatabase;
+ protected final String defaultDatabase;
protected final String bootstrapServers;
private final Map<String, String> securityConfigs;
protected Connection connection;
@@ -117,7 +117,7 @@ public class FlinkCatalog extends AbstractCatalog {
public FlinkCatalog(
String name,
- @Nullable String defaultDatabase,
+ String defaultDatabase,
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs) {
@@ -142,6 +142,12 @@ public class FlinkCatalog extends AbstractCatalog {
connection =
ConnectionFactory.createConnection(Configuration.fromMap(flussConfigs));
admin = connection.getAdmin();
+ if (!databaseExists(defaultDatabase)) {
+ throw new CatalogException(
+ String.format(
+ "The configured default-database '%s' does not
exist in the Fluss cluster.",
+ defaultDatabase));
+ }
}
@Override
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index df39185df..948ec3635 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
@@ -657,6 +658,21 @@ abstract class FlinkCatalogITCase {
}
}
+ @Test
+ void testCreateCatalogWithUnexistedDatabase() {
+ assertThatThrownBy(
+ () ->
+ tEnv.executeSql(
+ String.format(
+ "create catalog
test_non_exist_database_catalog with ('type' = 'fluss', '%s' = '%s',
'default-database' = 'non-exist')",
+ BOOTSTRAP_SERVERS.key(),
+
FLUSS_CLUSTER_EXTENSION.getBootstrapServers())))
+ .rootCause()
+ .isExactlyInstanceOf(CatalogException.class)
+ .hasMessage(
+ "The configured default-database 'non-exist' does not
exist in the Fluss cluster.");
+ }
+
private static void assertOptionsEqual(
Map<String, String> actualOptions, Map<String, String>
expectedOptions) {
actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 46dfa8b23..dec2601a2 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -90,7 +90,7 @@ class FlinkCatalogTest {
.build();
private static final String CATALOG_NAME = "test-catalog";
- private static final String DEFAULT_DB = "default";
+ private static final String DEFAULT_DB =
FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue();
static Catalog catalog;
private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB,
"t1");
@@ -440,8 +440,7 @@ class FlinkCatalogTest {
CatalogTable expectedTable = addOptions(table, addedOptions);
checkEqualsRespectSchema((CatalogTable) tableCreated, expectedTable);
assertThat(catalog.listTables("db1")).isEqualTo(Collections.singletonList("t1"));
- assertThat(catalog.listDatabases())
- .isEqualTo(Arrays.asList(DEFAULT_DB, "db1", "db2", "fluss"));
+ assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db1",
"db2", DEFAULT_DB));
// test drop db1;
// should throw exception since db1 is not empty and we set cascade =
false
assertThatThrownBy(() -> catalog.dropDatabase("db1", false, false))
@@ -457,10 +456,10 @@ class FlinkCatalogTest {
// should be ok since we set ignoreIfNotExists = true
catalog.dropDatabase("db1", true, true);
// test list db
-
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList(DEFAULT_DB, "db2",
"fluss"));
+ assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList("db2",
DEFAULT_DB));
catalog.dropDatabase("db2", false, true);
// should be empty
-
assertThat(catalog.listDatabases()).isEqualTo(Arrays.asList(DEFAULT_DB,
"fluss"));
+
assertThat(catalog.listDatabases()).isEqualTo(Collections.singletonList(DEFAULT_DB));
// should throw exception since the db is not exist and we set
ignoreIfNotExists = false
assertThatThrownBy(() -> catalog.listTables("unknown"))
.isInstanceOf(DatabaseNotExistException.class)
@@ -498,7 +497,7 @@ class FlinkCatalogTest {
catalog.createTable(path1, table, false);
assertThatThrownBy(() -> catalog.listPartitions(path1))
.isInstanceOf(TableNotPartitionedException.class)
- .hasMessage("Table default.t1 in catalog test-catalog is not
partitioned.");
+ .hasMessage("Table fluss.t1 in catalog test-catalog is not
partitioned.");
// create partition table and list partitions.
ObjectPath path2 = new ObjectPath(DEFAULT_DB, "partitioned_t1");
@@ -534,7 +533,7 @@ class FlinkCatalogTest {
assertThatThrownBy(() -> catalog.listPartitions(path2,
invalidTestSpec))
.isInstanceOf(CatalogException.class)
.hasMessage(
- "Failed to list partitions of table
default.partitioned_t1 in test-catalog, by partitionSpec
CatalogPartitionSpec{{second=}}");
+ "Failed to list partitions of table
fluss.partitioned_t1 in test-catalog, by partitionSpec
CatalogPartitionSpec{{second=}}");
// NEW: Test dropPartition functionality
CatalogPartitionSpec firstPartSpec = catalogPartitionSpecs.get(0);
@@ -553,7 +552,7 @@ class FlinkCatalogTest {
.isInstanceOf(
org.apache.flink.table.catalog.exceptions.PartitionNotExistException.class)
.hasMessage(
- "Partition CatalogPartitionSpec{{first=999}} of table
default.partitioned_t1 in catalog test-catalog does not exist.");
+ "Partition CatalogPartitionSpec{{first=999}} of table
fluss.partitioned_t1 in catalog test-catalog does not exist.");
// Should not throw with ignoreIfNotExists = true
catalog.dropPartition(path2, nonExistentSpec, true);