This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b7a42cdf78 [spark] Support CopyFilesProcedure in spark (#6625)
b7a42cdf78 is described below

commit b7a42cdf78c75d0e5f629d390d368d4ed59c5241
Author: shyjsarah <[email protected]>
AuthorDate: Fri Nov 21 15:06:25 2025 +0800

    [spark] Support CopyFilesProcedure in spark (#6625)
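
    The new procedure copies the data and index files of the latest snapshot of a
    source table into a target table. Example calls (taken from the updated
    docs/content/spark/procedures.md in this change; table names are illustrative):

        CALL sys.copy(source_table => "t1", target_table => "t1_copy")
        CALL sys.copy(source_table => "t1", target_table => "t1_copy", where => "day = '2025-08-17'")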
---
 docs/content/spark/procedures.md                   |  13 ++
 .../org/apache/paimon/migrate/FileMetaUtils.java   |  21 ++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../paimon/spark/copy/CopyDataFilesOperator.java   |  87 ++++++++
 .../org/apache/paimon/spark/copy/CopyFileInfo.java |  92 +++++++++
 .../paimon/spark/copy/CopyFilesCommitOperator.java | 150 ++++++++++++++
 .../paimon/spark/copy/CopyFilesOperator.java       |  37 ++++
 .../apache/paimon/spark/copy/CopyFilesUtil.java    |  98 +++++++++
 .../paimon/spark/copy/CopySchemaOperator.java      | 146 +++++++++++++
 .../org/apache/paimon/spark/copy/DataFileInfo.java |  57 ++++++
 .../apache/paimon/spark/copy/IndexFileInfo.java    |  50 +++++
 .../paimon/spark/copy/ListDataFilesOperator.java   | 114 +++++++++++
 .../paimon/spark/copy/ListIndexFilesOperator.java  | 105 ++++++++++
 .../paimon/spark/procedure/CopyFilesProcedure.java | 226 ++++++++++++++++++++
 .../spark/procedure/CopyFilesProcedureTest.scala   | 228 +++++++++++++++++++++
 15 files changed, 1426 insertions(+)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index ad938daaa7..c0abd0bae2 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -463,5 +463,18 @@ This section introduce all available spark procedures about paimon.
         CALL sys.rewrite_file_index(table => "t", where => "day = '2025-08-17'")<br/>
       </td>
    </tr>
+   <tr>
+      <td>copy</td>
+      <td>
+         copy table files. Arguments:
+            <li>source_table: the source table identifier. Cannot be empty.</li>
+            <li>target_table: the target table identifier. Cannot be empty.</li>
+            <li>where: partition predicate. Leave it empty to copy all partitions.</li>
+      </td>
+      <td>
+         CALL sys.copy(source_table => "t1", target_table => "t1_copy")<br/>
+         CALL sys.copy(source_table => "t1", target_table => "t1_copy", where => "day = '2025-08-17'")<br/>
+      </td>
+   </tr>
    </tbody>
 </table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 0dc14971b3..de51a91da2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
@@ -95,6 +96,26 @@ public class FileMetaUtils {
                         Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
     }
 
+    public static CommitMessage createCommitMessage(
+            BinaryRow partition,
+            int bucket,
+            int totalBuckets,
+            List<DataFileMeta> dataFileMetas,
+            List<IndexFileMeta> indexFileMetas) {
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                totalBuckets,
+                new DataIncrement(
+                        dataFileMetas,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        indexFileMetas,
+                        Collections.emptyList()),
+                new CompactIncrement(
+                        Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
+    }
+
     public static DataFileMeta constructFileMeta(
             String format,
             FileStatus fileStatus,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index b5db4192b1..32909a2caa 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -23,6 +23,7 @@ import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
 import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
 import org.apache.paimon.spark.procedure.CompactManifestProcedure;
 import org.apache.paimon.spark.procedure.CompactProcedure;
+import org.apache.paimon.spark.procedure.CopyFilesProcedure;
 import org.apache.paimon.spark.procedure.CreateBranchProcedure;
 import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
 import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
@@ -113,6 +114,7 @@ public class SparkProcedures {
         procedureBuilders.put(
                 "trigger_tag_automatic_creation", 
TriggerTagAutomaticCreationProcedure::builder);
         procedureBuilders.put("rewrite_file_index", 
RewriteFileIndexProcedure::builder);
+        procedureBuilders.put("copy", CopyFilesProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyDataFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyDataFilesOperator.java
new file mode 100644
index 0000000000..6de060c01f
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyDataFilesOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** Copy data files from source table to target table. */
+public class CopyDataFilesOperator extends CopyFilesOperator {
+
+    public CopyDataFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+        super(spark, sourceCatalog, targetCatalog);
+    }
+
+    public JavaRDD<CopyFileInfo> execute(
+            Identifier sourceIdentifier, Identifier targetIdentifier, List<CopyFileInfo> dataFiles)
+            throws Exception {
+        if (CollectionUtils.isEmpty(dataFiles)) {
+            return null;
+        }
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+        int readParallelism = SparkProcedureUtils.readParallelism(dataFiles, spark);
+        JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+        JavaRDD<CopyFileInfo> copyFileInfoRdd =
+                context.parallelize(dataFiles, readParallelism)
+                        .mapPartitions(new DataFileProcesser(sourceTable, targetTable));
+        return copyFileInfoRdd;
+    }
+
+    /** Copy data files. */
+    public static class DataFileProcesser
+            implements FlatMapFunction<Iterator<CopyFileInfo>, CopyFileInfo> {
+
+        private final FileStoreTable sourceTable;
+        private final FileStoreTable targetTable;
+
+        public DataFileProcesser(FileStoreTable sourceTable, FileStoreTable targetTable) {
+            this.sourceTable = sourceTable;
+            this.targetTable = targetTable;
+        }
+
+        @Override
+        public Iterator<CopyFileInfo> call(Iterator<CopyFileInfo> dataFileIterator)
+                throws Exception {
+            List<CopyFileInfo> result = new ArrayList<>();
+            while (dataFileIterator.hasNext()) {
+                CopyFileInfo file = dataFileIterator.next();
+                Path sourcePath = new Path(file.sourceFilePath());
+                Path targetPath = new Path(file.targetFilePath());
+                CopyFilesUtil.copyFiles(
+                        sourceTable.fileIO(), targetTable.fileIO(), sourcePath, targetPath, false);
+                result.add(file);
+            }
+            return result.iterator();
+        }
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFileInfo.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFileInfo.java
new file mode 100644
index 0000000000..254c493d7e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFileInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import java.io.Serializable;
+
+/** The information of copy data file. */
+public class CopyFileInfo implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String sourceFilePath;
+
+    private final String targetFilePath;
+
+    private final byte[] partition;
+
+    private final int bucket;
+
+    private final int totalBuckets;
+
+    private final byte[] dataFileMeta;
+
+    public CopyFileInfo(
+            String sourceFilePath,
+            String targetFilePath,
+            byte[] partition,
+            int bucket,
+            byte[] dataFileMeta) {
+        this.sourceFilePath = sourceFilePath;
+        this.targetFilePath = targetFilePath;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.totalBuckets = 0;
+        this.dataFileMeta = dataFileMeta;
+    }
+
+    public CopyFileInfo(
+            String sourceFilePath,
+            String targetFilePath,
+            byte[] partition,
+            int bucket,
+            int totalBuckets,
+            byte[] dataFileMeta) {
+        this.sourceFilePath = sourceFilePath;
+        this.targetFilePath = targetFilePath;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
+        this.dataFileMeta = dataFileMeta;
+    }
+
+    public String sourceFilePath() {
+        return sourceFilePath;
+    }
+
+    public String targetFilePath() {
+        return targetFilePath;
+    }
+
+    public byte[] partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
+    public byte[] dataFileMeta() {
+        return dataFileMeta;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesCommitOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesCommitOperator.java
new file mode 100644
index 0000000000..d2284e5e55
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesCommitOperator.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import scala.Tuple2;
+
+/** Commit operator for copy files. */
+public class CopyFilesCommitOperator extends CopyFilesOperator {
+
+    private DataFileMetaSerializer dataFileSerializer;
+
+    private IndexFileMetaSerializer indexFileSerializer;
+
+    public CopyFilesCommitOperator(
+            SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+        super(spark, sourceCatalog, targetCatalog);
+        this.dataFileSerializer = new DataFileMetaSerializer();
+        this.indexFileSerializer = new IndexFileMetaSerializer();
+    }
+
+    public void execute(
+            Identifier targetIdentifier,
+            JavaRDD<CopyFileInfo> dataCopyFileInfoRdd,
+            JavaRDD<CopyFileInfo> indexCopyFileInfoRdd)
+            throws Exception {
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+
+        // deserialize data file meta
+        Map<Tuple2<BinaryRow, Integer>, DataFileInfo> dataFileMetaMap =
+                deserializeDataFileMeta(dataCopyFileInfoRdd);
+
+        // deserialize index file meta
+        Map<Tuple2<BinaryRow, Integer>, IndexFileInfo> indexFileMetaMap =
+                deserializeIndexFileMeta(indexCopyFileInfoRdd);
+
+        // construct commit messages
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (Map.Entry<Tuple2<BinaryRow, Integer>, DataFileInfo> entry :
+                dataFileMetaMap.entrySet()) {
+            Tuple2<BinaryRow, Integer> partitionAndBucket = entry.getKey();
+            DataFileInfo dataFileInfo = entry.getValue();
+            List<DataFileMeta> dataFileMetas = dataFileInfo.dataFileMetas();
+            List<IndexFileMeta> indexFileMetas = new ArrayList<>();
+            if (indexFileMetaMap.containsKey(partitionAndBucket)) {
+                IndexFileInfo indexFileInfo = indexFileMetaMap.get(partitionAndBucket);
+                indexFileMetas.addAll(indexFileInfo.indexFileMetas());
+            }
+            commitMessages.add(
+                    FileMetaUtils.createCommitMessage(
+                            partitionAndBucket._1,
+                            partitionAndBucket._2,
+                            dataFileInfo.totalBuckets(),
+                            dataFileMetas,
+                            indexFileMetas));
+        }
+        try (BatchTableCommit commit =
+                targetTable.newBatchWriteBuilder().withOverwrite().newCommit()) {
+            commit.commit(commitMessages);
+        }
+    }
+
+    private Map<Tuple2<BinaryRow, Integer>, DataFileInfo> deserializeDataFileMeta(
+            JavaRDD<CopyFileInfo> copyFileInfoRdd) throws IOException {
+        Map<Tuple2<BinaryRow, Integer>, DataFileInfo> result = new HashMap<>();
+        if (copyFileInfoRdd == null) {
+            return result;
+        }
+        List<CopyFileInfo> copyFileInfos = copyFileInfoRdd.collect();
+        for (CopyFileInfo copyDataFileInfo : copyFileInfos) {
+            BinaryRow partition =
+                    SerializationUtils.deserializeBinaryRow(copyDataFileInfo.partition());
+            int bucket = copyDataFileInfo.bucket();
+            DataFileInfo dataFileInfo =
+                    result.computeIfAbsent(
+                            new Tuple2<>(partition, bucket),
+                            k ->
+                                    new DataFileInfo(
+                                            partition,
+                                            bucket,
+                                            copyDataFileInfo.totalBuckets(),
+                                            new ArrayList<>()));
+            dataFileInfo
+                    .dataFileMetas()
+                    .add(dataFileSerializer.deserializeFromBytes(copyDataFileInfo.dataFileMeta()));
+        }
+        return result;
+    }
+
+    private Map<Tuple2<BinaryRow, Integer>, IndexFileInfo> deserializeIndexFileMeta(
+            JavaRDD<CopyFileInfo> copyFileInfoRdd) throws IOException {
+        Map<Tuple2<BinaryRow, Integer>, IndexFileInfo> result = new HashMap<>();
+        if (copyFileInfoRdd == null) {
+            return result;
+        }
+        List<CopyFileInfo> copyFileInfos = copyFileInfoRdd.collect();
+        for (CopyFileInfo copyIndexFileInfo : copyFileInfos) {
+            BinaryRow partition =
+                    SerializationUtils.deserializeBinaryRow(copyIndexFileInfo.partition());
+            int bucket = copyIndexFileInfo.bucket();
+            IndexFileInfo indexFileInfo =
+                    result.computeIfAbsent(
+                            new Tuple2<>(partition, bucket),
+                            k -> new IndexFileInfo(partition, bucket, new ArrayList<>()));
+            indexFileInfo
+                    .indexFileMetas()
+                    .add(
+                            indexFileSerializer.deserializeFromBytes(
+                                    copyIndexFileInfo.dataFileMeta()));
+        }
+        return result;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesOperator.java
new file mode 100644
index 0000000000..073008634c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesOperator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.catalog.Catalog;
+
+import org.apache.spark.sql.SparkSession;
+
+/** Base class for copy files. */
+public abstract class CopyFilesOperator {
+
+    protected final SparkSession spark;
+    protected final Catalog sourceCatalog;
+    protected final Catalog targetCatalog;
+
+    public CopyFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+        this.spark = spark;
+        this.sourceCatalog = sourceCatalog;
+        this.targetCatalog = targetCatalog;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
new file mode 100644
index 0000000000..092fbc238c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.PojoDataFileMeta;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/** Utils for copy files. */
+public class CopyFilesUtil {
+
+    public static void copyFiles(
+            FileIO sourceFileIO,
+            FileIO targetFileIO,
+            Path sourcePath,
+            Path targetPath,
+            boolean overwrite)
+            throws IOException {
+        try (SeekableInputStream is = sourceFileIO.newInputStream(sourcePath);
+                PositionOutputStream os = targetFileIO.newOutputStream(targetPath, overwrite)) {
+            IOUtils.copy(is, os);
+        }
+    }
+
+    public static DataFileMeta toNewDataFileMeta(
+            DataFileMeta oldFileMeta, String newFileName, long newSchemaId) {
+        String newExternalPath =
+                externalPathDir(oldFileMeta.externalPath().orElse(null))
+                        .map(dir -> dir + "/" + newFileName)
+                        .orElse(null);
+        return new PojoDataFileMeta(
+                newFileName,
+                oldFileMeta.fileSize(),
+                oldFileMeta.rowCount(),
+                oldFileMeta.minKey(),
+                oldFileMeta.maxKey(),
+                oldFileMeta.keyStats(),
+                oldFileMeta.valueStats(),
+                oldFileMeta.minSequenceNumber(),
+                oldFileMeta.maxSequenceNumber(),
+                newSchemaId,
+                oldFileMeta.level(),
+                oldFileMeta.extraFiles(),
+                oldFileMeta.creationTime(),
+                oldFileMeta.deleteRowCount().orElse(null),
+                oldFileMeta.embeddedIndex(),
+                oldFileMeta.fileSource().orElse(null),
+                oldFileMeta.valueStatsCols(),
+                newExternalPath,
+                oldFileMeta.firstRowId(),
+                oldFileMeta.writeCols());
+    }
+
+    public static IndexFileMeta toNewIndexFileMeta(IndexFileMeta oldFileMeta, String newFileName) {
+        String newExternalPath =
+                externalPathDir(oldFileMeta.externalPath())
+                        .map(dir -> dir + "/" + newFileName)
+                        .orElse(null);
+        return new IndexFileMeta(
+                oldFileMeta.indexType(),
+                newFileName,
+                oldFileMeta.fileSize(),
+                oldFileMeta.rowCount(),
+                oldFileMeta.dvRanges(),
+                newExternalPath);
+    }
+
+    public static Optional<String> externalPathDir(String externalPath) {
+        return Optional.ofNullable(externalPath)
+                .map(Path::new)
+                .map(p -> p.getParent().toUri().toString());
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopySchemaOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopySchemaOperator.java
new file mode 100644
index 0000000000..669a9f76a2
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopySchemaOperator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.FileStore;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
+
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Copy schema and get latest snapshot. */
+public class CopySchemaOperator extends CopyFilesOperator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopySchemaOperator.class);
+
+    public CopySchemaOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+        super(spark, sourceCatalog, targetCatalog);
+    }
+
+    public Snapshot execute(Identifier sourceIdentifier, Identifier targetIdentifier)
+            throws Exception {
+        Table originalSourceTable = sourceCatalog.getTable(sourceIdentifier);
+        Preconditions.checkState(
+                originalSourceTable instanceof FileStoreTable,
+                String.format(
+                        "Only support copy FileStoreTable, but this table %s is %s.",
+                        sourceIdentifier, sourceIdentifier.getClass()));
+        FileStoreTable sourceTable = (FileStoreTable) originalSourceTable;
+
+        // 1. create target table
+        targetCatalog.createDatabase(targetIdentifier.getDatabaseName(), true);
+
+        try {
+            Table existedTable = targetCatalog.getTable(targetIdentifier);
+            Preconditions.checkState(
+                    existedTable instanceof FileStoreTable,
+                    String.format(
+                            "existed paimon table '%s' is not a 
FileStoreTable, but a %s",
+                            targetIdentifier, 
existedTable.getClass().getName()));
+            checkCompatible(sourceTable, (FileStoreTable) existedTable);
+
+            LOG.info("paimon table '{}' already exists, use it as target 
table.", targetIdentifier);
+        } catch (Catalog.TableNotExistException e) {
+            LOG.info("create target paimon table '{}'.", targetIdentifier);
+
+            targetCatalog.createTable(
+                    targetIdentifier, newSchemaFromTableSchema(sourceTable.schema()), false);
+        }
+
+        // 2. get latest snapshot files
+        FileStore<?> sourceStore = sourceTable.store();
+        SnapshotManager sourceSnapshotManager = sourceStore.snapshotManager();
+        Snapshot latestSnapshot = sourceSnapshotManager.latestSnapshot();
+        return latestSnapshot;
+    }
+
+    private static Schema newSchemaFromTableSchema(TableSchema tableSchema) {
+        return new Schema(
+                ImmutableList.copyOf(tableSchema.fields()),
+                ImmutableList.copyOf(tableSchema.partitionKeys()),
+                ImmutableList.copyOf(tableSchema.primaryKeys()),
+                ImmutableMap.copyOf(
+                        Iterables.filter(
+                                tableSchema.options().entrySet(),
+                                entry -> !Objects.equals(entry.getKey(), CoreOptions.PATH.key()))),
+                tableSchema.comment());
+    }
+
+    private void checkCompatible(FileStoreTable sourceTable, FileStoreTable existedTable) {
+        Schema sourceSchema = sourceTable.schema().toSchema();
+        Schema existedSchema = existedTable.schema().toSchema();
+
+        // check bucket
+        checkState(
+                sourceTable.coreOptions().bucket() == existedTable.coreOptions().bucket(),
+                "source table bucket is not compatible with existed paimon table bucket.");
+
+        // check format
+        checkState(
+                Objects.equals(
+                        sourceTable.coreOptions().formatType(),
+                        existedTable.coreOptions().formatType()),
+                "source table format is not compatible with existed paimon table format.");
+
+        // check primary keys
+        List<String> sourcePrimaryKeys = sourceSchema.primaryKeys();
+        List<String> existedPrimaryKeys = existedSchema.primaryKeys();
+        checkState(
+                sourcePrimaryKeys.size() == existedPrimaryKeys.size()
+                        && new HashSet<>(existedPrimaryKeys).containsAll(sourcePrimaryKeys),
+                "source table primary keys is not compatible with existed paimon table primary keys.");
+
+        // check partition keys
+        List<String> sourcePartitionFields = sourceSchema.partitionKeys();
+        List<String> existedPartitionFields = existedSchema.partitionKeys();
+        checkState(
+                sourcePartitionFields.size() == existedPartitionFields.size()
+                        && new HashSet<>(existedPartitionFields).containsAll(sourcePartitionFields),
+                "source table partition keys is not compatible with existed paimon table partition keys.");
+
+        // check all fields
+        List<DataField> sourceFields = sourceSchema.fields();
+        List<DataField> existedFields = existedSchema.fields();
+        checkState(
+                existedFields.size() >= sourceFields.size()
+                        && new HashSet<>(existedFields).containsAll(sourceFields),
+                "source table fields is not compatible with existed paimon table fields.");
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/DataFileInfo.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/DataFileInfo.java
new file mode 100644
index 0000000000..aa828c5cae
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/DataFileInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+
+import java.util.List;
+
+/** DataFileInfo. */
+public class DataFileInfo {
+
+    private final BinaryRow partition;
+    private final int bucket;
+    private final int totalBuckets;
+    private final List<DataFileMeta> dataFileMetas;
+
+    public DataFileInfo(
+            BinaryRow partition, int bucket, int totalBuckets, List<DataFileMeta> dataFileMetas) {
+        this.partition = partition;
+        this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
+        this.dataFileMetas = dataFileMetas;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
+    public List<DataFileMeta> dataFileMetas() {
+        return dataFileMetas;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/IndexFileInfo.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/IndexFileInfo.java
new file mode 100644
index 0000000000..7512a9f133
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/IndexFileInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.List;
+
+/** IndexFileInfo. */
+public class IndexFileInfo {
+
+    private final BinaryRow partition;
+    private final int bucket;
+    private final List<IndexFileMeta> indexFileMetas;
+
+    public IndexFileInfo(BinaryRow partition, int bucket, List<IndexFileMeta> indexFileMetas) {
+        this.partition = partition;
+        this.bucket = bucket;
+        this.indexFileMetas = indexFileMetas;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public List<IndexFileMeta> indexFileMetas() {
+        return indexFileMetas;
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListDataFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListDataFilesOperator.java
new file mode 100644
index 0000000000..faf427165c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListDataFilesOperator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.spark.sql.SparkSession;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/** List data files. */
+public class ListDataFilesOperator extends CopyFilesOperator {
+
+    private final DataFileMetaSerializer dataFileSerializer;
+
+    public ListDataFilesOperator(SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+        super(spark, sourceCatalog, targetCatalog);
+        this.dataFileSerializer = new DataFileMetaSerializer();
+    }
+
+    public List<CopyFileInfo> execute(
+            Identifier sourceIdentifier,
+            Identifier targetIdentifier,
+            @Nullable Snapshot snapshot,
+            @Nullable PartitionPredicate partitionPredicate)
+            throws Exception {
+        if (snapshot == null) {
+            return null;
+        }
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+        Iterator<ManifestEntry> manifestEntries =
+                sourceTable
+                        .newSnapshotReader()
+                        .withSnapshot(snapshot)
+                        .withPartitionFilter(partitionPredicate)
+                        .withMode(ScanMode.ALL)
+                        .readFileIterator();
+
+        List<CopyFileInfo> dataFiles = new ArrayList<>();
+        while (manifestEntries.hasNext()) {
+            ManifestEntry manifestEntry = manifestEntries.next();
+            CopyFileInfo dataFile =
+                    pickDataFiles(
+                            manifestEntry,
+                            sourceTable.store().pathFactory(),
+                            targetTable.store().pathFactory(),
+                            targetTable.schema().id());
+            dataFiles.add(dataFile);
+        }
+        return dataFiles;
+    }
+
+    private CopyFileInfo pickDataFiles(
+            ManifestEntry manifestEntry,
+            FileStorePathFactory sourceFileStorePathFactory,
+            FileStorePathFactory targetFileStorePathFactory,
+            long newSchemaId)
+            throws IOException {
+        Path dataFilePath =
+                sourceFileStorePathFactory
+                        .createDataFilePathFactory(
+                                manifestEntry.partition(), manifestEntry.bucket())
+                        .toPath(manifestEntry);
+        Path targetDataFilePath =
+                targetFileStorePathFactory
+                        .createDataFilePathFactory(
+                                manifestEntry.partition(), manifestEntry.bucket())
+                        .newPath();
+        DataFileMeta fileMeta = manifestEntry.file();
+        DataFileMeta targetFileMeta =
+                CopyFilesUtil.toNewDataFileMeta(
+                        fileMeta, targetDataFilePath.getName(), newSchemaId);
+        return new CopyFileInfo(
+                dataFilePath.toString(),
+                targetDataFilePath.toString(),
+                SerializationUtils.serializeBinaryRow(manifestEntry.partition()),
+                manifestEntry.bucket(),
+                manifestEntry.totalBuckets(),
+                dataFileSerializer.serializeToBytes(targetFileMeta));
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListIndexFilesOperator.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListIndexFilesOperator.java
new file mode 100644
index 0000000000..4e1d6c4562
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/ListIndexFilesOperator.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copy;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.SerializationUtils;
+
+import org.apache.spark.sql.SparkSession;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** List index files. */
+public class ListIndexFilesOperator extends CopyFilesOperator {
+
+    private final IndexFileMetaSerializer indexFileSerializer;
+
+    public ListIndexFilesOperator(
+            SparkSession spark, Catalog sourceCatalog, Catalog targetCatalog) {
+        super(spark, sourceCatalog, targetCatalog);
+        this.indexFileSerializer = new IndexFileMetaSerializer();
+    }
+
+    public List<CopyFileInfo> execute(
+            Identifier sourceIdentifier,
+            Identifier targetIdentifier,
+            Snapshot snapshot,
+            @Nullable PartitionPredicate partitionPredicate)
+            throws Exception {
+        if (snapshot == null) {
+            return null;
+        }
+        if (snapshot.indexManifest() == null) {
+            return null;
+        }
+        FileStoreTable sourceTable = (FileStoreTable) sourceCatalog.getTable(sourceIdentifier);
+        FileStoreTable targetTable = (FileStoreTable) targetCatalog.getTable(targetIdentifier);
+        List<CopyFileInfo> indexFiles = new ArrayList<>();
+        IndexFileHandler sourceIndexHandler = sourceTable.store().newIndexFileHandler();
+        FileStorePathFactory targetFileStorePathFactory = targetTable.store().pathFactory();
+        List<IndexManifestEntry> indexManifestEntries =
+                sourceIndexHandler.readManifestWithIOException(snapshot.indexManifest());
+        for (IndexManifestEntry indexManifestEntry : indexManifestEntries) {
+            if (partitionPredicate == null
+                    || partitionPredicate.test(indexManifestEntry.partition())) {
+                CopyFileInfo indexFile =
+                        pickIndexFiles(
+                                indexManifestEntry, sourceIndexHandler, targetFileStorePathFactory);
+                indexFiles.add(indexFile);
+            }
+        }
+        return indexFiles;
+    }
+
+    private CopyFileInfo pickIndexFiles(
+            IndexManifestEntry indexManifestEntry,
+            IndexFileHandler sourceIndexFileHandler,
+            FileStorePathFactory targetFileStorePathFactory)
+            throws IOException {
+        Path indexFilePath = sourceIndexFileHandler.filePath(indexManifestEntry);
+        Path targetIndexFilePath =
+                targetFileStorePathFactory
+                        .indexFileFactory(
+                                indexManifestEntry.partition(), indexManifestEntry.bucket())
+                        .newPath();
+        IndexFileMeta fileMeta = indexManifestEntry.indexFile();
+        IndexFileMeta targetFileMeta =
+                CopyFilesUtil.toNewIndexFileMeta(fileMeta, targetIndexFilePath.getName());
+        return new CopyFileInfo(
+                indexFilePath.toString(),
+                targetIndexFilePath.toString(),
+                SerializationUtils.serializeBinaryRow(indexManifestEntry.partition()),
+                indexManifestEntry.bucket(),
+                indexFileSerializer.serializeToBytes(targetFileMeta));
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CopyFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CopyFilesProcedure.java
new file mode 100644
index 0000000000..8b29dabb2d
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CopyFilesProcedure.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.SparkUtils;
+import org.apache.paimon.spark.catalog.SparkBaseCatalog;
+import org.apache.paimon.spark.copy.CopyDataFilesOperator;
+import org.apache.paimon.spark.copy.CopyFileInfo;
+import org.apache.paimon.spark.copy.CopyFilesCommitOperator;
+import org.apache.paimon.spark.copy.CopySchemaOperator;
+import org.apache.paimon.spark.copy.ListDataFilesOperator;
+import org.apache.paimon.spark.copy.ListIndexFilesOperator;
+import org.apache.paimon.spark.utils.CatalogUtils;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Copy files procedure for latest snapshot.
+ *
+ * <pre><code>
+ *  CALL sys.copy(source_table => 'clg.db.tbl', target_table => 'clg.db.tbl_copy')
+ * </code></pre>
+ */
+public class CopyFilesProcedure extends BaseProcedure {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CopyFilesProcedure.class);
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("source_table", StringType),
+                ProcedureParameter.required("target_table", StringType),
+                ProcedureParameter.optional("where", StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+                    });
+
+    protected CopyFilesProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        // 1. get source catalog and identifier
+        SparkUtils.CatalogAndIdentifier sourceCatalogAndIdentifier =
+                toCatalogAndIdentifier(args.getString(0), PARAMETERS[0].name(), tableCatalog());
+        Preconditions.checkState(
+                sourceCatalogAndIdentifier.catalog() instanceof SparkBaseCatalog,
+                String.format(
+                        "%s is not a Paimon catalog", sourceCatalogAndIdentifier.catalog().name()));
+        SparkBaseCatalog sourceCatalog = (SparkBaseCatalog) sourceCatalogAndIdentifier.catalog();
+        Catalog sourcePaimonCatalog = sourceCatalog.paimonCatalog();
+        Identifier sourceTableIdentifier =
+                CatalogUtils.toIdentifier(
+                        sourceCatalogAndIdentifier.identifier(), sourceCatalog.paimonCatalogName());
+
+        // 2. get target catalog and identifier
+        SparkUtils.CatalogAndIdentifier targetCatalogIdentifier =
+                toCatalogAndIdentifier(args.getString(1), PARAMETERS[1].name(), tableCatalog());
+        Preconditions.checkState(
+                targetCatalogIdentifier.catalog() instanceof SparkBaseCatalog,
+                String.format(
+                        "%s is not a Paimon catalog", targetCatalogIdentifier.catalog().name()));
+        SparkBaseCatalog targetCatalog = (SparkBaseCatalog) targetCatalogIdentifier.catalog();
+        Catalog targetPaimonCatalog = targetCatalog.paimonCatalog();
+        Identifier targetTableIdentifier =
+                CatalogUtils.toIdentifier(
+                        targetCatalogIdentifier.identifier(), targetCatalog.paimonCatalogName());
+
+        // 3. get partition predicate
+        String where = args.isNullAt(2) ? null : args.getString(2);
+        PartitionPredicate partitionPredicate = null;
+        try {
+            partitionPredicate =
+                    getPartitionPredicate(
+                            where,
+                            sourceCatalogAndIdentifier,
+                            sourcePaimonCatalog,
+                            sourceTableIdentifier);
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException("Failed to get partition predicate", e);
+        }
+
+        // 4. do copy
+        try {
+            doCopy(
+                    sourcePaimonCatalog,
+                    targetPaimonCatalog,
+                    sourceTableIdentifier,
+                    targetTableIdentifier,
+                    partitionPredicate);
+            return new InternalRow[] {newInternalRow(true)};
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to copy files", e);
+        }
+    }
+
+    private void doCopy(
+            Catalog sourcePaimonCatalog,
+            Catalog targetPaimonCatalog,
+            Identifier sourceTableIdentifier,
+            Identifier targetTableIdentifier,
+            @Nullable PartitionPredicate partitionPredicate)
+            throws Exception {
+        CopySchemaOperator copySchemaOperator =
+                new CopySchemaOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        ListDataFilesOperator listDataFilesOperator =
+                new ListDataFilesOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        CopyDataFilesOperator copyDataFilesOperator =
+                new CopyDataFilesOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        CopyFilesCommitOperator copyFilesCommitOperator =
+                new CopyFilesCommitOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+        ListIndexFilesOperator listIndexFilesOperator =
+                new ListIndexFilesOperator(spark(), sourcePaimonCatalog, targetPaimonCatalog);
+
+        // 1. create target table and get latest snapshot
+        Snapshot snapshot =
+                copySchemaOperator.execute(sourceTableIdentifier, targetTableIdentifier);
+
+        // 2. list data and index files
+        List<CopyFileInfo> dataFilesRdd =
+                listDataFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, snapshot, partitionPredicate);
+        List<CopyFileInfo> indexFilesRdd =
+                listIndexFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, snapshot, partitionPredicate);
+
+        // 3. copy data and index files
+        JavaRDD<CopyFileInfo> dataCopyFileInfoRdd =
+                copyDataFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, dataFilesRdd);
+        JavaRDD<CopyFileInfo> indexCopeFileInfoRdd =
+                copyDataFilesOperator.execute(
+                        sourceTableIdentifier, targetTableIdentifier, indexFilesRdd);
+        // 4. commit table
+        copyFilesCommitOperator.execute(
+                targetTableIdentifier, dataCopyFileInfoRdd, indexCopeFileInfoRdd);
+    }
+
+    private PartitionPredicate getPartitionPredicate(
+            String where,
+            SparkUtils.CatalogAndIdentifier catalogAndIdentifier,
+            Catalog sourceCatalog,
+            Identifier sourceIdentifier)
+            throws Catalog.TableNotExistException {
+        DataSourceV2Relation relation = createRelation(catalogAndIdentifier.identifier());
+        Table originalSourceTable = sourceCatalog.getTable(sourceIdentifier);
+        Preconditions.checkState(
+                originalSourceTable instanceof FileStoreTable,
+                String.format(
+                        "Only support copying FileStoreTable, but table %s is a %s.",
+                        sourceIdentifier, originalSourceTable.getClass().getName()));
+        FileStoreTable sourceTable = (FileStoreTable) originalSourceTable;
+
+        return SparkProcedureUtils.convertToPartitionPredicate(
+                where, sourceTable.schema().logicalPartitionType(), spark(), relation);
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<CopyFilesProcedure>() {
+            @Override
+            public CopyFilesProcedure doBuild() {
+                return new CopyFilesProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "CopyFilesProcedure";
+    }
+}
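
For reference, a minimal usage sketch of the new procedure (illustrative database and
table names, assuming a SparkSession named spark; the call syntax follows the docs and
tests added in this commit):

    // copy every partition of db.t1 into db.t1_copy
    spark.sql("CALL sys.copy(source_table => 'db.t1', target_table => 'db.t1_copy')")
    // copy only the partitions matching the predicate
    spark.sql(
      """CALL sys.copy(source_table => 'db.t1', target_table => 'db.t1_copy', where => "dt = '2025-08-17' AND hh = 5")""")
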
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CopyFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CopyFilesProcedureTest.scala
new file mode 100644
index 0000000000..6c35af37e8
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CopyFilesProcedureTest.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+import java.util.concurrent.ThreadLocalRandom
+
+class CopyFilesProcedureTest extends PaimonSparkTestBase {
+
+  test("Paimon copy files procedure: append table") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING)
+             |""".stripMargin)
+
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      sql(s"INSERT INTO tbl$random VALUES (4, 'd'), (5, 'e'), (6, 'f')")
+
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random")
+      )
+
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random")
+      )
+    }
+  }
+
+  test("Paimon copy files procedure: partitioned append table") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random")
+      )
+
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random")
+      )
+    }
+  }
+
+  test("Paimon copy files procedure: partitioned append table with partition filter") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+      checkAnswer(
+        sql(
+          s"""CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random', where => "dt = '2025-08-17' and hh = 5")"""),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random WHERE dt = '2025-08-17' and hh = 5")
+      )
+    }
+  }
+
+  test("Paimon copy files procedure: pk table") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+             |TBLPROPERTIES (
+             |  'primary-key' = 'dt,hh,k',
+             |  'bucket' = '-1')
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random")
+      )
+
+    }
+  }
+
+  test("Paimon copy files procedure: schema change") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      // source table
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+
+      sql(s"""
+             |ALTER TABLE tbl$random
+             |DROP COLUMN v
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random"),
+        sql(s"SELECT * FROM tbl$random")
+      )
+
+    }
+  }
+
+  test("Paimon copy files procedure: copy to existed table") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      // source table
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+      sql(s"INSERT INTO tbl$random VALUES (1, 'a', '2025-08-17', 5), (2, 'b', '2025-10-06', 0)")
+
+      // target table
+      sql(s"""
+             |CREATE TABLE target_tbl$random (k INT, v STRING, dt STRING, hh INT, v2 STRING)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+      // partition should overwrite
+      sql(
+        s"INSERT INTO target_tbl$random VALUES (3, 'c', '2025-08-17', 5, 'c1'), (4, 'd', '2025-08-17', 6, 'd1')")
+
+      checkAnswer(
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')"),
+        Row(true) :: Nil
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random WHERE dt = '2025-08-17' and hh = 5"),
+        Row(1, "a", "2025-08-17", 5, null)
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random WHERE dt = '2025-10-06' and hh = 0"),
+        Row(2, "b", "2025-10-06", 0, null)
+      )
+
+      checkAnswer(
+        sql(s"SELECT * FROM target_tbl$random WHERE dt = '2025-08-17' and hh = 6"),
+        Row(4, "d", "2025-08-17", 6, "d1")
+      )
+    }
+  }
+
+  test("Paimon copy files procedure: copy to existed compatible table") {
+    val random = ThreadLocalRandom.current().nextInt(100000)
+    withTable(s"tbl$random") {
+      // source table
+      sql(s"""
+             |CREATE TABLE tbl$random (k INT, v STRING, dt STRING, hh INT)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+
+      // target table
+      sql(s"""
+             |CREATE TABLE target_tbl$random (k INT, v2 STRING, dt STRING, hh INT)
+             |PARTITIONED BY (dt, hh)
+             |""".stripMargin)
+
+      assertThrows[RuntimeException] {
+        sql(s"CALL sys.copy(source_table => 'tbl$random', target_table => 'target_tbl$random')")
+      }
+    }
+  }
+}
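
Note on the behavior exercised by the last two tests above: when the target table already
exists, the copy appears to overwrite only the partitions present in the source, leaving
other target partitions untouched, while an incompatible target schema makes the call fail
with a RuntimeException. A hedged sketch (illustrative names, assuming a SparkSession
named spark):

    // partitions present only in db.tgt are preserved; overlapping ones are overwritten
    spark.sql("CALL sys.copy(source_table => 'db.src', target_table => 'db.tgt')")
    spark.sql("SELECT * FROM db.tgt WHERE dt = '2025-08-17' AND hh = 6").show()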
