This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a56d29c1c8 [clone] support skipping cloning if the target table already exists. (#7139)
a56d29c1c8 is described below
commit a56d29c1c8b192053f60855704dab13e6a332b01
Author: shyjsarah <[email protected]>
AuthorDate: Wed Jan 28 18:59:17 2026 +0800
[clone] support skipping cloning if the target table already exists. (#7139)
---
.../apache/paimon/flink/action/CloneAction.java | 11 +++-
.../paimon/flink/action/CloneActionFactory.java | 9 ++-
.../paimon/flink/clone/CloneHiveTableUtils.java | 8 ++-
.../paimon/flink/clone/ClonePaimonTableUtils.java | 8 ++-
.../clone/schema/CloneHiveSchemaFunction.java | 25 +++++++--
.../clone/schema/ClonePaimonSchemaFunction.java | 20 ++++++-
.../paimon/flink/procedure/CloneProcedure.java | 10 +++-
.../paimon/hive/procedure/CloneActionITCase.java | 65 ++++++++++++++++++++++
8 files changed, 139 insertions(+), 17 deletions(-)
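In short: the clone action and the clone procedure gain an optional boolean option clone_if_exists (default: true). When it is set to false and the target Paimon table already exists, the schema step logs a message and skips that table instead of validating it and copying data again. A minimal usage sketch, assuming the same argument names the new ITCase below uses and its createAction test helper; the angle-bracket values are placeholders, not part of the patch:

    // Only the last two entries are new; everything else is the usual clone setup.
    List<String> args =
            new ArrayList<>(
                    Arrays.asList(
                            "clone",
                            "--database", "<source_db>",
                            "--table", "<source_table>",
                            "--catalog_conf", "metastore=hive",
                            "--catalog_conf", "uri=thrift://<host>:<port>",
                            "--target_database", "<target_db>",
                            "--target_table", "<target_table>",
                            "--target_catalog_conf", "warehouse=<warehouse>",
                            "--clone_if_exists", "false"));
    createAction(CloneAction.class, args).run();

Omitting --clone_if_exists (or passing true) keeps the previous behaviour.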
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index fe283fe8f4..e3ed1e01d4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -49,6 +49,7 @@ public class CloneAction extends ActionBase {
@Nullable private final String preferFileFormat;
private final String cloneFrom;
private final boolean metaOnly;
+ private final boolean cloneIfExists;
public CloneAction(
String sourceDatabase,
@@ -63,7 +64,8 @@ public class CloneAction extends ActionBase {
@Nullable List<String> excludedTables,
@Nullable String preferFileFormat,
String cloneFrom,
- boolean metaOnly) {
+ boolean metaOnly,
+ boolean cloneIfExists) {
super(sourceCatalogConfig);
if (cloneFrom.equalsIgnoreCase("hive")) {
@@ -97,6 +99,7 @@ public class CloneAction extends ActionBase {
: preferFileFormat.toLowerCase();
this.cloneFrom = cloneFrom;
this.metaOnly = metaOnly;
+ this.cloneIfExists = cloneIfExists;
}
@Override
@@ -117,7 +120,8 @@ public class CloneAction extends ActionBase {
includedTables,
excludedTables,
preferFileFormat,
- metaOnly);
+ metaOnly,
+ cloneIfExists);
break;
case "paimon":
ClonePaimonTableUtils.build(
@@ -134,7 +138,8 @@ public class CloneAction extends ActionBase {
includedTables,
excludedTables,
preferFileFormat,
- metaOnly);
+ metaOnly,
+ cloneIfExists);
break;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 5d884bd2d9..db0680cf3e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -41,6 +41,7 @@ public class CloneActionFactory implements ActionFactory {
private static final String PREFER_FILE_FORMAT = "prefer_file_format";
private static final String CLONE_FROM = "clone_from";
private static final String META_ONLY = "meta_only";
+ private static final String CLONE_IF_EXISTS = "clone_if_exists";
@Override
public String identifier() {
@@ -83,6 +84,11 @@ public class CloneActionFactory implements ActionFactory {
!StringUtils.isNullOrWhitespaceOnly(metaOnlyStr)
&& Boolean.parseBoolean(metaOnlyStr);
+ String cloneIfExistsStr = params.get(CLONE_IF_EXISTS);
+ boolean cloneIfExists =
+ StringUtils.isNullOrWhitespaceOnly(cloneIfExistsStr)
+ || Boolean.parseBoolean(cloneIfExistsStr);
+
CloneAction cloneAction =
new CloneAction(
params.get(DATABASE),
@@ -97,7 +103,8 @@ public class CloneActionFactory implements ActionFactory {
excludedTables,
preferFileFormat,
cloneFrom,
- metaOnly);
+ metaOnly,
+ cloneIfExists);
return Optional.of(cloneAction);
}
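Note the asymmetry with meta_only just above: an absent or blank clone_if_exists resolves to true, so only an explicit false changes behaviour. A minimal sketch of the same resolution using plain JDK calls; the helper name is illustrative and not part of the patch:

    static boolean resolveCloneIfExists(String raw) {
        // null / blank    -> true  (existing target tables are still cloned into)
        // "false"         -> false (existing target tables are skipped)
        // any other value -> Boolean.parseBoolean(raw)
        return raw == null || raw.trim().isEmpty() || Boolean.parseBoolean(raw);
    }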
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index b6298404a3..8431b4ba70 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -157,7 +157,8 @@ public class CloneHiveTableUtils {
@Nullable List<String> includedTables,
@Nullable List<String> excludedTables,
@Nullable String preferFileFormat,
- boolean metaOnly)
+ boolean metaOnly,
+ boolean cloneIfExists)
throws Exception {
// list source tables
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -180,7 +181,10 @@ public class CloneHiveTableUtils {
partitionedSource
.process(
new CloneHiveSchemaFunction(
- sourceCatalogConfig, targetCatalogConfig, preferFileFormat))
+ sourceCatalogConfig,
+ targetCatalogConfig,
+ preferFileFormat,
+ cloneIfExists))
.name("Clone Schema")
.setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
index 1bce445a45..9cf1473b0f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -139,7 +139,8 @@ public class ClonePaimonTableUtils {
@Nullable List<String> includedTables,
@Nullable List<String> excludedTables,
@Nullable String preferFileFormat,
- boolean metaOnly)
+ boolean metaOnly,
+ boolean cloneIfExists)
throws Exception {
// list source tables
DataStream<Tuple2<Identifier, Identifier>> source =
@@ -162,7 +163,10 @@ public class ClonePaimonTableUtils {
partitionedSource
.process(
new ClonePaimonSchemaFunction(
- sourceCatalogConfig, targetCatalogConfig, preferFileFormat))
+ sourceCatalogConfig,
+ targetCatalogConfig,
+ preferFileFormat,
+ cloneIfExists))
.name("Clone Schema")
.setParallelism(parallelism);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
index 1dceffba4a..35c504c240 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/CloneHiveSchemaFunction.java
@@ -63,6 +63,7 @@ public class CloneHiveSchemaFunction
protected final Map<String, String> sourceCatalogConfig;
protected final Map<String, String> targetCatalogConfig;
@Nullable protected final String preferFileFormat;
+ protected final boolean cloneIfExists;
protected transient HiveCatalog hiveCatalog;
protected transient Catalog targetCatalog;
@@ -70,10 +71,12 @@ public class CloneHiveSchemaFunction
public CloneHiveSchemaFunction(
Map<String, String> sourceCatalogConfig,
Map<String, String> targetCatalogConfig,
- @Nullable String preferFileFormat) {
+ @Nullable String preferFileFormat,
+ boolean cloneIfExists) {
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
this.preferFileFormat = preferFileFormat;
+ this.cloneIfExists = cloneIfExists;
}
/**
@@ -131,6 +134,13 @@ public class CloneHiveSchemaFunction
try {
Table existedTable = targetCatalog.getTable(tuple.f1);
+ if (!cloneIfExists) {
+ LOG.info(
+ "Target table '{}' already exists and clone_if_exists is false, skipping clone operation.",
+ tuple.f1);
+ return;
+ }
+
checkState(
existedTable instanceof FileStoreTable,
String.format(
@@ -156,20 +166,21 @@ public class CloneHiveSchemaFunction
checkState(
existedSchema.primaryKeys().isEmpty(),
"Can not clone data to existed paimon table which has primary
keys. Existed paimon table is "
- + existedTable.name());
+ + existedTable.fullName());
// check bucket
checkState(
existedTable.coreOptions().bucket() == -1,
"Can not clone data to existed paimon table which bucket is
not -1. Existed paimon table is "
- + existedTable.name());
+ + existedTable.fullName());
// check format
checkState(
Objects.equals(
sourceSchema.options().get(FILE_FORMAT.key()),
existedTable.coreOptions().formatType()),
- "source table format is not compatible with existed paimon table format.");
+ "source table format is not compatible with existed paimon table format. Existed paimon table is "
+ + existedTable.fullName());
// check partition keys
List<String> sourcePartitionFields = sourceSchema.partitionKeys();
@@ -178,7 +189,8 @@ public class CloneHiveSchemaFunction
checkState(
sourcePartitionFields.size() == existedPartitionFields.size()
&& new
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
- "source table partition keys is not compatible with existed
paimon table partition keys.");
+ "source table partition keys is not compatible with existed
paimon table partition keys. Existed paimon table is "
+ + existedTable.fullName());
// check all fields
List<DataField> sourceFields = sourceSchema.fields();
@@ -187,6 +199,7 @@ public class CloneHiveSchemaFunction
checkState(
existedFields.size() >= sourceFields.size()
&& new
HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
- "source table partition keys is not compatible with existed
paimon table partition keys.");
+ "source table partition keys is not compatible with existed
paimon table partition keys. Existed paimon table is "
+ + existedTable.fullName());
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
index 73b560f62b..5ad46beb06 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/schema/ClonePaimonSchemaFunction.java
@@ -31,6 +31,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Map;
@@ -44,10 +46,12 @@ public class ClonePaimonSchemaFunction
extends ProcessFunction<Tuple2<Identifier, Identifier>, CloneSchemaInfo> {
private static final long serialVersionUID = 1L;
+ private static final Logger LOG = LoggerFactory.getLogger(ClonePaimonSchemaFunction.class);
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;
private final String preferFileFormat;
+ private final boolean cloneIfExists;
private transient Catalog sourceCatalog;
private transient Catalog targetCatalog;
@@ -55,10 +59,12 @@ public class ClonePaimonSchemaFunction
public ClonePaimonSchemaFunction(
Map<String, String> sourceCatalogConfig,
Map<String, String> targetCatalogConfig,
- String preferFileFormat) {
+ String preferFileFormat,
+ boolean cloneIfExists) {
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
this.preferFileFormat = preferFileFormat;
+ this.cloneIfExists = cloneIfExists;
}
/**
@@ -121,6 +127,18 @@ public class ClonePaimonSchemaFunction
builder.option(CoreOptions.FILE_FORMAT.key(), preferFileFormat);
}
+ try {
+ targetCatalog.getTable(tuple.f1);
+ if (!cloneIfExists) {
+ LOG.info(
+ "Target table '{}' already exists and clone_if_exists is false, skipping clone operation.",
+ tuple.f1);
+ return;
+ }
+ } catch (Catalog.TableNotExistException e) {
+ // Table does not exist, proceed to create
+ }
+
targetCatalog.createTable(tuple.f1, builder.build(), true);
CloneSchemaInfo cloneSchemaInfo = new CloneSchemaInfo(tuple, true);
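The existence probe above is the whole mechanism: getTable either returns (the target exists) or throws Catalog.TableNotExistException (it does not). A hedged sketch of the same decision with an illustrative helper name; when cloning proceeds, the createTable(..., true) call above still ignores an already-existing table:

    private boolean skipExistingTarget(
            Catalog targetCatalog, Identifier target, boolean cloneIfExists) {
        try {
            targetCatalog.getTable(target); // throws if the target table is absent
            return !cloneIfExists;          // exists: skip only when clone_if_exists is false
        } catch (Catalog.TableNotExistException e) {
            return false;                   // absent: always proceed to create and clone
        }
    }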
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 4daeb2fef7..74a70be4fa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -77,6 +77,10 @@ public class CloneProcedure extends ProcedureBase {
@ArgumentHint(
name = "meta_only",
type = @DataTypeHint("BOOLEAN"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "clone_if_exists",
+ type = @DataTypeHint("BOOLEAN"),
isOptional = true)
})
public String[] call(
@@ -93,7 +97,8 @@ public class CloneProcedure extends ProcedureBase {
String excludedTablesStr,
String preferFileFormat,
String cloneFrom,
- Boolean metaOnly)
+ Boolean metaOnly,
+ Boolean cloneIfExists)
throws Exception {
Map<String, String> sourceCatalogConfig =
new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -124,7 +129,8 @@ public class CloneProcedure extends ProcedureBase {
excludedTables,
preferFileFormat,
cloneFrom,
- metaOnly != null && metaOnly);
+ metaOnly != null && metaOnly,
+ cloneIfExists == null || cloneIfExists);
return execute(procedureContext, action, "Clone Job");
}
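At the procedure boundary the two optional BOOLEAN arguments default differently, which is easy to miss in the call site above. A one-line illustration of each (local variable names are only for the example):

    boolean metaOnlyResolved = metaOnly != null && metaOnly;                 // omitted -> false
    boolean cloneIfExistsResolved = cloneIfExists == null || cloneIfExists;  // omitted -> true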
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index 943732f256..2dbed21acf 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -1070,6 +1070,71 @@ public class CloneActionITCase extends ActionITCaseBase {
assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNull();
}
+ @Test
+ public void testCloneIfExistsEqualsFalse() throws Exception {
+ String format = randomFormat();
+ String dbName = "hivedb" + StringUtils.randomNumericString(10);
+ String tableName = "hivetable" + StringUtils.randomNumericString(10);
+
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
+ tEnv.useCatalog("HIVE");
+ tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
+ tEnv.executeSql("CREATE DATABASE " + dbName);
+ sql(
+ tEnv,
+ "CREATE TABLE %s.%s (id STRING) PARTITIONED BY (id2 INT, id3
INT) STORED AS %s",
+ dbName,
+ tableName,
+ format);
+ sql(tEnv, "INSERT INTO %s.%s VALUES %s", dbName, tableName, data(100));
+
+ tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
+ tEnv.executeSql("CREATE CATALOG PAIMON_GE WITH ('type'='paimon-generic')");
+ tEnv.useCatalog("PAIMON_GE");
+
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = '%s')", warehouse);
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql("CREATE DATABASE test");
+
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "clone",
+ "--database",
+ dbName,
+ "--table",
+ tableName,
+ "--catalog_conf",
+ "metastore=hive",
+ "--catalog_conf",
+ "uri=thrift://localhost:" + PORT,
+ "--target_database",
+ "test",
+ "--target_table",
+ "test_table",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse));
+
+ // First run: clone the table successfully
+ createAction(CloneAction.class, args).run();
+ FileStoreTable paimonTable =
+ paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+ assertThat(paimonTable.partitionKeys()).containsExactly("id2", "id3");
+ assertThat(paimonTable.snapshotManager().earliestSnapshot()).isNotNull();
+
+ long snapshotId = paimonTable.snapshotManager().latestSnapshotId();
+
+ // Second run: clone_if_exists = false, should skip clone
+ args.add("--clone_if_exists");
+ args.add("false");
+ createAction(CloneAction.class, args).run();
+
+ // Verify that the table was not modified (snapshot ID should remain the same)
+ paimonTable = paimonTable(tEnv, "PAIMON", Identifier.create("test", "test_table"));
+ assertThat(paimonTable.snapshotManager().latestSnapshotId()).isEqualTo(snapshotId);
+ }
+
private String[] ddls(String format) {
// has primary key
String ddl0 =