This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a4d74f17ea [hive][clone] support excluding some tables when clone all 
tables in a catalog or database. (#5868)
a4d74f17ea is described below

commit a4d74f17ea314f03cd11049cb52fc1b9c00acc92
Author: shyjsarah <[email protected]>
AuthorDate: Fri Jul 11 09:52:06 2025 +0800

    [hive][clone] support excluding some tables when clone all tables in a 
catalog or database. (#5868)
---
 docs/content/migration/clone-to-paimon.md          | 16 ++++
 .../apache/paimon/flink/action/CloneAction.java    |  7 +-
 .../paimon/flink/action/CloneActionFactory.java    | 14 +++-
 .../org/apache/paimon/flink/clone/CloneUtils.java  | 12 ++-
 .../paimon/flink/procedure/CloneProcedure.java     | 20 ++++-
 .../apache/paimon/hive/clone/HiveCloneUtils.java   | 29 ++++++-
 .../org/apache/paimon/hive/HiveCloneUtilsTest.java | 88 ++++++++++++++++++++++
 .../paimon/hive/procedure/CloneActionITCase.java   | 82 ++++++++++++++++++++
 8 files changed, 257 insertions(+), 11 deletions(-)

diff --git a/docs/content/migration/clone-to-paimon.md 
b/docs/content/migration/clone-to-paimon.md
index 2795743578..1bf6a6371d 100644
--- a/docs/content/migration/clone-to-paimon.md
+++ b/docs/content/migration/clone-to-paimon.md
@@ -70,7 +70,23 @@ clone \
 --target_database test \
 --parallelism 10 \
 --target_catalog_conf warehouse=my_warehouse
+--excluded_tables <excluded_tables_spec>
 ```
+Use `--excluded_tables` to list the tables that should not be cloned, as a comma-separated list of fully qualified names: `<database1>.<table1>,<database2>.<table2>,<database3>.<table3>`. The option must be omitted when cloning a single table.
+
+## Clone Hive Catalog
+
+```bash
+<FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
+clone \
+--catalog_conf metastore=hive \
+--catalog_conf uri=thrift://localhost:9088 \
+--parallelism 10 \
+--target_catalog_conf warehouse=my_warehouse \
+--excluded_tables <excluded_tables_spec>
+```
+Use `--excluded_tables` to list the tables that should not be cloned, as a comma-separated list of fully qualified names: `<database1>.<table1>,<database2>.<table2>,<database3>.<table3>`.
+
 
 ## Clone Hudi Tables
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index 313db28336..03ecdff9b9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Map;
 
 /** Clone source table to target table. */
@@ -52,6 +53,7 @@ public class CloneAction extends ActionBase {
 
     private final int parallelism;
     @Nullable private final String whereSql;
+    @Nullable private final List<String> excludedTables;
 
     public CloneAction(
             String sourceDatabase,
@@ -61,7 +63,8 @@ public class CloneAction extends ActionBase {
             String targetTableName,
             Map<String, String> targetCatalogConfig,
             @Nullable Integer parallelism,
-            @Nullable String whereSql) {
+            @Nullable String whereSql,
+            @Nullable List<String> excludedTables) {
         super(sourceCatalogConfig);
 
         Catalog sourceCatalog = catalog;
@@ -84,6 +87,7 @@ public class CloneAction extends ActionBase {
 
         this.parallelism = parallelism == null ? env.getParallelism() : 
parallelism;
         this.whereSql = whereSql;
+        this.excludedTables = excludedTables;
     }
 
     @Override
@@ -96,6 +100,7 @@ public class CloneAction extends ActionBase {
                         targetDatabase,
                         targetTableName,
                         catalog,
+                        excludedTables,
                         env);
 
         DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index fd5cc8d58b..29d3331914 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -18,7 +18,11 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.utils.StringUtils;
+
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
@@ -32,6 +36,7 @@ public class CloneActionFactory implements ActionFactory {
     private static final String TARGET_CATALOG_CONF = "target_catalog_conf";
     private static final String PARALLELISM = "parallelism";
     private static final String WHERE = "where";
+    private static final String EXCLUDED_TABLES = "excluded_tables";
 
     @Override
     public String identifier() {
@@ -51,6 +56,12 @@ public class CloneActionFactory implements ActionFactory {
 
         String parallelism = params.get(PARALLELISM);
 
+        String excludedTablesStr = params.get(EXCLUDED_TABLES);
+        List<String> excludedTables =
+                StringUtils.isNullOrWhitespaceOnly(excludedTablesStr)
+                        ? null
+                        : Arrays.asList(StringUtils.split(excludedTablesStr, 
","));
+
         CloneAction cloneAction =
                 new CloneAction(
                         params.get(DATABASE),
@@ -60,7 +71,8 @@ public class CloneActionFactory implements ActionFactory {
                         params.get(TARGET_TABLE),
                         targetCatalogConfig,
                         parallelism == null ? null : 
Integer.parseInt(parallelism),
-                        params.get(WHERE));
+                        params.get(WHERE),
+                        excludedTables);
 
         return Optional.of(cloneAction);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
index 5a53c39fa2..06ff96af3d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
@@ -27,12 +27,15 @@ import org.apache.paimon.hive.clone.HiveCloneUtils;
 import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.utils.StringUtils;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
@@ -51,6 +54,7 @@ public class CloneUtils {
             String targetDatabase,
             String targetTableName,
             Catalog sourceCatalog,
+            @Nullable List<String> excludedTables,
             StreamExecutionEnvironment env)
             throws Exception {
         List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
@@ -66,7 +70,7 @@ public class CloneUtils {
                     StringUtils.isNullOrWhitespaceOnly(targetTableName),
                     "targetTableName must be blank when clone all tables in a 
catalog.");
 
-            for (Identifier identifier : 
HiveCloneUtils.listTables(hiveCatalog)) {
+            for (Identifier identifier : 
HiveCloneUtils.listTables(hiveCatalog, excludedTables)) {
                 result.add(new Tuple2<>(identifier, identifier));
             }
         } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
@@ -77,7 +81,8 @@ public class CloneUtils {
                     StringUtils.isNullOrWhitespaceOnly(targetTableName),
                     "targetTableName must be blank when clone all tables in a 
catalog.");
 
-            for (Identifier identifier : 
HiveCloneUtils.listTables(hiveCatalog, sourceDatabase)) {
+            for (Identifier identifier :
+                    HiveCloneUtils.listTables(hiveCatalog, sourceDatabase, 
excludedTables)) {
                 result.add(
                         new Tuple2<>(
                                 identifier,
@@ -90,6 +95,9 @@ public class CloneUtils {
             checkArgument(
                     !StringUtils.isNullOrWhitespaceOnly(targetTableName),
                     "targetTableName must not be blank when clone a table.");
+            checkArgument(
+                    CollectionUtils.isEmpty(excludedTables),
+                    "excludedTables must be empty when clone a single table.");
             result.add(
                     new Tuple2<>(
                             Identifier.create(sourceDatabase, sourceTableName),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 94233eed12..55647848ba 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -19,13 +19,16 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.utils.StringUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Clone tables procedure. */
@@ -54,7 +57,11 @@ public class CloneProcedure extends ProcedureBase {
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
                 @ArgumentHint(name = "parallelism", type = 
@DataTypeHint("INT"), isOptional = true),
-                @ArgumentHint(name = "where", type = @DataTypeHint("STRING"), 
isOptional = true)
+                @ArgumentHint(name = "where", type = @DataTypeHint("STRING"), 
isOptional = true),
+                @ArgumentHint(
+                        name = "excluded_tables",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
@@ -65,7 +72,8 @@ public class CloneProcedure extends ProcedureBase {
             String targetTableName,
             String targetCatalogConfigStr,
             Integer parallelism,
-            String where)
+            String where,
+            String excludedTablesStr)
             throws Exception {
         Map<String, String> sourceCatalogConfig =
                 new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -73,6 +81,11 @@ public class CloneProcedure extends ProcedureBase {
         Map<String, String> targetCatalogConfig =
                 new HashMap<>(optionalConfigMap(targetCatalogConfigStr));
 
+        List<String> excludedTables =
+                StringUtils.isNullOrWhitespaceOnly(excludedTablesStr)
+                        ? null
+                        : Arrays.asList(StringUtils.split(excludedTablesStr, 
","));
+
         CloneAction action =
                 new CloneAction(
                         database,
@@ -82,7 +95,8 @@ public class CloneProcedure extends ProcedureBase {
                         targetTableName,
                         targetCatalogConfig,
                         parallelism,
-                        where);
+                        where,
+                        excludedTables);
         return execute(procedureContext, action, "Clone Job");
     }
 
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
index d3c59f0885..c646a1b7ca 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/clone/HiveCloneUtils.java
@@ -25,6 +25,7 @@ import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.RowType;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -39,8 +40,10 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Predicate;
 
 import static org.apache.paimon.hive.HiveTypeUtils.toPaimonType;
@@ -64,23 +67,41 @@ public class HiveCloneUtils {
         return paimonOptions;
     }
 
-    public static List<Identifier> listTables(HiveCatalog hiveCatalog) throws 
Exception {
+    public static List<Identifier> listTables(
+            HiveCatalog hiveCatalog, @Nullable List<String> excludedTables) 
throws Exception {
+        Set<String> excludedTableSet = new HashSet<>();
+        if (CollectionUtils.isNotEmpty(excludedTables)) {
+            excludedTableSet.addAll(excludedTables);
+        }
         IMetaStoreClient client = hiveCatalog.getHmsClient();
         List<Identifier> results = new ArrayList<>();
         for (String database : client.getAllDatabases()) {
             for (String table : client.getAllTables(database)) {
-                results.add(Identifier.create(database, table));
+                Identifier identifier = Identifier.create(database, table);
+                if (excludedTableSet.contains(identifier.getFullName())) {
+                    continue;
+                }
+                results.add(identifier);
             }
         }
         return results;
     }
 
-    public static List<Identifier> listTables(HiveCatalog hiveCatalog, String 
database)
+    public static List<Identifier> listTables(
+            HiveCatalog hiveCatalog, String database, @Nullable List<String> 
excludedTables)
             throws Exception {
+        Set<String> excludedTableSet = new HashSet<>();
+        if (CollectionUtils.isNotEmpty(excludedTables)) {
+            excludedTableSet.addAll(excludedTables);
+        }
         IMetaStoreClient client = hiveCatalog.getHmsClient();
         List<Identifier> results = new ArrayList<>();
         for (String table : client.getAllTables(database)) {
-            results.add(Identifier.create(database, table));
+            Identifier identifier = Identifier.create(database, table);
+            if (excludedTableSet.contains(identifier.getFullName())) {
+                continue;
+            }
+            results.add(identifier);
         }
         return results;
     }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCloneUtilsTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCloneUtilsTest.java
new file mode 100644
index 0000000000..1100a065c4
--- /dev/null
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCloneUtilsTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.hive.clone.HiveCloneUtils;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Test for {@link HiveCloneUtils}. */
+public class HiveCloneUtilsTest {
+
+    @Test
+    public void listAllTablesForCatalog() throws Exception {
+        HiveCatalog mockHiveCatalog = mock(HiveCatalog.class);
+        IMetaStoreClient mockClient = mock(IMetaStoreClient.class);
+        when(mockHiveCatalog.getHmsClient()).thenReturn(mockClient);
+        when(mockClient.getAllDatabases()).thenReturn(Arrays.asList("db1", 
"db2"));
+        when(mockClient.getAllTables("db1")).thenReturn(Arrays.asList("tbl1", 
"tbl2", "tbl3"));
+        when(mockClient.getAllTables("db2")).thenReturn(Arrays.asList("tbl1", 
"tbl2", "tbl3"));
+
+        List<Identifier> sourceTables = 
HiveCloneUtils.listTables(mockHiveCatalog, null);
+        List<Identifier> expectedTables =
+                Arrays.asList(
+                        Identifier.create("db1", "tbl1"),
+                        Identifier.create("db1", "tbl2"),
+                        Identifier.create("db1", "tbl3"),
+                        Identifier.create("db2", "tbl1"),
+                        Identifier.create("db2", "tbl2"),
+                        Identifier.create("db2", "tbl3"));
+        
Assertions.assertThatList(sourceTables).containsExactlyInAnyOrderElementsOf(expectedTables);
+
+        sourceTables =
+                HiveCloneUtils.listTables(mockHiveCatalog, 
Arrays.asList("db1.tbl3", "db2.tbl1"));
+        expectedTables =
+                Arrays.asList(
+                        Identifier.create("db1", "tbl1"),
+                        Identifier.create("db1", "tbl2"),
+                        Identifier.create("db2", "tbl2"),
+                        Identifier.create("db2", "tbl3"));
+        
Assertions.assertThatList(sourceTables).containsExactlyInAnyOrderElementsOf(expectedTables);
+    }
+
+    @Test
+    public void listAllTablesForDatabase() throws Exception {
+        HiveCatalog mockHiveCatalog = mock(HiveCatalog.class);
+        IMetaStoreClient mockClient = mock(IMetaStoreClient.class);
+        when(mockHiveCatalog.getHmsClient()).thenReturn(mockClient);
+        when(mockClient.getAllTables("db1")).thenReturn(Arrays.asList("tbl1", 
"tbl2", "tbl3"));
+
+        List<Identifier> sourceTables = 
HiveCloneUtils.listTables(mockHiveCatalog, "db1", null);
+        List<Identifier> expectedTables =
+                Arrays.asList(
+                        Identifier.create("db1", "tbl1"),
+                        Identifier.create("db1", "tbl2"),
+                        Identifier.create("db1", "tbl3"));
+        
Assertions.assertThatList(sourceTables).containsExactlyInAnyOrderElementsOf(expectedTables);
+
+        sourceTables = HiveCloneUtils.listTables(mockHiveCatalog, "db1", 
Arrays.asList("db1.tbl1"));
+        expectedTables =
+                Arrays.asList(Identifier.create("db1", "tbl2"), 
Identifier.create("db1", "tbl3"));
+        
Assertions.assertThatList(sourceTables).containsExactlyInAnyOrderElementsOf(expectedTables);
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index daaf43f836..b15a6f84ae 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -577,6 +578,87 @@ public class CloneActionITCase extends ActionITCaseBase {
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
+    @Test
+    public void testMigrateWholeCatalogWithExcludedTables() throws Exception {
+        String dbName1 = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName1 = "hivetable1" + StringUtils.randomNumericString(10);
+        String tableName2 = "hivetable2" + StringUtils.randomNumericString(10);
+
+        String dbName2 = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName3 = "hivetable1" + StringUtils.randomNumericString(10);
+        String tableName4 = "hivetable2" + StringUtils.randomNumericString(10);
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName1);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING, id2 INT, id3 INT) STORED AS 
%s",
+                dbName1,
+                tableName1,
+                randomFormat());
+        sql(tEnv, "INSERT INTO TABLE %s.%s VALUES %s", dbName1, tableName1, 
data(100));
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 
INT) STORED AS %s",
+                dbName1,
+                tableName2,
+                randomFormat());
+        sql(tEnv, "INSERT INTO TABLE %s.%s VALUES %s", dbName1, tableName2, 
data(100));
+
+        tEnv.executeSql("CREATE DATABASE " + dbName2);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING, id2 INT, id3 INT) STORED AS 
%s",
+                dbName2,
+                tableName3,
+                randomFormat());
+        sql(tEnv, "INSERT INTO TABLE %s.%s VALUES %s", dbName2, tableName3, 
data(100));
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 
INT) STORED AS %s",
+                dbName2,
+                tableName4,
+                randomFormat());
+        sql(tEnv, "INSERT INTO TABLE %s.%s VALUES %s", dbName2, tableName4, 
data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH 
('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+        List<String> db1Tables = ImmutableList.of(tableName2);
+        List<String> db2Tables = ImmutableList.of(tableName4);
+
+        sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = 
'%s')", warehouse);
+        tEnv.useCatalog("PAIMON");
+
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse,
+                        "--excluded_tables",
+                        dbName1 + "." + tableName1 + "," + dbName2 + "." + 
tableName3)
+                .run();
+
+        List<String> actualDB1Tables =
+                sql(tEnv, "show tables from %s", dbName1).stream()
+                        .map(row -> row.getField(0).toString())
+                        .collect(Collectors.toList());
+        List<String> actualDB2Tables =
+                sql(tEnv, "show tables from %s", dbName2).stream()
+                        .map(row -> row.getField(0).toString())
+                        .collect(Collectors.toList());
+
+        
Assertions.assertThatList(actualDB1Tables).containsExactlyInAnyOrderElementsOf(db1Tables);
+        
Assertions.assertThatList(actualDB2Tables).containsExactlyInAnyOrderElementsOf(db2Tables);
+    }
+
     private String[] ddls(String format) {
         // has primary key
         String ddl0 =

Reply via email to