This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b7a42cdf78 [spark] Support CopyFilesProcedure in spark (#6625)
b7a42cdf78 is described below
commit b7a42cdf78c75d0e5f629d390d368d4ed59c5241
Author: shyjsarah <[email protected]>
AuthorDate: Fri Nov 21 15:06:25 2025 +0800
[spark] Support CopyFilesProcedure in spark (#6625)
---
docs/content/spark/procedures.md | 13 ++
.../org/apache/paimon/migrate/FileMetaUtils.java | 21 ++
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../paimon/spark/copy/CopyDataFilesOperator.java | 87 ++++++++
.../org/apache/paimon/spark/copy/CopyFileInfo.java | 92 +++++++++
.../paimon/spark/copy/CopyFilesCommitOperator.java | 150 ++++++++++++++
.../paimon/spark/copy/CopyFilesOperator.java | 37 ++++
.../apache/paimon/spark/copy/CopyFilesUtil.java | 98 +++++++++
.../paimon/spark/copy/CopySchemaOperator.java | 146 +++++++++++++
.../org/apache/paimon/spark/copy/DataFileInfo.java | 57 ++++++
.../apache/paimon/spark/copy/IndexFileInfo.java | 50 +++++
.../paimon/spark/copy/ListDataFilesOperator.java | 114 +++++++++++
.../paimon/spark/copy/ListIndexFilesOperator.java | 105 ++++++++++
.../paimon/spark/procedure/CopyFilesProcedure.java | 226 ++++++++++++++++++++
.../spark/procedure/CopyFilesProcedureTest.scala | 228 +++++++++++++++++++++
15 files changed, 1426 insertions(+)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index ad938daaa7..c0abd0bae2 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -463,5 +463,18 @@ This section introduce all available spark procedures about paimon.
         CALL sys.rewrite_file_index(table => "t", where => "day = '2025-08-17'")<br/>
</td>
</tr>
+ <tr>
+ <td>copy</td>
+ <td>
+        Copy files from a source table to a target table. Arguments:
+        <li>source_table: the source table identifier. Cannot be empty.</li>
+        <li>target_table: the target table identifier. Cannot be empty.</li>
+        <li>where: partition predicate. Leave it empty to copy all partitions.</li>
+    </td>
+    <td>
+        CALL sys.copy(source_table => "t1", target_table => "t1_copy")<br/>
+        CALL sys.copy(source_table => "t1", target_table => "t1_copy", where => "day = '2025-08-17'")<br/>
+ </td>
+ </tr>
</tbody>
</table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 0dc14971b3..de51a91da2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -95,6 +96,26 @@ public class FileMetaUtils {
                         Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
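+    /** Creates a commit message that, besides data files, also carries index file metas. */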
+ public static CommitMessage createCommitMessage(
+ BinaryRow partition,
+ int bucket,
+ int totalBuckets,
+ List<DataFileMeta> dataFileMetas,
+ List<IndexFileMeta> indexFileMetas) {
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ totalBuckets,
+ new DataIncrement(
+ dataFileMetas,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ indexFileMetas,
+ Collections.emptyList()),
+ new CompactIncrement(
+                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+ }
+
public static DataFileMeta constructFileMeta(
String format,
FileStatus fileStatus,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index b5db4192b1..32909a2caa 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
+import org.apache.paimon.spark.procedure.CopyFilesProcedure;
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
@@ -113,6 +114,7 @@ public class SparkProcedures {
         procedureBuilders.put(
                 "trigger_tag_automatic_creation", TriggerTagAutomaticCreationProcedure::builder);
         procedureBuilders.put("rewrite_file_index", RewriteFileIndexProcedure::builder);
+ procedureBuilders.put("copy", CopyFilesProcedure::builder);
return procedureBuilders.build();
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyDataFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyDataFilesOperator.java
new file mode 100644
index 0000000000..6de060c01f
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyDataFilesOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Copy data files from source table to target table. */
+public class CopyDataFilesOperator extends CopyFilesOperator {
+
+    public CopyDataFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+ super(spark, sourceCatalog, targetCatalog);
+ }
+
+ public JavaRDD<CopyFileInfo> execute(
+            Identifier sourceIdentifier, Identifier targetIdentifier, List<CopyFileInfo> dataFiles)
+            throws Exception {
+        if (CollectionUtils.isEmpty(dataFiles)) {
+            return null;
+        }
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+        int readParallelism = SparkProcedureUtils.readParallelism(dataFiles, spark);
+        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        JavaRDD<CopyFileInfo> copyFileInfoRdd =
+                context.parallelize(dataFiles, readParallelism)
+                        .mapPartitions(new DataFileProcessor(sourceTable, targetTable));
+ return copyFileInfoRdd;
+ }
+
+ /** Copy data files. */
+    public static class DataFileProcessor
+ implements FlatMapFunction<Iterator<CopyFileInfo>, CopyFileInfo> {
+
+ private final FileStoreTable sourceTable;
+ private final FileStoreTable targetTable;
+
+        public DataFileProcessor(FileStoreTable sourceTable, FileStoreTable targetTable) {
+ this.sourceTable = sourceTable;
+ this.targetTable = targetTable;
+ }
+
+ @Override
+        public Iterator<CopyFileInfo> call(Iterator<CopyFileInfo> dataFileIterator)
+ throws Exception {
+ List<CopyFileInfo> result = new ArrayList<>();
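+            // Copy each file byte-for-byte from the source FileIO to the target FileIO.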
+ while (dataFileIterator.hasNext()) {
+ CopyFileInfo file = dataFileIterator.next();
+ Path sourcePath = new Path(file.sourceFilePath());
+ Path targetPath = new Path(file.targetFilePath());
+ CopyFilesUtil.copyFiles(
+                        sourceTable.fileIO(), targetTable.fileIO(), sourcePath, targetPath, false);
+ result.add(file);
+ }
+ return result.iterator();
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFileInfo.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFileInfo.java
new file mode 100644
index 0000000000..254c493d7e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFileInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import java.io.Serializable;
+
+/** Information of a file to copy from the source table to the target table. */
+public class CopyFileInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String sourceFilePath;
+
+ private final String targetFilePath;
+
+ private final byte[] partition;
+
+ private final int bucket;
+
+ private final int totalBuckets;
+
+ private final byte[] dataFileMeta;
+
+ public CopyFileInfo(
+ String sourceFilePath,
+ String targetFilePath,
+ byte[] partition,
+ int bucket,
+ byte[] dataFileMeta) {
+ this.sourceFilePath = sourceFilePath;
+ this.targetFilePath = targetFilePath;
+ this.partition = partition;
+ this.bucket = bucket;
+ this.totalBuckets = 0;
+ this.dataFileMeta = dataFileMeta;
+ }
+
+ public CopyFileInfo(
+ String sourceFilePath,
+ String targetFilePath,
+ byte[] partition,
+ int bucket,
+ int totalBuckets,
+ byte[] dataFileMeta) {
+ this.sourceFilePath = sourceFilePath;
+ this.targetFilePath = targetFilePath;
+ this.partition = partition;
+ this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
+ this.dataFileMeta = dataFileMeta;
+ }
+
+ public String sourceFilePath() {
+ return sourceFilePath;
+ }
+
+ public String targetFilePath() {
+ return targetFilePath;
+ }
+
+ public byte[] partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
+ public byte[] dataFileMeta() {
+ return dataFileMeta;
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesCommitOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesCommitOperator.java
new file mode 100644
index 0000000000..d2284e5e55
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesCommitOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+/** Commit operator for copy files. */
+public class CopyFilesCommitOperator extends CopyFilesOperator {
+
+ private DataFileMetaSerializer dataFileSerializer;
+
+ private IndexFileMetaSerializer indexFileSerializer;
+
+ public CopyFilesCommitOperator(
+ SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+ super(spark, sourceCatalog, targetCatalog);
+ this.dataFileSerializer = new DataFileMetaSerializer();
+ this.indexFileSerializer = new IndexFileMetaSerializer();
+ }
+
+ public void execute(
+ Identifier targetIdentifier,
+ JavaRDD<CopyFileInfo> dataCopyFileInfoRdd,
+ JavaRDD<CopyFileInfo> indexCopyFileInfoRdd)
+ throws Exception {
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+
+ // deserialize data file meta
+ Map<Tuple2<BinaryRow, Integer>, DataFileInfo> dataFileMetaMap =
+ deserializeDataFileMeta(dataCopyFileInfoRdd);
+
+ // deserialize index file meta
+ Map<Tuple2<BinaryRow, Integer>, IndexFileInfo> indexFileMetaMap =
+ deserializeIndexFileMeta(indexCopyFileInfoRdd);
+
+ // construct commit messages
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (Map.Entry<Tuple2<BinaryRow, Integer>, DataFileInfo> entry :
+ dataFileMetaMap.entrySet()) {
+ Tuple2<BinaryRow, Integer> partitionAndBucket = entry.getKey();
+ DataFileInfo dataFileInfo = entry.getValue();
+ List<DataFileMeta> dataFileMetas = dataFileInfo.dataFileMetas();
+ List<IndexFileMeta> indexFileMetas = new ArrayList<>();
+ if (indexFileMetaMap.containsKey(partitionAndBucket)) {
+                IndexFileInfo indexFileInfo = indexFileMetaMap.get(partitionAndBucket);
+ indexFileMetas.addAll(indexFileInfo.indexFileMetas());
+ }
+ commitMessages.add(
+ FileMetaUtils.createCommitMessage(
+ partitionAndBucket._1,
+ partitionAndBucket._2,
+ dataFileInfo.totalBuckets(),
+ dataFileMetas,
+ indexFileMetas));
+ }
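+        // Commit with overwrite semantics: partitions present in the commit messages
+        // replace any existing data in the target table.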
+ try (BatchTableCommit commit =
+                targetTable.newBatchWriteBuilder().withOverwrite().newCommit()) {
+ commit.commit(commitMessages);
+ }
+ }
+
+    private Map<Tuple2<BinaryRow, Integer>, DataFileInfo> deserializeDataFileMeta(
+ JavaRDD<CopyFileInfo> copyFileInfoRdd) throws IOException {
+ Map<Tuple2<BinaryRow, Integer>, DataFileInfo> result = new HashMap<>();
+ if (copyFileInfoRdd == null) {
+ return result;
+ }
+ List<CopyFileInfo> copyFileInfos = copyFileInfoRdd.collect();
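+        // Group the file metas by (partition, bucket); each group becomes one commit message.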
+ for (CopyFileInfo copyDataFileInfo : copyFileInfos) {
+ BinaryRow partition =
+                    SerializationUtils.deserializeBinaryRow(copyDataFileInfo.partition());
+ int bucket = copyDataFileInfo.bucket();
+ DataFileInfo dataFileInfo =
+ result.computeIfAbsent(
+ new Tuple2<>(partition, bucket),
+ k ->
+ new DataFileInfo(
+ partition,
+ bucket,
+ copyDataFileInfo.totalBuckets(),
+ new ArrayList<>()));
+ dataFileInfo
+ .dataFileMetas()
+                    .add(dataFileSerializer.deserializeFromBytes(copyDataFileInfo.dataFileMeta()));
+ }
+ return result;
+ }
+
+    private Map<Tuple2<BinaryRow, Integer>, IndexFileInfo> deserializeIndexFileMeta(
+            JavaRDD<CopyFileInfo> copyFileInfoRdd) throws IOException {
+        Map<Tuple2<BinaryRow, Integer>, IndexFileInfo> result = new HashMap<>();
+ if (copyFileInfoRdd == null) {
+ return result;
+ }
+ List<CopyFileInfo> copyFileInfos = copyFileInfoRdd.collect();
+ for (CopyFileInfo copyIndexFileInfo : copyFileInfos) {
+ BinaryRow partition =
+                    SerializationUtils.deserializeBinaryRow(copyIndexFileInfo.partition());
+ int bucket = copyIndexFileInfo.bucket();
+ IndexFileInfo indexFileInfo =
+ result.computeIfAbsent(
+ new Tuple2<>(partition, bucket),
+                            k -> new IndexFileInfo(partition, bucket, new ArrayList<>()));
+ indexFileInfo
+ .indexFileMetas()
+ .add(
+ indexFileSerializer.deserializeFromBytes(
+ copyIndexFileInfo.dataFileMeta()));
+ }
+ return result;
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesOperator.java
new file mode 100644
index 0000000000..073008634c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesOperator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.catalog.Catalog;
+
+import org.apache.spark.sql.SparkSession;
+
+/** Base class for copy files. */
+public abstract class CopyFilesOperator {
+
+ protected final SparkSession spark;
+ protected final Catalog sourceCatalog;
+ protected final Catalog targetCatalog;
+
+    public CopyFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+ this.spark = spark;
+ this.sourceCatalog = sourceCatalog;
+ this.targetCatalog = targetCatalog;
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
new file mode 100644
index 0000000000..092fbc238c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.PojoDataFileMeta;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Utils for copy files. */
+public class CopyFilesUtil {
+
+ public static void copyFiles(
+ FileIO sourceFileIO,
+ FileIO targetFileIO,
+ Path sourcePath,
+ Path targetPath,
+ boolean overwrite)
+ throws IOException {
+ try (SeekableInputStream is = sourceFileIO.newInputStream(sourcePath);
+                PositionOutputStream os = targetFileIO.newOutputStream(targetPath, overwrite)) {
+ IOUtils.copy(is, os);
+ }
+ }
+
+ public static DataFileMeta toNewDataFileMeta(
+ DataFileMeta oldFileMeta, String newFileName, long newSchemaId) {
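+        // Only the file name, schema id and external path change; all statistics and
+        // other metadata are carried over from the old file meta.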
+ String newExternalPath =
+ externalPathDir(oldFileMeta.externalPath().orElse(null))
+ .map(dir -> dir + "/" + newFileName)
+ .orElse(null);
+ return new PojoDataFileMeta(
+ newFileName,
+ oldFileMeta.fileSize(),
+ oldFileMeta.rowCount(),
+ oldFileMeta.minKey(),
+ oldFileMeta.maxKey(),
+ oldFileMeta.keyStats(),
+ oldFileMeta.valueStats(),
+ oldFileMeta.minSequenceNumber(),
+ oldFileMeta.maxSequenceNumber(),
+ newSchemaId,
+ oldFileMeta.level(),
+ oldFileMeta.extraFiles(),
+ oldFileMeta.creationTime(),
+ oldFileMeta.deleteRowCount().orElse(null),
+ oldFileMeta.embeddedIndex(),
+ oldFileMeta.fileSource().orElse(null),
+ oldFileMeta.valueStatsCols(),
+ newExternalPath,
+ oldFileMeta.firstRowId(),
+ oldFileMeta.writeCols());
+ }
+
+    public static IndexFileMeta toNewIndexFileMeta(IndexFileMeta oldFileMeta, String newFileName) {
+ String newExternalPath =
+ externalPathDir(oldFileMeta.externalPath())
+ .map(dir -> dir + "/" + newFileName)
+ .orElse(null);
+ return new IndexFileMeta(
+ oldFileMeta.indexType(),
+ newFileName,
+ oldFileMeta.fileSize(),
+ oldFileMeta.rowCount(),
+ oldFileMeta.dvRanges(),
+ newExternalPath);
+ }
+
+ public static Optional<String> externalPathDir(String externalPath) {
+ return Optional.ofNullable(externalPath)
+ .map(Path::new)
+ .map(p -> p.getParent().toUri().toString());
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopySchemaOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopySchemaOperator.java
new file mode 100644
index 0000000000..669a9f76a2
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopySchemaOperator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
+
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Copy schema and get latest snapshot. */
+public class CopySchemaOperator extends CopyFilesOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopySchemaOperator.class);
+
+    public CopySchemaOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+ super(spark, sourceCatalog, targetCatalog);
+ }
+
+    public Snapshot execute(Identifier sourceIdentifier, Identifier targetIdentifier)
+ throws Exception {
+ Table originalSourceTable = sourceCatalog.getTable(sourceIdentifier);
+ Preconditions.checkState(
+ originalSourceTable instanceof FileStoreTable,
+ String.format(
+                        "Only support copy FileStoreTable, but this table %s is %s.",
+                        sourceIdentifier, originalSourceTable.getClass()));
+        FileStoreTable sourceTable = (FileStoreTable) originalSourceTable;
+
+ // 1. create target table
+ targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
+
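+        // Reuse the target table if it already exists and is compatible with the source table,
+        // otherwise create it from the source table schema.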
+ try {
+ Table existedTable = targetCatalog.getTable(targetIdentifier);
+ Preconditions.checkState(
+ existedTable instanceof FileStoreTable,
+ String.format(
+                            "existing paimon table '%s' is not a FileStoreTable, but a %s",
+                            targetIdentifier, existedTable.getClass().getName()));
+ checkCompatible(sourceTable, (FileStoreTable) existedTable);
+
+            LOG.info("paimon table '{}' already exists, using it as the target table.", targetIdentifier);
+ } catch (Catalog.TableNotExistException e) {
+ LOG.info("create target paimon table '{}'.", targetIdentifier);
+
+ targetCatalog.createTable(
+                    targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), false);
+ }
+
+ // 2. get latest snapshot files
+ FileStore<?> sourceStore = sourceTable.store();
+ SnapshotManager sourceSnapshotManager = sourceStore.snapshotManager();
+        Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
+ }
+
+ private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
+ return new Schema(
+ ImmutableList.copyOf(tableSchema.fields()),
+ ImmutableList.copyOf(tableSchema.partitionKeys()),
+ ImmutableList.copyOf(tableSchema.primaryKeys()),
+ ImmutableMap.copyOf(
+ Iterables.filter(
+ tableSchema.options().entrySet(),
+                                entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
+ tableSchema.comment());
+ }
+
+    private void checkCompatible(FileStoreTable sourceTable, FileStoreTable existedTable) {
+        Schema sourceSchema = sourceTable.schema().toSchema();
+        Schema existedSchema = existedTable.schema().toSchema();
+
+        // check bucket
+        checkState(
+                sourceTable.coreOptions().bucket() == existedTable.coreOptions().bucket(),
+                "source table bucket is not compatible with existing paimon table bucket.");
+
+        // check format
+        checkState(
+                Objects.equals(
+                        sourceTable.coreOptions().formatType(),
+                        existedTable.coreOptions().formatType()),
+                "source table format is not compatible with existing paimon table format.");
+
+        // check primary keys
+        List<String> sourcePrimaryKeys = sourceSchema.primaryKeys();
+        List<String> existedPrimaryKeys = existedSchema.primaryKeys();
+        checkState(
+                sourcePrimaryKeys.size() == existedPrimaryKeys.size()
+                        && new HashSet<>(existedPrimaryKeys).containsAll(sourcePrimaryKeys),
+                "source table primary keys are not compatible with existing paimon table primary keys.");
+
+        // check partition keys
+        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
+        List<String> existedPartitionFields = existedSchema.partitionKeys();
+        checkState(
+                sourcePartitionFields.size() == existedPartitionFields.size()
+                        && new HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
+                "source table partition keys are not compatible with existing paimon table partition keys.");
+
+        // check all fields
+        List<DataField> sourceFields = sourceSchema.fields();
+        List<DataField> existedFields = existedSchema.fields();
+        checkState(
+                existedFields.size() >= sourceFields.size()
+                        && new HashSet<>(existedFields).containsAll(sourceFields),
+                "source table fields are not compatible with existing paimon table fields.");
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/DataFileInfo.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/DataFileInfo.java
new file mode 100644
index 0000000000..aa828c5cae
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/DataFileInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+
+import java.util.List;
+
+/** Data file metas of one partition and bucket. */
+public class DataFileInfo {
+
+ private final BinaryRow partition;
+ private final int bucket;
+ private final int totalBuckets;
+ private final List<DataFileMeta> dataFileMetas;
+
+ public DataFileInfo(
+            BinaryRow partition, int bucket, int totalBuckets, List<DataFileMeta> dataFileMetas) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.totalBuckets = totalBuckets;
+ this.dataFileMetas = dataFileMetas;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public int totalBuckets() {
+ return totalBuckets;
+ }
+
+ public List<DataFileMeta> dataFileMetas() {
+ return dataFileMetas;
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/IndexFileInfo.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/IndexFileInfo.java
new file mode 100644
index 0000000000..7512a9f133
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/IndexFileInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.List;
+
+/** Index file metas of one partition and bucket. */
+public class IndexFileInfo {
+
+ private final BinaryRow partition;
+ private final int bucket;
+ private final List<IndexFileMeta> indexFileMetas;
+
+    public IndexFileInfo(BinaryRow partition, int bucket, List<IndexFileMeta> indexFileMetas) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.indexFileMetas = indexFileMetas;
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public int bucket() {
+ return bucket;
+ }
+
+ public List<IndexFileMeta> indexFileMetas() {
+ return indexFileMetas;
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListDataFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListDataFilesOperator.java
new file mode 100644
index 0000000000..faf427165c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListDataFilesOperator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.spark.sql.SparkSession;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** List data files. */
+public class ListDataFilesOperator extends CopyFilesOperator {
+
+ private final DataFileMetaSerializer dataFileSerializer;
+
+    public ListDataFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+ super(spark, sourceCatalog, targetCatalog);
+ this.dataFileSerializer = new DataFileMetaSerializer();
+ }
+
+ public List<CopyFileInfo> execute(
+ Identifier sourceIdentifier,
+ Identifier targetIdentifier,
+ @Nullable Snapshot snapshot,
+ @Nullable PartitionPredicate partitionPredicate)
+ throws Exception {
+ if (snapshot == null) {
+ return null;
+ }
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+ Iterator<ManifestEntry> manifestEntries =
+ sourceTable
+ .newSnapshotReader()
+ .withSnapshot(snapshot)
+ .withPartitionFilter(partitionPredicate)
+ .withMode(ScanMode.ALL)
+ .readFileIterator();
+
+ List<CopyFileInfo> dataFiles = new ArrayList<>();
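+        // Map each manifest entry to a source path, a new target path and a rewritten file meta.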
+ while (manifestEntries.hasNext()) {
+ ManifestEntry manifestEntry = manifestEntries.next();
+ CopyFileInfo dataFile =
+ pickDataFiles(
+ manifestEntry,
+ sourceTable.store().pathFactory(),
+ targetTable.store().pathFactory(),
+ targetTable.schema().id());
+ dataFiles.add(dataFile);
+ }
+ return dataFiles;
+ }
+
+ private CopyFileInfo pickDataFiles(
+ ManifestEntry manifestEntry,
+ FileStorePathFactory sourceFileStorePathFactory,
+ FileStorePathFactory targetFileStorePathFactory,
+ long newSchemaId)
+ throws IOException {
+ Path dataFilePath =
+ sourceFileStorePathFactory
+ .createDataFilePathFactory(
+                                manifestEntry.partition(), manifestEntry.bucket())
+ .toPath(manifestEntry);
+ Path targetDataFilePath =
+ targetFileStorePathFactory
+ .createDataFilePathFactory(
+                                manifestEntry.partition(), manifestEntry.bucket())
+ .newPath();
+ DataFileMeta fileMeta = manifestEntry.file();
+ DataFileMeta targetFileMeta =
+ CopyFilesUtil.toNewDataFileMeta(
+ fileMeta, targetDataFilePath.getName(), newSchemaId);
+ return new CopyFileInfo(
+ dataFilePath.toString(),
+ targetDataFilePath.toString(),
+                SerializationUtils.serializeBinaryRow(manifestEntry.partition()),
+ manifestEntry.bucket(),
+ manifestEntry.totalBuckets(),
+ dataFileSerializer.serializeToBytes(targetFileMeta));
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListIndexFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListIndexFilesOperator.java
new file mode 100644
index 0000000000..4e1d6c4562
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListIndexFilesOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.spark.sql.SparkSession;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** List index files. */
+public class ListIndexFilesOperator extends CopyFilesOperator {
+
+ private final IndexFileMetaSerializer indexFileSerializer;
+
+ public ListIndexFilesOperator(
+ SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+ super(spark, sourceCatalog, targetCatalog);
+ this.indexFileSerializer = new IndexFileMetaSerializer();
+ }
+
+ public List<CopyFileInfo> execute(
+ Identifier sourceIdentifier,
+ Identifier targetIdentifier,
+ Snapshot snapshot,
+ @Nullable PartitionPredicate partitionPredicate)
+ throws Exception {
+ if (snapshot == null) {
+ return null;
+ }
+ if (snapshot.indexManifest() == null) {
+ return null;
+ }
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+        List<CopyFileInfo> indexFiles = new ArrayList<>();
+        IndexFileHandler sourceIndexHandler = sourceTable.store().newIndexFileHandler();
+        FileStorePathFactory targetFileStorePathFactory = targetTable.store().pathFactory();
+        List<IndexManifestEntry> indexManifestEntries =
+                sourceIndexHandler.readManifestWithIOException(snapshot.indexManifest());
+ for (IndexManifestEntry indexManifestEntry : indexManifestEntries) {
+            if (partitionPredicate == null
+                    || partitionPredicate.test(indexManifestEntry.partition())) {
+                CopyFileInfo indexFile =
+                        pickIndexFiles(
+                                indexManifestEntry, sourceIndexHandler, targetFileStorePathFactory);
+ indexFiles.add(indexFile);
+ }
+ }
+ return indexFiles;
+ }
+
+ private CopyFileInfo pickIndexFiles(
+ IndexManifestEntry indexManifestEntry,
+ IndexFileHandler sourceIndexFileHandler,
+ FileStorePathFactory targetFileStorePathFactory)
+ throws IOException {
+        Path indexFilePath = sourceIndexFileHandler.filePath(indexManifestEntry);
+        Path targetIndexFilePath =
+                targetFileStorePathFactory
+                        .indexFileFactory(
+                                indexManifestEntry.partition(), indexManifestEntry.bucket())
+                        .newPath();
+        IndexFileMeta fileMeta = indexManifestEntry.indexFile();
+        IndexFileMeta targetFileMeta =
+                CopyFilesUtil.toNewIndexFileMeta(fileMeta, targetIndexFilePath.getName());
+        return new CopyFileInfo(
+                indexFilePath.toString(),
+                targetIndexFilePath.toString(),
+                SerializationUtils.serializeBinaryRow(indexManifestEntry.partition()),
+ indexManifestEntry.bucket(),
+ indexFileSerializer.serializeToBytes(targetFileMeta));
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CopyFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CopyFilesProcedure.java
new file mode 100644
index 0000000000..8b29dabb2d
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CopyFilesProcedure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.SparkUtils;
+import org.apache.paimon.spark.catalog.SparkBaseCatalog;
+import org.apache.paimon.spark.copy.CopyDataFilesOperator;
+import org.apache.paimon.spark.copy.CopyFileInfo;
+import org.apache.paimon.spark.copy.CopyFilesCommitOperator;
+import org.apache.paimon.spark.copy.CopySchemaOperator;
+import org.apache.paimon.spark.copy.ListDataFilesOperator;
+import org.apache.paimon.spark.copy.ListIndexFilesOperator;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Copy files procedure for latest snapshot.
+ *
+ * <pre><code>
+ * CALL sys.copy(source_table => 'clg.db.tbl', target_table => 'clg.db.tbl_copy')
+ * </code></pre>
+ */
+public class CopyFilesProcedure extends BaseProcedure {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopyFilesProcedure.class);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("source_table", StringType),
+ ProcedureParameter.required("target_table", StringType),
+ ProcedureParameter.optional("where", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ });
+
+ protected CopyFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ // 1. get source catalog and identifier
+        SparkUtils.CatalogAndIdentifier sourceCatalogAndIdentifier =
+                toCatalogAndIdentifier(args.getString(0), PARAMETERS[0].name(), tableCatalog());
+        Preconditions.checkState(
+                sourceCatalogAndIdentifier.catalog() instanceof SparkBaseCatalog,
+                String.format(
+                        "%s is not a Paimon catalog", sourceCatalogAndIdentifier.catalog().name()));
+        SparkBaseCatalog sourceCatalog = (SparkBaseCatalog) sourceCatalogAndIdentifier.catalog();
+        Catalog sourcePaimonCatalog = sourceCatalog.paimonCatalog();
+        Identifier sourceTableIdentifier =
+                CatalogUtils.toIdentifier(
+                        sourceCatalogAndIdentifier.identifier(), sourceCatalog.paimonCatalogName());
+
+        // 2. get target catalog and identifier
+        SparkUtils.CatalogAndIdentifier targetCatalogIdentifier =
+                toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), tableCatalog());
+        Preconditions.checkState(
+                targetCatalogIdentifier.catalog() instanceof SparkBaseCatalog,
+                String.format(
+                        "%s is not a Paimon catalog", targetCatalogIdentifier.catalog().name()));
+        SparkBaseCatalog targetCatalog = (SparkBaseCatalog) targetCatalogIdentifier.catalog();
+        Catalog targetPaimonCatalog = targetCatalog.paimonCatalog();
+        Identifier targetTableIdentifier =
+                CatalogUtils.toIdentifier(
+                        targetCatalogIdentifier.identifier(), targetCatalog.paimonCatalogName());
+
+ // 3. get partition predicate
+ String where = args.isNullAt(2) ? null : args.getString(2);
+ PartitionPredicate partitionPredicate = null;
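+        // A null predicate (no where clause) means all partitions are copied.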
+ try {
+ partitionPredicate =
+ getPartitionPredicate(
+ where,
+ sourceCatalogAndIdentifier,
+ sourcePaimonCatalog,
+ sourceTableIdentifier);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException("Failed to get partition predicate", e);
+ }
+
+ // 4. do copy
+ try {
+ doCopy(
+ sourcePaimonCatalog,
+ targetPaimonCatalog,
+ sourceTableIdentifier,
+ targetTableIdentifier,
+ partitionPredicate);
+ return new InternalRow[] {newInternalRow(true)};
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to copy files", e);
+ }
+ }
+
+ private void doCopy(
+ Catalog sourcePaimonCatalog,
+ Catalog targetPaimonCatalog,
+ Identifier sourceTableIdentifier,
+ Identifier targetTableIdentifier,
+ @Nullable PartitionPredicate partitionPredicate)
+ throws Exception {
+        CopySchemaOperator copySchemaOperator =
+                new CopySchemaOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        ListDataFilesOperator listDataFilesOperator =
+                new ListDataFilesOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        CopyDataFilesOperator copyDataFilesOperator =
+                new CopyDataFilesOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        CopyFilesCommitOperator copyFilesCommitOperator =
+                new CopyFilesCommitOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        ListIndexFilesOperator listIndexFilesOperator =
+                new ListIndexFilesOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+
+        // 1. create target table and get latest snapshot
+        Snapshot snapshot =
+                copySchemaOperator.execute(sourceTableIdentifier, targetTableIdentifier);
+
+        // 2. list data and index files
+        List<CopyFileInfo> dataFiles =
+                listDataFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, snapshot, partitionPredicate);
+        List<CopyFileInfo> indexFiles =
+                listIndexFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, snapshot, partitionPredicate);
+
+        // 3. copy data and index files
+        JavaRDD<CopyFileInfo> dataCopyFileInfoRdd =
+                copyDataFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, dataFiles);
+        JavaRDD<CopyFileInfo> indexCopyFileInfoRdd =
+                copyDataFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, indexFiles);
+
+        // 4. commit table
+        copyFilesCommitOperator.execute(
+                targetTableIdentifier, dataCopyFileInfoRdd, indexCopyFileInfoRdd);
+ }
+
+ private PartitionPredicate getPartitionPredicate(
+ String where,
+ SparkUtils.CatalogAndIdentifier catalogAndIdentifier,
+ Catalog sourceCatalog,
+ Identifier sourceIdentifier)
+ throws Catalog.TableNotExistException {
+        DataSourceV2Relation relation = createRelation(catalogAndIdentifier.identifier());
+        Table originalSourceTable = sourceCatalog.getTable(sourceIdentifier);
+        Preconditions.checkState(
+                originalSourceTable instanceof FileStoreTable,
+                String.format(
+                        "Only support copy FileStoreTable, but this table %s is %s.",
+                        sourceIdentifier, originalSourceTable.getClass()));
+        FileStoreTable sourceTable = (FileStoreTable) originalSourceTable;
+
+        return SparkProcedureUtils.convertToPartitionPredicate(
+                where, sourceTable.schema().logicalPartitionType(), spark(), relation);
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder<CopyFilesProcedure>() {
+ @Override
+ public CopyFilesProcedure doBuild() {
+ return new CopyFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "CopyFilesProcedure";
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CopyFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CopyFilesProcedureTest.scala
new file mode 100644
index 0000000000..6c35af37e8
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CopyFilesProcedureTest.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+import java.util.concurrent.ThreadLocalRandom
+
+class CopyFilesProcedureTest extends PaimonSparkTestBase {
+
+ test("Paimon copy files procedure: append table") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING)
+ |""".stripMargin)
+
+ sql(s"INSERT INTO tbl$random VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+ sql(s"INSERT INTO tbl$random VALUES (4, 'd'), (5, 'e'), (6, 'f')")
+
+ checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random")
+ )
+
+ checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random")
+ )
+ }
+ }
+
+ test("Paimon copy files procedure: partitioned append table") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+ checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random")
+ )
+
+ checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random")
+ )
+ }
+ }
+
+  test("Paimon copy files procedure: partitioned append table with partition filter") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+ checkAnswer(
+ sql(
+          s"""CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random', where => "dt = '2025-08-17' and hh = 5")"""),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random WHERE dt = '2025-08-17' and hh = 5")
+ )
+ }
+ }
+
+ test("Paimon copy files procedure: pk table") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'dt,hh,k',
+ | 'bucket' = '-1')
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+ sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b',
'2025-10-06', 0)")
+ checkAnswer(
+ sql(s"CALL sys.copy(source_table => 'tbl$random', target_table =>
'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random")
+ )
+
+ }
+ }
+
+ test("Paimon copy files procedure: schema change") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ // source table
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+ sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b',
'2025-10-06', 0)")
+
+ sql(s"""
+ |ALTER TABLE tbl$random
+ |DROP COLUMN v
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"CALL sys.copy(source_table => 'tbl$random', target_table =>
'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+ sql(s"SELECT * FROM target_tbl$random"),
+ sql(s"SELECT * FROM tbl$random")
+ )
+
+ }
+ }
+
+ test("Paimon copy files procedure: copy to existed table") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ // source table
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+ sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b',
'2025-10-06', 0)")
+
+ // target table
+ sql(s"""
+ |CREATE TABLE target_tbl$random (k INT, v STRING, dt STRING, hh
INT, v2 STRING)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+ // partition should overwrite
+ sql(
+ s"INSERT INTO target_tbl$random VALUES (3, 'c', '2025-08-17', 5,
'c1'), (4, 'd', '2025-08-17', 6, 'd1')")
+
+ checkAnswer(
+ sql(s"CALL sys.copy(source_table => 'tbl$random', target_table =>
'target_tbl$random')"),
+ Row(true) :: Nil
+ )
+
+ checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random WHERE dt = '2025-08-17' and hh = 5"),
+ Row(1, "a", "2025-08-17", 5, null)
+ )
+
+ checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random WHERE dt = '2025-10-06' and hh = 0"),
+ Row(2, "b", "2025-10-06", 0, null)
+ )
+
+ checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random WHERE dt = '2025-08-17' and hh = 6"),
+ Row(4, "d", "2025-08-17", 6, "d1")
+ )
+ }
+ }
+
+ test("Paimon copy files procedure: copy to existed compatible table") {
+ val random = ThreadLocalRandom.current().nextInt(100000);
+ withTable(s"tbl$random") {
+ // source table
+ sql(s"""
+ |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+ // target table
+ sql(s"""
+ |CREATE TABLE target_tbl$random (k INT, v2 STRING, dt STRING, hh
INT)
+ |PARTITIONED BY (dt, hh)
+ |""".stripMargin)
+
+ assertThrows[RuntimeException] {
+ sql(s"CALL sys.copy(source_table => 'tbl$random', target_table =>
'target_tbl$random')")
+ }
+ }
+ }
+}