This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a56d29c1c8 [clone] support skipping cloning if the target table already exists. (#7139)
a56d29c1c8 is described below

commit a56d29c1c8b192053f60855704dab13e6a332b01
Author: shyjsarah <[email protected]>
AuthorDate: Wed Jan 28 18:59:17 2026 +0800

    [clone] support skipping cloning if the target table already exists. (#7139)
---
 .../apache/paimon/flink/action/CloneAction.java    | 11 +++-
 .../paimon/flink/action/CloneActionFactory.java    |  9 ++-
 .../paimon/flink/clone/CloneHiveTableUtils.java    |  8 ++-
 .../paimon/flink/clone/ClonePaimonTableUtils.java  |  8 ++-
 .../clone/schema/CloneHiveSchemaFunction.java      | 25 +++++++--
 .../clone/schema/ClonePaimonSchemaFunction.java    | 20 ++++++-
 .../paimon/flink/procedure/CloneProcedure.java     | 10 +++-
 .../paimon/hive/procedure/CloneActionITCase.java   | 65 ++++++++++++++++++++++
 8 files changed, 139 insertions(+), 17 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index fe283fe8f4..e3ed1e01d4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -49,6 +49,7 @@ public class CloneAction extends ActionBase {
     @Nullable private final String preferFileFormat;
     private final String cloneFrom;
     private final boolean metaOnly;
+    private final boolean cloneIfExists;
 
     public CloneAction(
             String sourceDatabase,
@@ -63,7 +64,8 @@ public class CloneAction extends ActionBase {
             @Nullable List<String> excludedTables,
             @Nullable String preferFileFormat,
             String cloneFrom,
-            boolean metaOnly) {
+            boolean metaOnly,
+            boolean cloneIfExists) {
         super(sourceCatalogConfig);
 
         if (cloneFrom.equalsIgnoreCase("hive")) {
@@ -97,6 +99,7 @@ public class CloneAction extends ActionBase {
                         : preferFileFormat.toLowerCase();
         this.cloneFrom = cloneFrom;
         this.metaOnly = metaOnly;
+        this.cloneIfExists = cloneIfExists;
     }
 
     @Override
@@ -117,7 +120,8 @@ public class CloneAction extends ActionBase {
                         includedTables,
                         excludedTables,
                         preferFileFormat,
-                        metaOnly);
+                        metaOnly,
+                        cloneIfExists);
                 break;
             case "paimon":
                 ClonePaimonTableUtils.build(
@@ -134,7 +138,8 @@ public class CloneAction extends ActionBase {
                         includedTables,
                         excludedTables,
                         preferFileFormat,
-                        metaOnly);
+                        metaOnly,
+                        cloneIfExists);
                 break;
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 5d884bd2d9..db0680cf3e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -41,6 +41,7 @@ public class CloneActionFactory implements ActionFactory {
     private static final String PREFER_FILE_FORMAT = "prefer_file_format";
     private static final String CLONE_FROM = "clone_from";
     private static final String META_ONLY = "meta_only";
+    private static final String CLONE_IF_EXISTS = "clone_if_exists";
 
     @Override
     public String identifier() {
@@ -83,6 +84,11 @@ public class CloneActionFactory implements ActionFactory {
                 !StringUtils.isNullOrWhitespaceOnly(metaOnlyStr)
                         && Boolean.parseBoolean(metaOnlyStr);
 
+        String cloneIfExistsStr = params.get(CLONE_IF_EXISTS);
+        boolean cloneIfExists =
+                StringUtils.isNullOrWhitespaceOnly(cloneIfExistsStr)
+                        || Boolean.parseBoolean(cloneIfExistsStr);
+
         CloneAction cloneAction =
                 new CloneAction(
                         params.get(DATABASE),
@@ -97,7 +103,8 @@ public class CloneActionFactory implements ActionFactory {
                         excludedTables,
                         preferFileFormat,
                         cloneFrom,
-                        metaOnly);
+                        metaOnly,
+                        cloneIfExists);
 
         return Optional.of(cloneAction);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index b6298404a3..8431b4ba70 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -157,7 +157,8 @@ public class CloneHiveTableUtils {
             @Nullable List<String> includedTables,
             @Nullable List<String> excludedTables,
             @Nullable String preferFileFormat,
-            boolean metaOnly)
+            boolean metaOnly,
+            boolean cloneIfExists)
             throws Exception {
         // list source tables
         DataStream<Tuple2<Identifier, Identifier>> source =
@@ -180,7 +181,10 @@ public class CloneHiveTableUtils {
                 partitionedSource
                         .process(
                                 new CloneHiveSchemaFunction(
-                                        sourceCatalogConfig, targetCatalogConfig, preferFileFormat))
+                                        sourceCatalogConfig,
+                                        targetCatalogConfig,
+                                        preferFileFormat,
+                                        cloneIfExists))
                         .name("Clone Schema")
                         .setParallelism(parallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
index 1bce445a45..9cf1473b0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -139,7 +139,8 @@ public class ClonePaimonTableUtils {
             @Nullable List<String> includedTables,
             @Nullable List<String> excludedTables,
             @Nullable String preferFileFormat,
-            boolean metaOnly)
+            boolean metaOnly,
+            boolean cloneIfExists)
             throws Exception {
         // list source tables
         DataStream<Tuple2<Identifier, Identifier>> source =
@@ -162,7 +163,10 @@ public class ClonePaimonTableUtils {
                 partitionedSource
                         .process(
                                 new ClonePaimonSchemaFunction(
-                                        sourceCatalogConfig, targetCatalogConfig, preferFileFormat))
+                                        sourceCatalogConfig,
+                                        targetCatalogConfig,
+                                        preferFileFormat,
+                                        cloneIfExists))
                         .name("Clone Schema")
                         .setParallelism(parallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
index 1dceffba4a..35c504c240 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
@@ -63,6 +63,7 @@ public class CloneHiveSchemaFunction
     protected final Map<String, String> sourceCatalogConfig;
     protected final Map<String, String> targetCatalogConfig;
     @Nullable protected final String preferFileFormat;
+    protected final boolean cloneIfExists;
 
     protected transient HiveCatalog hiveCatalog;
     protected transient Catalog targetCatalog;
@@ -70,10 +71,12 @@ public class CloneHiveSchemaFunction
     public CloneHiveSchemaFunction(
             Map<String, String> sourceCatalogConfig,
             Map<String, String> targetCatalogConfig,
-            @Nullable String preferFileFormat) {
+            @Nullable String preferFileFormat,
+            boolean cloneIfExists) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
         this.preferFileFormat = preferFileFormat;
+        this.cloneIfExists = cloneIfExists;
     }
 
     /**
@@ -131,6 +134,13 @@ public class CloneHiveSchemaFunction
         try {
             Table existedTable = targetCatalog.getTable(tuple.f1);
 
+            if (!cloneIfExists) {
+                LOG.info(
+                        "Target table '{}' already exists and clone_if_exists is false, skipping clone operation.",
+                        tuple.f1);
+                return;
+            }
+
             checkState(
                     existedTable instanceof FileStoreTable,
                     String.format(
@@ -156,20 +166,21 @@ public class CloneHiveSchemaFunction
         checkState(
                 existedSchema.primaryKeys().isEmpty(),
                 "Can not clone data to existed paimon table which has primary keys. Existed paimon table is "
-                        + existedTable.name());
+                        + existedTable.fullName());
 
         // check bucket
         checkState(
                 existedTable.coreOptions().bucket() == -1,
                 "Can not clone data to existed paimon table which bucket is not -1. Existed paimon table is "
-                        + existedTable.name());
+                        + existedTable.fullName());
 
         // check format
         checkState(
                 Objects.equals(
                         sourceSchema.options().get(FILE_FORMAT.key()),
                         existedTable.coreOptions().formatType()),
-                "source table format is not compatible with existed paimon table format.");
+                "source table format is not compatible with existed paimon table format. Existed paimon table is "
+                        + existedTable.fullName());
 
         // check partition keys
         List<String> sourcePartitionFields = sourceSchema.partitionKeys();
@@ -178,7 +189,8 @@ public class CloneHiveSchemaFunction
         checkState(
                 sourcePartitionFields.size() == existedPartitionFields.size()
                         && new HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
-                "source table partition keys is not compatible with existed paimon table partition keys.");
+                "source table partition keys is not compatible with existed paimon table partition keys. Existed paimon table is "
+                        + existedTable.fullName());
 
         // check all fields
         List<DataField> sourceFields = sourceSchema.fields();
@@ -187,6 +199,7 @@ public class CloneHiveSchemaFunction
         checkState(
                 existedFields.size() >= sourceFields.size()
                         && new HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
-                "source table partition keys is not compatible with existed paimon table partition keys.");
+                "source table partition keys is not compatible with existed paimon table partition keys. Existed paimon table is "
+                        + existedTable.fullName());
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
index 73b560f62b..5ad46beb06 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
@@ -31,6 +31,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
@@ -44,10 +46,12 @@ public class ClonePaimonSchemaFunction
         extends ProcessFunction<Tuple2<Identifier, Identifier>, CloneSchemaInfo> {
 
     private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(ClonePaimonSchemaFunction.class);
 
     private final Map<String, String> sourceCatalogConfig;
     private final Map<String, String> targetCatalogConfig;
     private final String preferFileFormat;
+    private final boolean cloneIfExists;
 
     private transient Catalog sourceCatalog;
     private transient Catalog targetCatalog;
@@ -55,10 +59,12 @@ public class ClonePaimonSchemaFunction
     public ClonePaimonSchemaFunction(
             Map<String, String> sourceCatalogConfig,
             Map<String, String> targetCatalogConfig,
-            String preferFileFormat) {
+            String preferFileFormat,
+            boolean cloneIfExists) {
         this.sourceCatalogConfig = sourceCatalogConfig;
         this.targetCatalogConfig = targetCatalogConfig;
         this.preferFileFormat = preferFileFormat;
+        this.cloneIfExists = cloneIfExists;
     }
 
     /**
@@ -121,6 +127,18 @@ public class ClonePaimonSchemaFunction
             builder.option(CoreOptions.FILE_FORMAT.key(), preferFileFormat);
         }
 
+        try {
+            targetCatalog.getTable(tuple.f1);
+            if (!cloneIfExists) {
+                LOG.info(
+                        "Target table '{}' already exists and clone_if_exists is false, skipping clone operation.",
+                        tuple.f1);
+                return;
+            }
+        } catch (Catalog.TableNotExistException e) {
+            // Table does not exist, proceed to create
+        }
+
         targetCatalog.createTable(tuple.f1, builder.build(), true);
 
         CloneSchemaInfo cloneSchemaInfo = new CloneSchemaInfo(tuple, true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 4daeb2fef7..74a70be4fa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -77,6 +77,10 @@ public class CloneProcedure extends ProcedureBase {
                 @ArgumentHint(
                         name = "meta_only",
                         type = @DataTypeHint("BOOLEAN"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "clone_if_exists",
+                        type = @DataTypeHint("BOOLEAN"),
                         isOptional = true)
             })
     public String[] call(
@@ -93,7 +97,8 @@ public class CloneProcedure extends ProcedureBase {
             String excludedTablesStr,
             String preferFileFormat,
             String cloneFrom,
-            Boolean metaOnly)
+            Boolean metaOnly,
+            Boolean cloneIfExists)
             throws Exception {
         Map<String, String> sourceCatalogConfig =
                 new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -124,7 +129,8 @@ public class CloneProcedure extends ProcedureBase {
                         excludedTables,
                         preferFileFormat,
                         cloneFrom,
-                        metaOnly != null && metaOnly);
+                        metaOnly != null && metaOnly,
+                        cloneIfExists == null || cloneIfExists);
         return execute(procedureContext, action, "Clone Job");
     }
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 943732f256..2dbed21acf 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -1070,6 +1070,71 @@ public class CloneActionITCase extends ActionITCaseBase {
         assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNull();
     }
 
+    @Test
+    public void testCloneIfExistsEqualsFalse() throws Exception {
+        String format = randomFormat();
+        String dbName = "hivedb" + StringUtils.randomNumericString(10);
+        String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+        tEnv.useCatalog("HIVE");
+        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+        tEnv.executeSql("CREATE DATABASE " + dbName);
+        sql(
+                tEnv,
+                "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3 INT) STORED AS %s",
+                dbName,
+                tableName,
+                format);
+        sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+        tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+        tEnv.useCatalog("PAIMON_GE");
+
+        sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '%s')", warehouse);
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        List<String> args =
+                new ArrayList<>(
+                        Arrays.asList(
+                                "clone",
+                                "--database",
+                                dbName,
+                                "--table",
+                                tableName,
+                                "--catalog_conf",
+                                "metastore=hive",
+                                "--catalog_conf",
+                                "uri=thrift://localhost:" + PORT,
+                                "--target_database",
+                                "test",
+                                "--target_table",
+                                "test_table",
+                                "--target_catalog_conf",
+                                "warehouse=" + warehouse));
+
+        // First run: clone the table successfully
+        createAction(CloneAction.class, args).run();
+        FileStoreTable paimonTable =
+                paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+        assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+        assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNotNull();
+
+        long snapshotId = paimonTable.snapshotManager().latestSnapshotId();
+
+        // Second run: clone_if_exists = false, should skip clone
+        args.add("--clone_if_exists");
+        args.add("false");
+        createAction(CloneAction.class, args).run();
+
+        // Verify that the table was not modified (snapshot ID should remain the same)
+        paimonTable = paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+        assertThat(paimonTable.snapshotManager().latestSnapshotId()).isEqualTo(snapshotId);
+    }
+
     private String[] ddls(String format) {
         // has primary key
         String ddl0 =

Reply via email to