This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9b0c90c476 [flink] Add clone from Paimon tables in clone action (#5911)
9b0c90c476 is described below
commit 9b0c90c4766759e85b2582c203e5f7eed809ab12
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jul 18 18:17:45 2025 +0800
[flink] Add clone from Paimon tables in clone action (#5911)
---
.../apache/paimon/flink/action/CloneAction.java | 103 ++++------
.../paimon/flink/action/CloneActionFactory.java | 9 +-
.../{CloneUtils.java => CloneHiveTableUtils.java} | 131 +++++++------
.../paimon/flink/clone/ClonePaimonTableUtils.java | 214 +++++++++++++++++++++
....java => ShuffleIdentifierByTableComputer.java} | 38 ++--
.../flink/clone/{ => files}/CloneFileInfo.java | 2 +-
.../CloneFilesCommitOperator.java} | 6 +-
.../clone/{ => files}/CloneFilesFunction.java | 4 +-
.../CloneFilesProcessFunction.java} | 10 +-
.../flink/clone/{ => files}/DataFileInfo.java | 2 +-
.../clone/{ => files}/ListCloneFilesFunction.java | 8 +-
.../files/ShuffleDataFileByTableComputer.java | 46 +++++
.../paimon/flink/clone/spits/CloneSplitInfo.java | 62 ++++++
.../flink/clone/spits/CloneSplitsFunction.java | 108 +++++++++++
.../CommitMessageInfo.java} | 37 ++--
.../CommitMessageTableOperator.java} | 43 ++---
.../flink/clone/spits/ListCloneSplitsFunction.java | 139 +++++++++++++
.../spits/ShuffleCommitMessageByTableComputer.java | 46 +++++
.../paimon/flink/procedure/CloneProcedure.java | 10 +-
.../flink/clone/ListCloneFilesFunctionTest.java | 3 +-
.../paimon/hive/procedure/CloneActionITCase.java | 86 +++++++++
21 files changed, 899 insertions(+), 208 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
index 03ecdff9b9..8114b43fe1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneAction.java
@@ -20,21 +20,10 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.clone.CloneFileInfo;
-import org.apache.paimon.flink.clone.CloneFilesFunction;
-import org.apache.paimon.flink.clone.CloneUtils;
-import org.apache.paimon.flink.clone.CommitTableOperator;
-import org.apache.paimon.flink.clone.DataFileInfo;
-import org.apache.paimon.flink.clone.ListCloneFilesFunction;
-import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.clone.CloneHiveTableUtils;
+import org.apache.paimon.flink.clone.ClonePaimonTableUtils;
import org.apache.paimon.hive.HiveCatalog;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
-
import javax.annotation.Nullable;
import java.util.List;
@@ -54,6 +43,7 @@ public class CloneAction extends ActionBase {
private final int parallelism;
@Nullable private final String whereSql;
@Nullable private final List<String> excludedTables;
+ private final String cloneFrom;
public CloneAction(
String sourceDatabase,
@@ -64,17 +54,20 @@ public class CloneAction extends ActionBase {
Map<String, String> targetCatalogConfig,
@Nullable Integer parallelism,
@Nullable String whereSql,
- @Nullable List<String> excludedTables) {
+ @Nullable List<String> excludedTables,
+ String cloneFrom) {
super(sourceCatalogConfig);
- Catalog sourceCatalog = catalog;
- if (sourceCatalog instanceof CachingCatalog) {
- sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
- }
- if (!(sourceCatalog instanceof HiveCatalog)) {
- throw new UnsupportedOperationException(
-                    "Only support clone hive tables using HiveCatalog, but current source catalog is "
- + sourceCatalog.getClass().getName());
+ if (cloneFrom.equalsIgnoreCase("hive")) {
+ Catalog sourceCatalog = catalog;
+ if (sourceCatalog instanceof CachingCatalog) {
+ sourceCatalog = ((CachingCatalog) sourceCatalog).wrapped();
+ }
+ if (!(sourceCatalog instanceof HiveCatalog)) {
+ throw new UnsupportedOperationException(
+                        "Only support clone hive tables using HiveCatalog, but current source catalog is "
+ + sourceCatalog.getClass().getName());
+ }
}
this.sourceDatabase = sourceDatabase;
@@ -88,58 +81,46 @@ public class CloneAction extends ActionBase {
        this.parallelism = parallelism == null ? env.getParallelism() : parallelism;
this.whereSql = whereSql;
this.excludedTables = excludedTables;
+ this.cloneFrom = cloneFrom;
}
@Override
public void build() throws Exception {
- // list source tables
- DataStream<Tuple2<Identifier, Identifier>> source =
- CloneUtils.buildSource(
+ switch (cloneFrom) {
+ case "hive":
+ CloneHiveTableUtils.build(
+ env,
+ catalog,
sourceDatabase,
sourceTableName,
+ sourceCatalogConfig,
targetDatabase,
targetTableName,
+ targetCatalogConfig,
+ parallelism,
+ whereSql,
+ excludedTables);
+ break;
+ case "paimon":
+ ClonePaimonTableUtils.build(
+ env,
catalog,
- excludedTables,
- env);
-
-        DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
-                FlinkStreamPartitioner.partition(
-                        source, new CloneUtils.TableChannelComputer(), parallelism);
-
-        // create target table, list files and group by <table, partition>
-        DataStream<CloneFileInfo> files =
-                partitionedSource
-                        .process(
-                                new ListCloneFilesFunction(
-                                        sourceCatalogConfig, targetCatalogConfig, whereSql))
-                        .name("List Files")
-                        .setParallelism(parallelism);
-
-        // copy files and commit
-        DataStream<DataFileInfo> dataFile =
-                files.rebalance()
-                        .process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
-                        .name("Copy Files")
-                        .setParallelism(parallelism);
-
-        DataStream<DataFileInfo> partitionedDataFile =
-                FlinkStreamPartitioner.partition(
-                        dataFile, new CloneUtils.DataFileChannelComputer(), parallelism);
-
- DataStream<Long> committed =
- partitionedDataFile
- .transform(
- "Commit table",
- BasicTypeInfo.LONG_TYPE_INFO,
- new CommitTableOperator(targetCatalogConfig))
- .setParallelism(parallelism);
- committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
+ sourceDatabase,
+ sourceTableName,
+ sourceCatalogConfig,
+ targetDatabase,
+ targetTableName,
+ targetCatalogConfig,
+ parallelism,
+ whereSql,
+ excludedTables);
+ break;
+ }
}
@Override
public void run() throws Exception {
build();
- execute("Clone Hive job");
+ execute("Clone job");
}
}
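
For orientation, here is a minimal sketch of driving the new dispatch programmatically. The constructor's full parameter order is only partially visible in this diff (the middle arguments are inferred from the fields above and the factory call below), so treat the exact signature as an assumption rather than the definitive API:

    // Hypothetical caller: clone one Paimon table into another warehouse.
    CloneAction action =
            new CloneAction(
                    "default",           // sourceDatabase
                    "src",               // sourceTableName
                    sourceCatalogConfig, // e.g. {"warehouse" -> "/path/to/source"}
                    "default",           // targetDatabase
                    "target",            // targetTableName
                    targetCatalogConfig, // e.g. {"warehouse" -> "/path/to/target"}
                    4,                   // parallelism
                    null,                // whereSql: optional partition filter
                    null,                // excludedTables
                    "paimon");           // cloneFrom: "hive" (default) or "paimon"
    action.run();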
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
index 29d3331914..53b88d5f9c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CloneActionFactory.java
@@ -37,6 +37,7 @@ public class CloneActionFactory implements ActionFactory {
private static final String PARALLELISM = "parallelism";
private static final String WHERE = "where";
private static final String EXCLUDED_TABLES = "excluded_tables";
+ private static final String CLONE_FROM = "clone_from";
@Override
public String identifier() {
@@ -62,6 +63,11 @@ public class CloneActionFactory implements ActionFactory {
? null
                        : Arrays.asList(StringUtils.split(excludedTablesStr, ","));
+ String cloneFrom = params.get(CLONE_FROM);
+ if (StringUtils.isNullOrWhitespaceOnly(cloneFrom)) {
+ cloneFrom = "hive";
+ }
+
CloneAction cloneAction =
new CloneAction(
params.get(DATABASE),
@@ -72,7 +78,8 @@ public class CloneActionFactory implements ActionFactory {
targetCatalogConfig,
                        parallelism == null ? null : Integer.parseInt(parallelism),
params.get(WHERE),
- excludedTables);
+ excludedTables,
+ cloneFrom);
return Optional.of(cloneAction);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
similarity index 59%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
index 06ff96af3d..c7cf68da05 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneHiveTableUtils.java
@@ -22,15 +22,22 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.flink.clone.files.CloneFileInfo;
+import org.apache.paimon.flink.clone.files.CloneFilesCommitOperator;
+import org.apache.paimon.flink.clone.files.CloneFilesFunction;
+import org.apache.paimon.flink.clone.files.DataFileInfo;
+import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
+import org.apache.paimon.flink.clone.files.ShuffleDataFileByTableComputer;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.hive.HiveCatalog;
-import org.apache.paimon.hive.clone.HiveCloneUtils;
-import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,15 +45,15 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
-/** Utils for building {@link CloneAction}. */
-public class CloneUtils {
+/** Utils for building {@link CloneAction} for append tables. */
+public class CloneHiveTableUtils {
-    private static final Logger LOG = LoggerFactory.getLogger(CloneUtils.class);
+    private static final Logger LOG = LoggerFactory.getLogger(CloneHiveTableUtils.class);
public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
String sourceDatabase,
@@ -70,7 +77,9 @@ public class CloneUtils {
StringUtils.isNullOrWhitespaceOnly(targetTableName),
"targetTableName must be blank when clone all tables in a
catalog.");
- for (Identifier identifier :
HiveCloneUtils.listTables(hiveCatalog, excludedTables)) {
+ for (Identifier identifier :
+ org.apache.paimon.hive.clone.HiveCloneUtils.listTables(
+ hiveCatalog, excludedTables)) {
result.add(new Tuple2<>(identifier, identifier));
}
} else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
@@ -82,7 +91,8 @@ public class CloneUtils {
"targetTableName must be blank when clone all tables in a
catalog.");
for (Identifier identifier :
- HiveCloneUtils.listTables(hiveCatalog, sourceDatabase,
excludedTables)) {
+ org.apache.paimon.hive.clone.HiveCloneUtils.listTables(
+ hiveCatalog, sourceDatabase, excludedTables)) {
result.add(
new Tuple2<>(
identifier,
@@ -121,54 +131,61 @@ public class CloneUtils {
return (HiveCatalog) rootCatalog;
}
-    // ---------------------------------- Classes ----------------------------------
-
- /** Shuffle tables. */
- public static class TableChannelComputer
- implements ChannelComputer<Tuple2<Identifier, Identifier>> {
-
- private static final long serialVersionUID = 1L;
-
- private transient int numChannels;
-
- @Override
- public void setup(int numChannels) {
- this.numChannels = numChannels;
- }
-
- @Override
- public int channel(Tuple2<Identifier, Identifier> record) {
- return Math.floorMod(
-                    Objects.hash(record.f1.getDatabaseName(), record.f1.getTableName()),
- numChannels);
- }
-
- @Override
- public String toString() {
- return "shuffle by identifier hash";
- }
- }
-
- /** Shuffle tables. */
-    public static class DataFileChannelComputer implements ChannelComputer<DataFileInfo> {
-
- private static final long serialVersionUID = 1L;
-
- private transient int numChannels;
-
- @Override
- public void setup(int numChannels) {
- this.numChannels = numChannels;
- }
-
- @Override
- public int channel(DataFileInfo record) {
-            return Math.floorMod(Objects.hash(record.identifier()), numChannels);
- }
-
- @Override
- public String toString() {
- return "shuffle by identifier hash";
- }
+ public static void build(
+ StreamExecutionEnvironment env,
+ Catalog sourceCatalog,
+ String sourceDatabase,
+ String sourceTableName,
+ Map<String, String> sourceCatalogConfig,
+ String targetDatabase,
+ String targetTableName,
+ Map<String, String> targetCatalogConfig,
+ int parallelism,
+ @Nullable String whereSql,
+ @Nullable List<String> excludedTables)
+ throws Exception {
+ // list source tables
+ DataStream<Tuple2<Identifier, Identifier>> source =
+ buildSource(
+ sourceDatabase,
+ sourceTableName,
+ targetDatabase,
+ targetTableName,
+ sourceCatalog,
+ excludedTables,
+ env);
+
+ DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
+ FlinkStreamPartitioner.partition(
+                        source, new ShuffleIdentifierByTableComputer(), parallelism);
+
+ // create target table, list files and group by <table, partition>
+ DataStream<CloneFileInfo> files =
+ partitionedSource
+ .process(
+ new ListCloneFilesFunction(
+                                        sourceCatalogConfig, targetCatalogConfig, whereSql))
+ .name("List Files")
+ .setParallelism(parallelism);
+
+ // copy files and commit
+ DataStream<DataFileInfo> dataFile =
+ files.rebalance()
+                        .process(new CloneFilesFunction(sourceCatalogConfig, targetCatalogConfig))
+ .name("Copy Files")
+ .setParallelism(parallelism);
+
+ DataStream<DataFileInfo> partitionedDataFile =
+ FlinkStreamPartitioner.partition(
+                        dataFile, new ShuffleDataFileByTableComputer(), parallelism);
+
+ DataStream<Long> committed =
+ partitionedDataFile
+ .transform(
+ "Commit table",
+ BasicTypeInfo.LONG_TYPE_INFO,
+                                new CloneFilesCommitOperator(targetCatalogConfig))
+ .setParallelism(parallelism);
+ committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
new file mode 100644
index 0000000000..8b539b01e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ClonePaimonTableUtils.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.flink.clone.spits.CloneSplitInfo;
+import org.apache.paimon.flink.clone.spits.CloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
+import org.apache.paimon.flink.clone.spits.CommitMessageTableOperator;
+import org.apache.paimon.flink.clone.spits.ListCloneSplitsFunction;
+import org.apache.paimon.flink.clone.spits.ShuffleCommitMessageByTableComputer;
+import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Utils for building {@link CloneAction} for Paimon tables. */
+public class ClonePaimonTableUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClonePaimonTableUtils.class);
+
+ public static DataStream<Tuple2<Identifier, Identifier>> buildSource(
+ String sourceDatabase,
+ String sourceTableName,
+ String targetDatabase,
+ String targetTableName,
+ Catalog sourceCatalog,
+ @Nullable List<String> excludedTables,
+ StreamExecutionEnvironment env)
+ throws Exception {
+ List<Tuple2<Identifier, Identifier>> result = new ArrayList<>();
+ if (StringUtils.isNullOrWhitespaceOnly(sourceDatabase)) {
+ checkArgument(
+ StringUtils.isNullOrWhitespaceOnly(sourceTableName),
+ "sourceTableName must be blank when database is null.");
+ checkArgument(
+ StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must be blank when clone all tables in a catalog.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must be blank when clone all tables in a catalog.");
+
+            for (Identifier identifier : listTables(sourceCatalog, excludedTables)) {
+ result.add(new Tuple2<>(identifier, identifier));
+ }
+ } else if (StringUtils.isNullOrWhitespaceOnly(sourceTableName)) {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+                    "targetDatabase must not be blank when clone all tables in a database.");
+            checkArgument(
+                    StringUtils.isNullOrWhitespaceOnly(targetTableName),
+                    "targetTableName must be blank when clone all tables in a catalog.");
+
+            for (Identifier identifier :
+                    listTables(sourceCatalog, sourceDatabase, excludedTables)) {
+                result.add(
+                        new Tuple2<>(
+                                identifier,
+                                Identifier.create(targetDatabase, identifier.getObjectName())));
+ }
+ } else {
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetDatabase),
+ "targetDatabase must not be blank when clone a table.");
+ checkArgument(
+ !StringUtils.isNullOrWhitespaceOnly(targetTableName),
+ "targetTableName must not be blank when clone a table.");
+ checkArgument(
+ CollectionUtils.isEmpty(excludedTables),
+ "excludedTables must be empty when clone a single table.");
+ result.add(
+ new Tuple2<>(
+ Identifier.create(sourceDatabase, sourceTableName),
+                            Identifier.create(targetDatabase, targetTableName)));
+        }
+
+        checkState(!result.isEmpty(), "Didn't find any table in source catalog.");
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("The clone identifiers of source table and target table are: {}", result);
+ }
+ return env.fromCollection(result).forceNonParallel();
+ }
+
+ public static void build(
+ StreamExecutionEnvironment env,
+ Catalog sourceCatalog,
+ String sourceDatabase,
+ String sourceTableName,
+ Map<String, String> sourceCatalogConfig,
+ String targetDatabase,
+ String targetTableName,
+ Map<String, String> targetCatalogConfig,
+ int parallelism,
+ @Nullable String whereSql,
+ @Nullable List<String> excludedTables)
+ throws Exception {
+ // list source tables
+ DataStream<Tuple2<Identifier, Identifier>> source =
+ buildSource(
+ sourceDatabase,
+ sourceTableName,
+ targetDatabase,
+ targetTableName,
+ sourceCatalog,
+ excludedTables,
+ env);
+
+ DataStream<Tuple2<Identifier, Identifier>> partitionedSource =
+ FlinkStreamPartitioner.partition(
+                        source, new ShuffleIdentifierByTableComputer(), parallelism);
+
+        DataStream<CloneSplitInfo> splits =
+                partitionedSource
+                        .process(
+                                new ListCloneSplitsFunction(
+                                        sourceCatalogConfig, targetCatalogConfig, whereSql))
+                        .name("List Files")
+                        .setParallelism(parallelism);
+
+        // copy splits and commit
+        DataStream<CommitMessageInfo> commitMessage =
+                splits.rebalance()
+                        .process(new CloneSplitsFunction(sourceCatalogConfig, targetCatalogConfig))
+                        .name("Copy Files")
+                        .setParallelism(parallelism);
+
+        DataStream<CommitMessageInfo> partitionedCommitMessage =
+                FlinkStreamPartitioner.partition(
+                        commitMessage, new ShuffleCommitMessageByTableComputer(), parallelism);
+
+ DataStream<Long> committed =
+ partitionedCommitMessage
+ .transform(
+ "Commit table",
+ BasicTypeInfo.LONG_TYPE_INFO,
+                                new CommitMessageTableOperator(targetCatalogConfig))
+ .setParallelism(parallelism);
+ committed.sinkTo(new DiscardingSink<>()).name("end").setParallelism(1);
+ }
+
+ public static List<Identifier> listTables(
+            Catalog catalog, @Nullable List<String> excludedTables) throws Exception {
+ Set<String> excludedTableSet = new HashSet<>();
+ if (CollectionUtils.isNotEmpty(excludedTables)) {
+ excludedTableSet.addAll(excludedTables);
+ }
+ List<Identifier> results = new ArrayList<>();
+ for (String database : catalog.listDatabases()) {
+ for (String table : catalog.listTables(database)) {
+ Identifier identifier = Identifier.create(database, table);
+ if (excludedTableSet.contains(identifier.getFullName())) {
+ continue;
+ }
+ results.add(identifier);
+ }
+ }
+ return results;
+ }
+
+ public static List<Identifier> listTables(
+            Catalog catalog, String database, @Nullable List<String> excludedTables)
+            throws Exception {
+ Set<String> excludedTableSet = new HashSet<>();
+ if (CollectionUtils.isNotEmpty(excludedTables)) {
+ excludedTableSet.addAll(excludedTables);
+ }
+ List<Identifier> results = new ArrayList<>();
+ for (String table : catalog.listTables(database)) {
+ Identifier identifier = Identifier.create(database, table);
+ if (excludedTableSet.contains(identifier.getFullName())) {
+ continue;
+ }
+ results.add(identifier);
+ }
+ return results;
+ }
+}
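
A small usage sketch for the listTables helpers above (catalog construction and the excluded table name are hypothetical); exclusion matches on the full "db.table" name:

    Catalog catalog = createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
    // All tables in all databases, except default.tmp:
    List<Identifier> all =
            ClonePaimonTableUtils.listTables(catalog, Collections.singletonList("default.tmp"));
    // All tables in one database:
    List<Identifier> one = ClonePaimonTableUtils.listTables(catalog, "default", null);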
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ShuffleIdentifierByTableComputer.java
similarity index 51%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ShuffleIdentifierByTableComputer.java
index 25bfa6a757..d58dbc5117 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ShuffleIdentifierByTableComputer.java
@@ -19,37 +19,33 @@
package org.apache.paimon.flink.clone;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.sink.ChannelComputer;
-import java.io.Serializable;
+import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+import java.util.Objects;
-/** Data File (table, partition) with necessary information. */
-public class DataFileInfo implements Serializable {
+/** Shuffle tables from source identifier and target identifier by target identifier. */
+public class ShuffleIdentifierByTableComputer
+ implements ChannelComputer<Tuple2<Identifier, Identifier>> {
private static final long serialVersionUID = 1L;
- private final Identifier identifier;
- private final byte[] partition;
- private final byte[] dataFileMeta;
+ private transient int numChannels;
-    public DataFileInfo(Identifier identifier, BinaryRow partition, byte[] dataFileMeta) {
- this.identifier = identifier;
- this.partition = serializeBinaryRow(partition);
- this.dataFileMeta = dataFileMeta;
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
}
- public Identifier identifier() {
- return identifier;
+ @Override
+ public int channel(Tuple2<Identifier, Identifier> record) {
+ return Math.floorMod(
+                Objects.hash(record.f1.getDatabaseName(), record.f1.getTableName()), numChannels);
}
- public BinaryRow partition() {
- return deserializeBinaryRow(partition);
- }
-
- public byte[] dataFileMeta() {
- return dataFileMeta;
+ @Override
+ public String toString() {
+ return "shuffle by identifier hash";
}
}
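
To make the ChannelComputer contract concrete, a standalone sketch (in the job it is applied through FlinkStreamPartitioner.partition as shown earlier; calling it by hand like this is purely illustrative):

    ShuffleIdentifierByTableComputer computer = new ShuffleIdentifierByTableComputer();
    computer.setup(4); // the partitioner calls this with the downstream parallelism
    int channel =
            computer.channel(
                    Tuple2.of(Identifier.create("db", "src"), Identifier.create("db", "target")));
    // Records with the same target table always hash to the same channel, so a
    // single subtask handles all work for a given table.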
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java
similarity index 98%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java
index 649a509d62..c50dbec69f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFileInfo.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesCommitOperator.java
similarity index 95%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesCommitOperator.java
index 57093e9fb8..1490c0912b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesCommitOperator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
@@ -42,7 +42,7 @@ import java.util.Map;
import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
/** Commit table operator. */
-public class CommitTableOperator extends AbstractStreamOperator<Long>
+public class CloneFilesCommitOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput {
private static final long serialVersionUID = 1L;
@@ -52,7 +52,7 @@ public class CommitTableOperator extends AbstractStreamOperator<Long>
private transient DataFileMetaSerializer dataFileSerializer;
    private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>> files;
- public CommitTableOperator(Map<String, String> catalogConfig) {
+ public CloneFilesCommitOperator(Map<String, String> catalogConfig) {
this.catalogConfig = catalogConfig;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java
index 145770fa69..57de8621ea 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesFunction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
@@ -37,7 +37,7 @@ import java.util.HashMap;
import java.util.Map;
/** Clone files for table. */
-public class CloneFilesFunction extends CloneProcessFunction<CloneFileInfo, DataFileInfo> {
+public class CloneFilesFunction extends CloneFilesProcessFunction<CloneFileInfo, DataFileInfo> {
private static final long serialVersionUID = 1L;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesProcessFunction.java
similarity index 92%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesProcessFunction.java
index 3ee51fe444..3733052448 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CloneProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/CloneFilesProcessFunction.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
@@ -35,12 +35,12 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
-import static org.apache.paimon.flink.clone.CloneUtils.getRootHiveCatalog;
+import static org.apache.paimon.flink.clone.CloneHiveTableUtils.getRootHiveCatalog;
/** Abstract function for copying tables. */
-public abstract class CloneProcessFunction<I, O> extends ProcessFunction<I, O> {
+public abstract class CloneFilesProcessFunction<I, O> extends ProcessFunction<I, O> {
-    protected static final Logger LOG = LoggerFactory.getLogger(CloneProcessFunction.class);
+    protected static final Logger LOG = LoggerFactory.getLogger(CloneFilesProcessFunction.class);
protected final Map<String, String> sourceCatalogConfig;
protected final Map<String, String> targetCatalogConfig;
@@ -51,7 +51,7 @@ public abstract class CloneProcessFunction<I, O> extends ProcessFunction<I, O> {
protected transient Map<Identifier, Table> tableCache;
protected transient DataFileMetaSerializer dataFileSerializer;
- public CloneProcessFunction(
+ public CloneFilesProcessFunction(
            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
this.sourceCatalogConfig = sourceCatalogConfig;
this.targetCatalogConfig = targetCatalogConfig;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java
similarity index 97%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java
index 25bfa6a757..4bf0f62990 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/DataFileInfo.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
similarity index 96%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
index 40dd2d53c6..8957b217d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/ListCloneFilesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ListCloneFilesFunction.java
@@ -16,10 +16,9 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.files;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.predicate.SimpleSqlPredicateConvertor;
@@ -51,7 +50,7 @@ import static org.apache.paimon.utils.Preconditions.checkState;
/** List files for table. */
public class ListCloneFilesFunction
-        extends CloneProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
+        extends CloneFilesProcessFunction<Tuple2<Identifier, Identifier>, CloneFileInfo> {
private static final long serialVersionUID = 1L;
@@ -169,9 +168,8 @@ public class ListCloneFilesFunction
"source table partition keys is not compatible with existed
paimon table partition keys.");
}
- @VisibleForTesting
@Nullable
- static PartitionPredicate getPartitionPredicate(
+ public static PartitionPredicate getPartitionPredicate(
            @Nullable String whereSql, RowType partitionType, Identifier tableId) throws Exception {
if (whereSql == null) {
return null;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ShuffleDataFileByTableComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ShuffleDataFileByTableComputer.java
new file mode 100644
index 0000000000..f8ac669664
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/files/ShuffleDataFileByTableComputer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.files;
+
+import org.apache.paimon.table.sink.ChannelComputer;
+
+import java.util.Objects;
+
+/** Shuffle DataFile By Table Computer. */
+public class ShuffleDataFileByTableComputer implements ChannelComputer<DataFileInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(DataFileInfo record) {
+ return Math.floorMod(Objects.hash(record.identifier()), numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by identifier hash";
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitInfo.java
new file mode 100644
index 0000000000..96d8d737d0
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/** Clone split with necessary information. */
+public class CloneSplitInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Identifier sourceIdentifier;
+ private final Identifier targetidentifier;
+ private final byte[] split;
+
+    public CloneSplitInfo(Identifier sourceIdentifier, Identifier targetidentifier, Split split) {
+ this.sourceIdentifier = sourceIdentifier;
+ this.targetidentifier = targetidentifier;
+ try {
+ this.split = InstantiationUtil.serializeObject(split);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public Identifier sourceIdentifier() {
+ return sourceIdentifier;
+ }
+
+ public Identifier targetidentifier() {
+ return targetidentifier;
+ }
+
+ public Split split() {
+ try {
+            return InstantiationUtil.deserializeObject(split, getClass().getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
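
A round-trip sketch of the wrapper above (the table and split are assumed to exist): serializing the Split to bytes up front keeps the record trivially shippable between Flink operators without a custom serializer.

    Split split = table.newReadBuilder().newScan().plan().splits().get(0);
    CloneSplitInfo info =
            new CloneSplitInfo(
                    Identifier.create("db", "src"), Identifier.create("db", "target"), split);
    Split restored = info.split(); // lazily deserialized on the receiving subtask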
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java
new file mode 100644
index 0000000000..4b5bec097d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CloneSplitsFunction.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.TableRead;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+
+/** Clone splits for table. */
+public class CloneSplitsFunction extends ProcessFunction<CloneSplitInfo, CommitMessageInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<String, String> sourceCatalogConfig;
+ private final Map<String, String> targetCatalogConfig;
+
+ private transient Catalog sourceCatalog;
+ private transient Catalog targetCatalog;
+
+ public CloneSplitsFunction(
+            Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.targetCatalogConfig = targetCatalogConfig;
+ }
+
+ /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
+    public void open(Configuration conf) throws Exception {
+        this.sourceCatalog = createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
+        this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+ }
+
+ @Override
+ public void processElement(
+ CloneSplitInfo cloneSplitInfo,
+ ProcessFunction<CloneSplitInfo, CommitMessageInfo>.Context context,
+ Collector<CommitMessageInfo> collector)
+ throws Exception {
+
+ // TODO reuse table read?
+ TableRead tableRead =
+ sourceCatalog
+ .getTable(cloneSplitInfo.sourceIdentifier())
+ .newReadBuilder()
+ .newRead();
+ // TODO reuse table write
+ BatchTableWrite write =
+ targetCatalog
+ .getTable(cloneSplitInfo.targetidentifier())
+ .newBatchWriteBuilder()
+ .newWrite();
+
+ tableRead
+ .createReader(cloneSplitInfo.split())
+ .forEachRemaining(
+ row -> {
+ try {
+ write.write(row);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<CommitMessage> commitMessages = write.prepareCommit();
+ for (CommitMessage commitMessage : commitMessages) {
+ CommitMessageInfo messageInfo =
+                    new CommitMessageInfo(cloneSplitInfo.targetidentifier(), commitMessage);
+ collector.collect(messageInfo);
+ }
+ write.close();
+ }
+}
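
Condensed, the copy step above does the following (a sketch; catalogs, identifiers and the split are assumed, and in the real pipeline the commit itself happens downstream in CommitMessageTableOperator):

    TableRead read = sourceCatalog.getTable(sourceId).newReadBuilder().newRead();
    BatchTableWrite write = targetCatalog.getTable(targetId).newBatchWriteBuilder().newWrite();
    read.createReader(split)
            .forEachRemaining(
                    row -> {
                        try {
                            write.write(row); // rows are re-written, not file-copied
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    });
    List<CommitMessage> messages = write.prepareCommit(); // shuffled by table downstream
    write.close();

Unlike the Hive path, which copies data files as-is, this path reads each split and re-writes the rows through a table write, which is what allows it to recreate primary key tables and bucket layouts on the target side.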
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageInfo.java
similarity index 56%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageInfo.java
index 25bfa6a757..20ea096a75 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/DataFileInfo.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageInfo.java
@@ -16,40 +16,41 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.spits;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.InstantiationUtil;
+import java.io.IOException;
import java.io.Serializable;
-import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
-import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
-
-/** Data File (table, partition) with necessary information. */
-public class DataFileInfo implements Serializable {
+/** Commit message with necessary information. */
+public class CommitMessageInfo implements Serializable {
private static final long serialVersionUID = 1L;
private final Identifier identifier;
- private final byte[] partition;
- private final byte[] dataFileMeta;
+ private final byte[] commitMessage;
-    public DataFileInfo(Identifier identifier, BinaryRow partition, byte[] dataFileMeta) {
+    public CommitMessageInfo(Identifier identifier, CommitMessage commitMessage) {
         this.identifier = identifier;
-        this.partition = serializeBinaryRow(partition);
-        this.dataFileMeta = dataFileMeta;
+        try {
+            this.commitMessage = InstantiationUtil.serializeObject(commitMessage);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
public Identifier identifier() {
return identifier;
}
- public BinaryRow partition() {
- return deserializeBinaryRow(partition);
- }
-
- public byte[] dataFileMeta() {
- return dataFileMeta;
+ public CommitMessage commitMessage() {
+ try {
+            return InstantiationUtil.deserializeObject(commitMessage, getClass().getClassLoader());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageTableOperator.java
similarity index 57%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageTableOperator.java
index 57093e9fb8..ad71ebf01a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CommitTableOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/CommitMessageTableOperator.java
@@ -16,14 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.clone;
+package org.apache.paimon.flink.clone.spits;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -42,51 +38,38 @@ import java.util.Map;
import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
/** Commit table operator. */
-public class CommitTableOperator extends AbstractStreamOperator<Long>
-        implements OneInputStreamOperator<DataFileInfo, Long>, BoundedOneInput {
+public class CommitMessageTableOperator extends AbstractStreamOperator<Long>
+        implements OneInputStreamOperator<CommitMessageInfo, Long>, BoundedOneInput {
private static final long serialVersionUID = 1L;
private final Map<String, String> catalogConfig;
- private transient DataFileMetaSerializer dataFileSerializer;
-    private transient Map<Identifier, Map<BinaryRow, List<DataFileMeta>>> files;
+ private transient Map<Identifier, List<CommitMessage>> commitMessages;
- public CommitTableOperator(Map<String, String> catalogConfig) {
+ public CommitMessageTableOperator(Map<String, String> catalogConfig) {
this.catalogConfig = catalogConfig;
}
@Override
public void open() throws Exception {
super.open();
- this.dataFileSerializer = new DataFileMetaSerializer();
- this.files = new HashMap<>();
+ this.commitMessages = new HashMap<>();
}
@Override
-    public void processElement(StreamRecord<DataFileInfo> streamRecord) throws Exception {
-        DataFileInfo file = streamRecord.getValue();
-        BinaryRow partition = file.partition();
-        List<DataFileMeta> files =
-                this.files
-                        .computeIfAbsent(file.identifier(), k -> new HashMap<>())
-                        .computeIfAbsent(partition, k -> new ArrayList<>());
-        files.add(dataFileSerializer.deserializeFromBytes(file.dataFileMeta()));
+    public void processElement(StreamRecord<CommitMessageInfo> streamRecord) throws Exception {
+        CommitMessageInfo info = streamRecord.getValue();
+        List<CommitMessage> messages =
+                this.commitMessages.computeIfAbsent(info.identifier(), k -> new ArrayList<>());
+ messages.add(info.commitMessage());
}
@Override
public void endInput() throws Exception {
        try (Catalog catalog = createPaimonCatalog(Options.fromMap(catalogConfig))) {
-            for (Map.Entry<Identifier, Map<BinaryRow, List<DataFileMeta>>> entry :
-                    files.entrySet()) {
-                List<CommitMessage> commitMessages = new ArrayList<>();
-                for (Map.Entry<BinaryRow, List<DataFileMeta>> listEntry :
-                        entry.getValue().entrySet()) {
-                    commitMessages.add(
-                            FileMetaUtils.createCommitMessage(
-                                    listEntry.getKey(), 0, listEntry.getValue()));
-                }
-
+            for (Map.Entry<Identifier, List<CommitMessage>> entry : commitMessages.entrySet()) {
+                List<CommitMessage> commitMessages = entry.getValue();
                Table table = catalog.getTable(entry.getKey());
                try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
new file mode 100644
index 0000000000..9fdb5226b9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ListCloneSplitsFunction.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.flink.FlinkCatalogFactory.createPaimonCatalog;
+import static org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
+
+/** List splits for table. */
+public class ListCloneSplitsFunction
+        extends ProcessFunction<Tuple2<Identifier, Identifier>, CloneSplitInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<String, String> sourceCatalogConfig;
+ private final Map<String, String> targetCatalogConfig;
+ @Nullable private final String whereSql;
+
+ private transient Catalog sourceCatalog;
+ private transient Catalog targetCatalog;
+
+ public ListCloneSplitsFunction(
+ Map<String, String> sourceCatalogConfig,
+ Map<String, String> targetCatalogConfig,
+ @Nullable String whereSql) {
+ this.sourceCatalogConfig = sourceCatalogConfig;
+ this.targetCatalogConfig = targetCatalogConfig;
+ this.whereSql = whereSql;
+ }
+
+ /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+     */
+    public void open(OpenContext openContext) throws Exception {
+        open(new Configuration());
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+     */
+    public void open(Configuration conf) throws Exception {
+        this.sourceCatalog = createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
+        this.targetCatalog = createPaimonCatalog(Options.fromMap(targetCatalogConfig));
+ }
+
+ @Override
+ public void processElement(
+ Tuple2<Identifier, Identifier> tuple,
+            ProcessFunction<Tuple2<Identifier, Identifier>, CloneSplitInfo>.Context context,
+ Collector<CloneSplitInfo> collector)
+ throws Exception {
+
+ // create database if not exists
+ targetCatalog.createDatabase(tuple.f1.getDatabaseName(), true);
+
+ Table sourceTable = sourceCatalog.getTable(tuple.f0);
+ Schema.Builder builder = Schema.newBuilder();
+ sourceTable
+ .rowType()
+ .getFields()
+ .forEach(
+                        f -> builder.column(f.name(), f.type(), f.description(), f.defaultValue()));
+ builder.partitionKeys(sourceTable.partitionKeys());
+ builder.primaryKey(sourceTable.primaryKeys());
+ sourceTable
+ .options()
+ .forEach(
+ (k, v) -> {
+ if (k.equalsIgnoreCase(BUCKET.key())
+ || k.equalsIgnoreCase(PATH.key())) {
+ return;
+ }
+ builder.option(k, v);
+ });
+
+ if (sourceTable.primaryKeys().isEmpty()) {
+ // for append table with bucket
+ if (sourceTable.options().containsKey(BUCKET_KEY.key())) {
+                builder.option(BUCKET.key(), sourceTable.options().get(BUCKET.key()));
+                builder.option(BUCKET_KEY.key(), sourceTable.options().get(BUCKET_KEY.key()));
+ }
+ } else {
+ // for primary key table, only postpone bucket supports clone
+ builder.option(BUCKET.key(), "-2");
+ }
+
+ targetCatalog.createTable(tuple.f1, builder.build(), true);
+
+ PartitionPredicate predicate =
+ getPartitionPredicate(
+ whereSql,
+                        sourceTable.rowType().project(sourceTable.partitionKeys()),
+                        tuple.f0);
+        TableScan scan = sourceTable.newReadBuilder().withPartitionFilter(predicate).newScan();
+ List<Split> splits = scan.plan().splits();
+ for (Split split : splits) {
+            CloneSplitInfo splitInfo = new CloneSplitInfo(tuple.f0, tuple.f1, split);
+ collector.collect(splitInfo);
+ }
+ }
+}
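
The planning step above in isolation (a sketch; the filter value is hypothetical and, as getPartitionPredicate enforces, may only reference partition columns):

    PartitionPredicate predicate =
            getPartitionPredicate(
                    "dt = '20250718'", // hypothetical whereSql
                    sourceTable.rowType().project(sourceTable.partitionKeys()),
                    Identifier.create("db", "src"));
    List<Split> splits =
            sourceTable.newReadBuilder().withPartitionFilter(predicate).newScan().plan().splits();
    // each split becomes one CloneSplitInfo, copied independently downstream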
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ShuffleCommitMessageByTableComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ShuffleCommitMessageByTableComputer.java
new file mode 100644
index 0000000000..974c879e7e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/spits/ShuffleCommitMessageByTableComputer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.clone.spits;
+
+import org.apache.paimon.table.sink.ChannelComputer;
+
+import java.util.Objects;
+
+/** Shuffle by table. */
+public class ShuffleCommitMessageByTableComputer implements ChannelComputer<CommitMessageInfo> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(CommitMessageInfo record) {
+ return Math.floorMod(Objects.hash(record.identifier()), numChannels);
+ }
+
+ @Override
+ public String toString() {
+ return "shuffle by identifier hash";
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
index 55647848ba..287b9758a3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CloneProcedure.java
@@ -61,6 +61,10 @@ public class CloneProcedure extends ProcedureBase {
@ArgumentHint(
name = "excluded_tables",
type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "clone_from",
+ type = @DataTypeHint("STRING"),
isOptional = true)
})
public String[] call(
@@ -73,7 +77,8 @@ public class CloneProcedure extends ProcedureBase {
String targetCatalogConfigStr,
Integer parallelism,
String where,
- String excludedTablesStr)
+ String excludedTablesStr,
+ String cloneFrom)
throws Exception {
Map<String, String> sourceCatalogConfig =
new HashMap<>(optionalConfigMap(sourceCatalogConfigStr));
@@ -96,7 +101,8 @@ public class CloneProcedure extends ProcedureBase {
targetCatalogConfig,
parallelism,
where,
- excludedTables);
+ excludedTables,
+ cloneFrom);
return execute(procedureContext, action, "Clone Job");
}
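
The new argument is also reachable from SQL. A hedged sketch, assuming the procedure is registered as sys.clone and follows the named-argument style the integration tests below use for sys.compact:

    tEnv.executeSql(
            "CALL sys.clone("
                    + "`database` => 'default', "
                    + "`table` => 'src', "
                    + "target_database => 'default', "
                    + "target_table => 'target', "
                    + "clone_from => 'paimon')");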
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
index 880b65ab5f..5d63e32b3e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/clone/ListCloneFilesFunctionTest.java
@@ -22,13 +22,14 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.clone.files.ListCloneFilesFunction;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.Test;
-import static org.apache.paimon.flink.clone.ListCloneFilesFunction.getPartitionPredicate;
+import static org.apache.paimon.flink.clone.files.ListCloneFilesFunction.getPartitionPredicate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
index b15a6f84ae..7858e33839 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/CloneActionITCase.java
@@ -34,6 +34,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
@@ -70,6 +71,91 @@ public class CloneActionITCase extends ActionITCaseBase {
TEST_HIVE_METASTORE.stop();
}
+ @Test
+ public void testClonePKTableFromPaimon() throws Exception {
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+ String warehouse1 = getTempDirPath();
+ String warehouse2 = getTempDirPath();
+        sql(tEnv, "CREATE CATALOG catalog1 WITH ('type'='paimon', 'warehouse' = '%s')", warehouse1);
+        sql(tEnv, "CREATE CATALOG catalog2 WITH ('type'='paimon', 'warehouse' = '%s')", warehouse2);
+
+        sql(
+                tEnv,
+                "CREATE TABLE catalog1.`default`.src (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED)");
+ sql(tEnv, "INSERT INTO catalog1.`default`.src VALUES (1, 1), (2, 2)");
+ createAction(
+ CloneAction.class,
+ "clone",
+ "--database",
+ "default",
+ "--table",
+ "src",
+ "--catalog_conf",
+ "warehouse=" + warehouse1,
+ "--target_database",
+ "default",
+ "--target_table",
+ "target",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse2,
+ "--clone_from",
+ "paimon")
+ .run();
+
+ sql(tEnv, "CALL catalog2.sys.compact(`table` => 'default.target')");
+        List<Row> result = sql(tEnv, "SELECT * FROM catalog2.`default`.target");
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+        List<Row> show = sql(tEnv, "SHOW CREATE TABLE catalog2.`default`.target");
+ assertThat(show.toString()).contains("PRIMARY KEY");
+ }
+
+ @Test
+ public void testCloneBucketedAppendFromPaimon() throws Exception {
+ TableEnvironment tEnv =
+ tableEnvironmentBuilder()
+ .batchMode()
+ .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+ .build();
+ String warehouse1 = getTempDirPath();
+ String warehouse2 = getTempDirPath();
+        sql(tEnv, "CREATE CATALOG catalog1 WITH ('type'='paimon', 'warehouse' = '%s')", warehouse1);
+        sql(tEnv, "CREATE CATALOG catalog2 WITH ('type'='paimon', 'warehouse' = '%s')", warehouse2);
+
+        sql(
+                tEnv,
+                "CREATE TABLE catalog1.`default`.src (a INT, b INT) WITH ('bucket' = '2', 'bucket-key' = 'b')");
+ sql(tEnv, "INSERT INTO catalog1.`default`.src VALUES (1, 1), (2, 2)");
+ createAction(
+ CloneAction.class,
+ "clone",
+ "--database",
+ "default",
+ "--table",
+ "src",
+ "--catalog_conf",
+ "warehouse=" + warehouse1,
+ "--target_database",
+ "default",
+ "--target_table",
+ "target",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse2,
+ "--clone_from",
+ "paimon")
+ .run();
+
+ sql(tEnv, "CALL catalog2.sys.compact(`table` => 'default.target')");
+        List<Row> result = sql(tEnv, "SELECT * FROM catalog2.`default`.target");
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+        List<Row> show = sql(tEnv, "SHOW CREATE TABLE catalog2.`default`.target");
+ assertThat(show.toString()).contains("'bucket' = '2'");
+ assertThat(show.toString()).contains("'bucket-key' = 'b'");
+ }
+
@Test
public void testMigrateOneNonPartitionedTable() throws Exception {
String format = randomFormat();