This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4a3553326 [flink] Improve TableAlreadyExistException message for LakeCatalog.createTable() (#1638)
4a3553326 is described below
commit 4a35533266510d2c94fc89ca4cc3e95854778b1f
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Sep 5 11:58:41 2025 +0800
[flink] Improve TableAlreadyExistException message for LakeCatalog.createTable() (#1638)
---
.../exception/LakeTableAlreadyExistException.java | 38 +++++++++++++++++++++
.../apache/fluss/flink/catalog/FlinkCatalog.java | 2 ++
.../fluss/flink/utils/CatalogExceptionUtils.java | 5 +++
.../fluss/flink/catalog/FlinkCatalogTest.java | 39 +++++++++++++++++++++-
.../fluss/flink/utils/CatalogTableTestUtils.java | 3 ++
.../java/org/apache/fluss/rpc/protocol/Errors.java | 5 ++-
.../server/coordinator/CoordinatorService.java | 3 +-
.../server/coordinator/LakeTableManagerITCase.java | 4 +--
8 files changed, 94 insertions(+), 5 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/LakeTableAlreadyExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/LakeTableAlreadyExistException.java
new file mode 100644
index 000000000..44fdb42f5
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/LakeTableAlreadyExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Exception for lakeCatalog trying to create a table that already exists.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class LakeTableAlreadyExistException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public LakeTableAlreadyExistException(String message) {
+ this(message, null);
+ }
+
+ public LakeTableAlreadyExistException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 7dd651158..3430d422e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -380,6 +380,8 @@ public class FlinkCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(),
objectPath.getDatabaseName());
} else if (CatalogExceptionUtils.isTableAlreadyExist(t)) {
throw new TableAlreadyExistException(getName(), objectPath);
+ } else if (CatalogExceptionUtils.isLakeTableAlreadyExist(t)) {
+ throw new CatalogException(t.getMessage());
} else if (isTableInvalid(t)) {
throw new InvalidTableException(t.getMessage());
} else {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
index 2e4a89a13..958f5e6ad 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
@@ -22,6 +22,7 @@ import org.apache.fluss.exception.DatabaseNotEmptyException;
import org.apache.fluss.exception.DatabaseNotExistException;
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.TableAlreadyExistException;
@@ -53,6 +54,10 @@ public class CatalogExceptionUtils {
return throwable instanceof TableAlreadyExistException;
}
+ public static boolean isLakeTableAlreadyExist(Throwable throwable) {
+ return throwable instanceof LakeTableAlreadyExistException;
+ }
+
public static boolean isTableInvalid(Throwable throwable) {
return throwable instanceof InvalidTableException;
}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 22fbdd8e3..ce9ee048b 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -67,12 +67,15 @@ import java.util.Map;
import java.util.Optional;
import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
import static
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
import static
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -81,13 +84,22 @@ class FlinkCatalogTest {
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
- FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(1)
+ .build();
private static final String CATALOG_NAME = "test-catalog";
private static final String DEFAULT_DB = "default";
static Catalog catalog;
private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB,
"t1");
+ private static Configuration initConfig() {
+ Configuration configuration = new Configuration();
+ configuration.set(ConfigOptions.DATALAKE_FORMAT, PAIMON);
+ return configuration;
+ }
+
private ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(
@@ -255,6 +267,31 @@ class FlinkCatalogTest {
.hasMessageContaining("regularTable$lake does not exist");
}
+ @Test
+ void testCreateAlreadyExistsLakeTable() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(TABLE_DATALAKE_ENABLED.key(), "true");
+ options.put(TABLE_DATALAKE_FORMAT.key(), PAIMON.name());
+ assertThatThrownBy(() -> catalog.getTable(tableInDefaultDb))
+ .isInstanceOf(TableNotExistException.class)
+ .hasMessage(
+ String.format(
+ "Table (or view) %s does not exist in Catalog %s.",
+ tableInDefaultDb, CATALOG_NAME));
+ CatalogTable table = this.newCatalogTable(options);
+ catalog.createTable(this.tableInDefaultDb, table, false);
+ assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue();
+ // drop fluss table
+ catalog.dropTable(this.tableInDefaultDb, false);
+ // create the table again, should throw exception with ignore if exist = false
+ assertThatThrownBy(() -> catalog.createTable(this.tableInDefaultDb, table, false))
+ .isInstanceOf(CatalogException.class)
+ .hasMessage(
+ String.format(
+ "The table %s already exists in %s catalog, please first drop the table in %s catalog or use a new table name.",
+ this.tableInDefaultDb, "paimon", "paimon"));
+ }
+
@Test
void testCreateTableWithBucket() throws Exception {
// for pk table;
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
index a5d458a21..b22a604da 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
import static org.assertj.core.api.Assertions.assertThat;
@@ -80,6 +81,8 @@ public class CatalogTableTestUtils {
Map<String, String> actualOptions, Map<String, String>
expectedOptions) {
actualOptions.remove(BOOTSTRAP_SERVERS.key());
actualOptions.remove(TABLE_REPLICATION_FACTOR.key());
+ // Remove datalake format (auto-added when datalake is enabled in Fluss cluster)
+ actualOptions.remove(TABLE_DATALAKE_FORMAT.key());
assertThat(actualOptions).isEqualTo(expectedOptions);
}
}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 5edb3ac83..631047eac 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -43,6 +43,7 @@ import org.apache.fluss.exception.InvalidUpdateVersionException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
import org.apache.fluss.exception.KvStorageException;
import org.apache.fluss.exception.LakeStorageNotConfiguredException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.exception.LogOffsetOutOfRangeException;
@@ -214,7 +215,9 @@ public enum Errors {
INVALID_SERVER_RACK_INFO_EXCEPTION(
52, "The server rack info is invalid.",
InvalidServerRackInfoException::new),
LAKE_SNAPSHOT_NOT_EXIST(
- 53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new);
+ 53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new),
+ LAKE_TABLE_ALREADY_EXIST(
+ 54, "The lake table already exists.", LakeTableAlreadyExistException::new);
private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 7253ef962..f1b9a8e2f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -24,6 +24,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.exception.SecurityDisabledException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -268,7 +269,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
try {
checkNotNull(lakeCatalog).createTable(tablePath,
tableDescriptor);
} catch (TableAlreadyExistException e) {
- throw new TableAlreadyExistException(
+ throw new LakeTableAlreadyExistException(
String.format(
"The table %s already exists in %s catalog,
please "
+ "first drop the table in %s catalog
or use a new table name.",
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index d728353e2..8040bbd93 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,7 +19,7 @@ package org.apache.fluss.server.coordinator;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
@@ -103,7 +103,7 @@ class LakeTableManagerITCase {
lakeTablePath,
lakeTableDescriptor, false))
.get())
.cause()
- .isInstanceOf(TableAlreadyExistException.class)
+ .isInstanceOf(LakeTableAlreadyExistException.class)
.hasMessage(
"The table %s already exists in paimon catalog, please first drop the table in paimon catalog or use a new table name.",
lakeTablePath);