This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a3553326 [flink] Improve TableAlreadyExistException message for 
LakeCatalog.createTable() (#1638)
4a3553326 is described below

commit 4a35533266510d2c94fc89ca4cc3e95854778b1f
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Sep 5 11:58:41 2025 +0800

    [flink] Improve TableAlreadyExistException message for 
LakeCatalog.createTable() (#1638)
---
 .../exception/LakeTableAlreadyExistException.java  | 38 +++++++++++++++++++++
 .../apache/fluss/flink/catalog/FlinkCatalog.java   |  2 ++
 .../fluss/flink/utils/CatalogExceptionUtils.java   |  5 +++
 .../fluss/flink/catalog/FlinkCatalogTest.java      | 39 +++++++++++++++++++++-
 .../fluss/flink/utils/CatalogTableTestUtils.java   |  3 ++
 .../java/org/apache/fluss/rpc/protocol/Errors.java |  5 ++-
 .../server/coordinator/CoordinatorService.java     |  3 +-
 .../server/coordinator/LakeTableManagerITCase.java |  4 +--
 8 files changed, 94 insertions(+), 5 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/LakeTableAlreadyExistException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/LakeTableAlreadyExistException.java
new file mode 100644
index 000000000..44fdb42f5
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/LakeTableAlreadyExistException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Exception for lakeCatalog trying to create a table that already exists.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class LakeTableAlreadyExistException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public LakeTableAlreadyExistException(String message) {
+        this(message, null);
+    }
+
+    public LakeTableAlreadyExistException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 7dd651158..3430d422e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -380,6 +380,8 @@ public class FlinkCatalog extends AbstractCatalog {
                 throw new DatabaseNotExistException(getName(), 
objectPath.getDatabaseName());
             } else if (CatalogExceptionUtils.isTableAlreadyExist(t)) {
                 throw new TableAlreadyExistException(getName(), objectPath);
+            } else if (CatalogExceptionUtils.isLakeTableAlreadyExist(t)) {
+                throw new CatalogException(t.getMessage());
             } else if (isTableInvalid(t)) {
                 throw new InvalidTableException(t.getMessage());
             } else {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
index 2e4a89a13..958f5e6ad 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/CatalogExceptionUtils.java
@@ -22,6 +22,7 @@ import org.apache.fluss.exception.DatabaseNotEmptyException;
 import org.apache.fluss.exception.DatabaseNotExistException;
 import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.PartitionAlreadyExistsException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.TableAlreadyExistException;
@@ -53,6 +54,10 @@ public class CatalogExceptionUtils {
         return throwable instanceof TableAlreadyExistException;
     }
 
+    public static boolean isLakeTableAlreadyExist(Throwable throwable) {
+        return throwable instanceof LakeTableAlreadyExistException;
+    }
+
     public static boolean isTableInvalid(Throwable throwable) {
         return throwable instanceof InvalidTableException;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index 22fbdd8e3..ce9ee048b 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -67,12 +67,15 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
 import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
 import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
 import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
 import static 
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
 import static 
org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema;
+import static org.apache.fluss.metadata.DataLakeFormat.PAIMON;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -81,13 +84,22 @@ class FlinkCatalogTest {
 
     @RegisterExtension
     public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
-            FlussClusterExtension.builder().setNumOfTabletServers(1).build();
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(1)
+                    .build();
 
     private static final String CATALOG_NAME = "test-catalog";
     private static final String DEFAULT_DB = "default";
     static Catalog catalog;
     private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, 
"t1");
 
+    private static Configuration initConfig() {
+        Configuration configuration = new Configuration();
+        configuration.set(ConfigOptions.DATALAKE_FORMAT, PAIMON);
+        return configuration;
+    }
+
     private ResolvedSchema createSchema() {
         return new ResolvedSchema(
                 Arrays.asList(
@@ -255,6 +267,31 @@ class FlinkCatalogTest {
                 .hasMessageContaining("regularTable$lake does not exist");
     }
 
+    @Test
+    void testCreateAlreadyExistsLakeTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(TABLE_DATALAKE_ENABLED.key(), "true");
+        options.put(TABLE_DATALAKE_FORMAT.key(), PAIMON.name());
+        assertThatThrownBy(() -> catalog.getTable(tableInDefaultDb))
+                .isInstanceOf(TableNotExistException.class)
+                .hasMessage(
+                        String.format(
+                                "Table (or view) %s does not exist in Catalog 
%s.",
+                                tableInDefaultDb, CATALOG_NAME));
+        CatalogTable table = this.newCatalogTable(options);
+        catalog.createTable(this.tableInDefaultDb, table, false);
+        assertThat(catalog.tableExists(this.tableInDefaultDb)).isTrue();
+        // drop fluss table
+        catalog.dropTable(this.tableInDefaultDb, false);
+        // create the table again, should throw exception with ignore if exist 
= false
+        assertThatThrownBy(() -> catalog.createTable(this.tableInDefaultDb, 
table, false))
+                .isInstanceOf(CatalogException.class)
+                .hasMessage(
+                        String.format(
+                                "The table %s already exists in %s catalog, 
please first drop the table in %s catalog or use a new table name.",
+                                this.tableInDefaultDb, "paimon", "paimon"));
+    }
+
     @Test
     void testCreateTableWithBucket() throws Exception {
         // for pk table;
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
index a5d458a21..b22a604da 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
+import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
 import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -80,6 +81,8 @@ public class CatalogTableTestUtils {
             Map<String, String> actualOptions, Map<String, String> 
expectedOptions) {
         actualOptions.remove(BOOTSTRAP_SERVERS.key());
         actualOptions.remove(TABLE_REPLICATION_FACTOR.key());
+        // Remove datalake format (auto-added when datalake is enabled in 
Fluss cluster)
+        actualOptions.remove(TABLE_DATALAKE_FORMAT.key());
         assertThat(actualOptions).isEqualTo(expectedOptions);
     }
 }
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 5edb3ac83..631047eac 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -43,6 +43,7 @@ import 
org.apache.fluss.exception.InvalidUpdateVersionException;
 import org.apache.fluss.exception.KvSnapshotNotExistException;
 import org.apache.fluss.exception.KvStorageException;
 import org.apache.fluss.exception.LakeStorageNotConfiguredException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
 import org.apache.fluss.exception.LogOffsetOutOfRangeException;
@@ -214,7 +215,9 @@ public enum Errors {
     INVALID_SERVER_RACK_INFO_EXCEPTION(
             52, "The server rack info is invalid.", 
InvalidServerRackInfoException::new),
     LAKE_SNAPSHOT_NOT_EXIST(
-            53, "The lake snapshot is not exist.", 
LakeTableSnapshotNotExistException::new);
+            53, "The lake snapshot is not exist.", 
LakeTableSnapshotNotExistException::new),
+    LAKE_TABLE_ALREADY_EXIST(
+            54, "The lake table already exists.", 
LakeTableAlreadyExistException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 7253ef962..f1b9a8e2f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -24,6 +24,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.SecurityDisabledException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
@@ -268,7 +269,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
             try {
                 checkNotNull(lakeCatalog).createTable(tablePath, 
tableDescriptor);
             } catch (TableAlreadyExistException e) {
-                throw new TableAlreadyExistException(
+                throw new LakeTableAlreadyExistException(
                         String.format(
                                 "The table %s already exists in %s catalog, 
please "
                                         + "first drop the table in %s catalog 
or use a new table name.",
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
index d728353e2..8040bbd93 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableManagerITCase.java
@@ -19,7 +19,7 @@ package org.apache.fluss.server.coordinator;
 
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -103,7 +103,7 @@ class LakeTableManagerITCase {
                                                         lakeTablePath, 
lakeTableDescriptor, false))
                                         .get())
                 .cause()
-                .isInstanceOf(TableAlreadyExistException.class)
+                .isInstanceOf(LakeTableAlreadyExistException.class)
                 .hasMessage(
                         "The table %s already exists in paimon catalog, please 
first drop the table in paimon catalog or use a new table name.",
                         lakeTablePath);

Reply via email to