This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0197aab272 [procedure] Merge migrate_file to migrate_table (#5240)
0197aab272 is described below

commit 0197aab272e6d3fc4df6f065de48ef243251f0af
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 10 14:38:24 2025 +0800

    [procedure] Merge migrate_file to migrate_table (#5240)
---
 docs/content/flink/procedures.md                   |  34 +-----
 docs/content/migration/migration-from-hive.md      | 112 ++++++-----------
 docs/content/spark/procedures.md                   |  12 --
 .../java/org/apache/paimon/migrate/Migrator.java   |   2 +-
 .../flink/procedure/MigrateFileProcedure.java      | 112 -----------------
 .../ProcedurePositionalArgumentsITCase.java        |   2 -
 .../paimon/flink/action/MigrateFileAction.java     |  66 -----------
 .../flink/action/MigrateFileActionFactory.java     |  83 -------------
 .../paimon/flink/action/MigrateTableAction.java    |   4 +-
 .../flink/procedure/MigrateFileProcedure.java      | 101 ----------------
 .../flink/procedure/MigrateTableProcedure.java     |  36 ++++--
 .../services/org.apache.paimon.factories.Factory   |   2 -
 .../hive/procedure/MigrateFileProcedureITCase.java |  70 +++--------
 .../procedure/MigrateTableProcedureITCase.java     |  58 +++------
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 -
 .../spark/procedure/MigrateFileProcedure.java      | 132 ---------------------
 .../spark/procedure/MigrateFileProcedureTest.scala |  10 +-
 17 files changed, 105 insertions(+), 733 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 84cbfe9068..495a96b074 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -532,10 +532,6 @@ All available procedures are listed below.
    <tr>
       <td>migrate_table</td>
       <td>
-         -- for Flink 1.18<br/>
-         -- migrate hive table to a paimon table.<br/>
-         CALL [catalog.]sys.migrate_table('connector', 'tableIdentifier', 
'options'[, &ltparallelism&gt])<br/><br/>
-         -- for Flink 1.19 and later<br/>
          -- migrate hive table to a paimon table.<br/>
          CALL [catalog.]sys.migrate_table(connector => 'connector', source_table => 'tableIdentifier', options => 'options'[, &lt;parallelism => parallelism&gt;])<br/><br/>
       </td>
@@ -543,39 +539,13 @@ All available procedures are listed below.
          To migrate a hive table to a paimon table. Arguments:
             <li>connector: the type of the origin table to be migrated, such as hive. Cannot be empty.</li>
             <li>source_table: name of the origin table to be migrated. Cannot be empty.</li>
+            <li>target_table: name of the target paimon table to migrate to. If not set, the migrated table keeps the same name as the origin table.</li>
             <li>options: the table options of the paimon table to migrate.</li>
             <li>parallelism: the parallelism of the migrate process; defaults to the number of machine cores.</li>
-      </td>
-      <td>
-         -- for Flink 1.18<br/>
-         CALL sys.migrate_table('hive', 'db01.t1', 'file.format=parquet', 6)
-         -- for Flink 1.19 and later<br/>
-         CALL sys.migrate_table(connector => 'hive', source_table => 
'db01.t1', options => 'file.format=parquet', parallelism => 6)
-      </td>
-   </tr>
-   <tr>
-      <td>migrate_file</td>
-      <td>
-         -- for Flink 1.18<br/>
-         -- migrate files from hive table to a paimon table.<br/>
-         CALL [catalog.]sys.migrate_file('connector', 'srcTableIdentifier', 
'destTableIdentifier', [, &ltdelete_origin&gt, &ltparallelism&gt])<br/><br/>
-         -- for Flink 1.19 and later<br/>
-         -- migrate hive table to a paimon table.<br/>
-         CALL [catalog.]sys.migrate_file(connector => 'connector', 
source_table => 'srcTableIdentifier', target_table => 'destTableIdentifier'[, 
&ltdelete_origin => bool&gt, &ltparallelism => parallelism&gt])<br/><br/>
-      </td>
-      <td>
-         To migrate files from hive table to a paimon table. Argument:
-            <li>connector: the origin table's type to be migrated, such as 
hive. Cannot be empty.</li>
-            <li>source_table: name of the origin table to migrate. Cannot be 
empty.</li>
-            <li>target_table: name of the target table to be migrated. Cannot 
be empty.</li>
             <li>delete_origin: when target_table is set, delete_origin decides whether to delete the origin table metadata from HMS after the migration. Default is true.</li>
-            <li>parallelism: the parallelism for migrate process, default is 
core numbers of machine.</li>
       </td>
       <td>
-         -- for Flink 1.18<br/>
-         CALL sys.migrate_file('hive', 'default.T', 'default.T2', true, 6)
-         -- for Flink 1.19 and later<br/>
-         CALL sys.migrate_file(connector => 'hive', source_table => 
'default.T', target_table => 'default.T2', delete_origin => true, parallelism 
=> 6)
+         CALL sys.migrate_table(connector => 'hive', source_table => 'db01.t1', options => 'file.format=parquet', parallelism => 6)
       </td>
    </tr>
    <tr>
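
The removed migrate_file entry maps directly onto the merged procedure. A minimal Flink SQL sketch using the arguments documented above (the table names are placeholders taken from the removed example):

```sql
-- Migrate files from a hive table into an existing paimon table,
-- keeping the origin table metadata in the hive metastore.
CALL sys.migrate_table(
    connector => 'hive',
    source_table => 'default.T',
    target_table => 'default.T2',
    delete_origin => false,
    parallelism => 6);
```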
diff --git a/docs/content/migration/migration-from-hive.md 
b/docs/content/migration/migration-from-hive.md
index b30b2680cc..03ff208ee5 100644
--- a/docs/content/migration/migration-from-hive.md
+++ b/docs/content/migration/migration-from-hive.md
@@ -30,59 +30,41 @@ Apache Hive supports ORC, Parquet file formats that could 
be migrated to Paimon.
 When migrating data to a paimon table, the origin table permanently disappears. So please back up your data if you still need the original table. The migrated table will be an [append table]({{< ref "append-table/overview" >}}).
 
-Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate 
File Procedure to totally migrate a table from hive to paimon.
+Now, we can use the paimon hive catalog with the Migrate Table Procedure to fully migrate a table from hive to paimon.
 At the same time, you can use the paimon hive catalog with the Migrate Database Procedure to synchronize all tables in a database to paimon.
 
 * Migrate Table Procedure: the paimon table does not exist yet; use the procedure to upgrade a hive table to a paimon table. The hive table will disappear after the action is done.
 * Migrate Database Procedure: the paimon tables do not exist yet; use the procedure to upgrade all hive tables in a database to paimon tables. All hive tables will disappear after the action is done.
-* Migrate File Procedure:  Paimon table already exists, use the procedure to 
migrate files from hive table to paimon table. **Notice that, Hive table will 
also disappear after action done.**
 
-These three actions now support file format of hive "orc" and "parquet" and "avro".
+These two procedures support the hive file formats "orc", "parquet" and "avro".
 
 <span style="color: red; "> **We highly recommend backing up hive table data before migrating, because the migration is not atomic. If it is interrupted while migrating, you may lose your data.** </span>
 
-## Example for Migration
-
-**Migrate Hive Table**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_table</font>(&#39;hive&#39;, 
&#39;&lt;hive_database&gt;.&lt;hive_tablename&gt;&#39;, 
&#39;&lt;paimon_tableconf&gt;&#39;);***
-
-**Example**
+## Migrate Hive Table
 
+{{< tabs "migrate table" >}}
+{{< tab "Flink SQL" >}}
 ```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+CREATE CATALOG PAIMON WITH (
+   'type'='paimon',
+   'metastore' = 'hive',
+   'uri' = 'thrift://localhost:9083',
+   'warehouse'='/path/to/warehouse/');
 
 USE CATALOG PAIMON;
 
-CALL sys.migrate_table(connector => 'hive', source_table => 
'default.hivetable', options => 'file.format=orc');
-```
-After invoke, "hivetable" will totally convert to paimon format. Writing and 
reading the table by old "hive way" will fail.
-We can add our table properties while importing by 
sys.migrate_table('<database>.<tablename>', '<tableproperties>').
-<tableproperties> here should be separated by ",".  For example:
-
-```sql
 CALL sys.migrate_table(
-    connector => 'hive', 
-    source_table => 'my_db.wait_to_upgrate', 
-    options => 'file.format=orc,read.batch-size=2096,write-only=true'
-);
+    connector => 'hive',
+    source_table => 'default.hivetable',
+    -- You can specify a target table; if the target table already exists,
+    -- the files will be migrated directly into it
+    -- target_table => 'default.paimontarget',
+    -- You can set delete_origin to false so the hive table is not deleted
+    -- delete_origin => false,
+    options => 'file.format=orc');
 ```
-
-If your flink version is below 1.17, you can use flink action to achieve this:
-```bash
-<FLINK_HOME>/bin/flink run \
-/path/to/paimon-flink-action-{{< version >}}.jar \
-migrate_table \
---warehouse <warehouse-path> \
---source_type hive \
---table <database.table-name> \
-[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> 
...]] \
-[--options <paimon-table-conf  [,paimon-table-conf ...]> ]
-```
-
-Example:
+{{< /tab >}}
+{{< tab "Flink Action" >}}
 ```bash
 <FLINK_HOME>/bin/flink run ./paimon-flink-action-{{< version >}}.jar \
 migrate_table \
@@ -92,35 +74,31 @@ migrate_table \
 --source_type hive \
 --table default.hive_or_paimon
 ```
+{{< /tab >}}
+{{< /tabs >}}
 
-**Migrate Hive Database**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_database</font>(&#39;hive&#39;, 
&#39;&lt;hive_database&gt;&#39;, &#39;&lt;paimon_tableconf&gt;&#39;);***
+After the invocation, "hivetable" is fully converted to paimon format. Writing and reading the table the old "hive way" will fail.
 
-**Example**
+## Migrate Hive Database
 
+{{< tabs "migrate database" >}}
+{{< tab "Flink SQL" >}}
 ```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+CREATE CATALOG PAIMON WITH (
+   'type'='paimon', 
+   'metastore' = 'hive', 
+   'uri' = 'thrift://localhost:9083', 
+   'warehouse'='/path/to/warehouse/');
 
 USE CATALOG PAIMON;
 
-CALL sys.migrate_database(connector => 'hive', source_database => 'default', 
options => 'file.format=orc');
-```
-After invoke, all tables in "default" database will totally convert to paimon 
format. Writing and reading the table by old "hive way" will fail.
-We can add our table properties while importing by 
sys.migrate_database('<database>', '<tableproperties>').
-<tableproperties> here should be separated by ",".  For example:
-
-```sql
 CALL sys.migrate_database(
-    connector => 'hive', 
-    source_database => 'my_db', 
-    options => 'file.format=orc,read.batch-size=2096,write-only=true'
-);
+    connector => 'hive',
+    source_database => 'default',
+    options => 'file.format=orc');
 ```
-
-If your flink version is below 1.17, you can use flink action to achieve this:
+{{< /tab >}}
+{{< tab "Flink Action" >}}
 ```bash
 <FLINK_HOME>/bin/flink run \
 /path/to/paimon-flink-action-{{< version >}}.jar \
@@ -141,21 +119,7 @@ Example:
 --source_type hive \
 --database default
 ```
+{{< /tab >}}
+{{< /tabs >}}
 
-**Migrate Hive File**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_file</font>(&#39;hive&#39;, 
&#39;&lt;hive_database&gt;.&lt;hive_table_name&gt;&#39;, 
&#39;&lt;paimon_database&gt;.&lt;paimon_tablename&gt;&#39;);***
-
-**Example**
-
-```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' = 
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
-
-USE CATALOG PAIMON;
-
-CALL sys.migrate_file(connector => 'hive', source_table => 
'default.hivetable', target_table => 'default.paimontable');
-```
-After invoke, "hivetable" will disappear. And all files will be moved and 
renamed to paimon directory. "paimontable" here must have the same
-partition keys with "hivetable", and "paimontable" should be in unaware-bucket 
mode.
+After the invocation, all tables in the "default" database are fully converted to paimon format. Writing and reading them the old "hive way" will fail.
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 0dd009005b..b3eee78e8d 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -230,18 +230,6 @@ This section introduce all available spark procedures 
about paimon.
       </td>
       <td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T', 
options => 'file.format=parquet', options_map => map('k1','v1'), parallelism => 
6)</td>
     </tr>
-    <tr>
-      <td>migrate_file</td>
-      <td>
-         Migrate from hive table to a paimon table. Arguments:
-            <li>source_type: the origin table's type to be migrated, such as 
hive. Cannot be empty.</li>
-            <li>source_table: name of the origin table to migrate. Cannot be 
empty.</li>
-            <li>target_table: name of the target table to be migrated. Cannot 
be empty.</li>
-            <li>delete_origin: If had set target_table, can set delete_origin 
to decide whether delete the origin table metadata from hms after migrate. 
Default is true</li>
-            <li>parallelism: the parallelism for migrate process, default is 
core numbers of machine.</li>
-      </td>
-      <td>CALL sys.migrate_file(connector => 'hive', source_table => 
'default.hivetable', target_table => 'default.paimontable', delete_origin => 
true, parallelism => 6)</td>
-    </tr>
     <tr>
       <td>remove_orphan_files</td>
       <td>
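
The removed Spark entry maps onto migrate_table the same way; a sketch mirroring the updated Spark tests later in this diff (database and table names are placeholders):

```sql
CALL sys.migrate_table(
    source_type => 'hive',
    table => 'default.hivetable',
    target_table => 'default.paimontable',
    delete_origin => true,
    parallelism => 6);
```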
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
index ad6de123dc..aa611408a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -25,5 +25,5 @@ public interface Migrator {
 
     void renameTable(boolean ignoreIfNotExists) throws Exception;
 
-    public void deleteOriginTable(boolean delete) throws Exception;
+    void deleteOriginTable(boolean delete) throws Exception;
 }
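
The change above is cosmetic (interface methods are implicitly public). For orientation, a minimal Java sketch of how callers drive a Migrator, mirroring the procedure code elsewhere in this diff; the connector name, parallelism, and empty options map are placeholder choices:

```java
import java.util.Collections;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;

class MigratorSketch {

    /** Mirrors the call sequence in MigrateTableProcedure below. */
    static void migrate(Catalog catalog, String source, String target, boolean deleteOrigin)
            throws Exception {
        Identifier sourceId = Identifier.fromString(source);
        Identifier targetId = Identifier.fromString(target);
        Migrator migrator =
                TableMigrationUtils.getImporter(
                        "hive",                                      // connector type
                        catalog,
                        sourceId.getDatabaseName(),
                        sourceId.getObjectName(),
                        targetId.getDatabaseName(),
                        targetId.getObjectName(),
                        Runtime.getRuntime().availableProcessors(),  // parallelism
                        Collections.emptyMap());                     // extra table options
        migrator.deleteOriginTable(deleteOrigin); // drop origin metadata from HMS afterwards?
        migrator.executeMigrate();                // move the files and commit
    }
}
```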
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
deleted file mode 100644
index 1e581c38cb..0000000000
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.procedure;
-
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.migrate.Migrator;
-
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import java.util.Collections;
-
-/** Add file procedure to add file from hive to paimon. */
-public class MigrateFileProcedure extends ProcedureBase {
-
-    @Override
-    public String identifier() {
-        return "migrate_file";
-    }
-
-    public String[] call(
-            ProcedureContext procedureContext,
-            String connector,
-            String sourceTablePath,
-            String targetPaimonTablePath)
-            throws Exception {
-        migrateHandle(
-                connector,
-                sourceTablePath,
-                targetPaimonTablePath,
-                true,
-                Runtime.getRuntime().availableProcessors());
-        return new String[] {"Success"};
-    }
-
-    public String[] call(
-            ProcedureContext procedureContext,
-            String connector,
-            String sourceTablePath,
-            String targetPaimonTablePath,
-            boolean deleteOrigin)
-            throws Exception {
-        migrateHandle(
-                connector,
-                sourceTablePath,
-                targetPaimonTablePath,
-                deleteOrigin,
-                Runtime.getRuntime().availableProcessors());
-        return new String[] {"Success"};
-    }
-
-    public String[] call(
-            ProcedureContext procedureContext,
-            String connector,
-            String sourceTablePath,
-            String targetPaimonTablePath,
-            boolean deleteOrigin,
-            Integer parallelism)
-            throws Exception {
-        Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism;
-        migrateHandle(connector, sourceTablePath, targetPaimonTablePath, 
deleteOrigin, p);
-        return new String[] {"Success"};
-    }
-
-    public void migrateHandle(
-            String connector,
-            String sourceTablePath,
-            String targetPaimonTablePath,
-            boolean deleteOrigin,
-            Integer parallelism)
-            throws Exception {
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
-
-        try {
-            catalog.getTable(targetTableId);
-        } catch (Catalog.TableNotExistException e) {
-            throw new IllegalArgumentException(
-                    "Target paimon table does not exist: " + 
targetPaimonTablePath);
-        }
-
-        Migrator importer =
-                TableMigrationUtils.getImporter(
-                        connector,
-                        catalog,
-                        sourceTableId.getDatabaseName(),
-                        sourceTableId.getObjectName(),
-                        targetTableId.getDatabaseName(),
-                        targetTableId.getObjectName(),
-                        parallelism,
-                        Collections.emptyMap());
-        importer.deleteOriginTable(deleteOrigin);
-        importer.executeMigrate();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index e4d1738667..2f32c326a7 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -418,8 +418,6 @@ public class ProcedurePositionalArgumentsITCase extends 
CatalogITCaseBase {
                 .hasMessageContaining("Only support Hive Catalog.");
         assertThatThrownBy(() -> sql("CALL sys.migrate_table('hive', 
'default.T', '')"))
                 .hasMessageContaining("Only support Hive Catalog.");
-        assertThatThrownBy(() -> sql("CALL sys.migrate_file('hive', 
'default.T', 'default.S')"))
-                .hasMessageContaining("Only support Hive Catalog.");
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
deleted file mode 100644
index e874536e78..0000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action;
-
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
-
-import org.apache.flink.table.procedure.DefaultProcedureContext;
-
-import java.util.Map;
-
-/** Migrate from external hive table to paimon table. */
-public class MigrateFileAction extends ActionBase {
-
-    private final String connector;
-    private final String sourceTable;
-    private final String targetTable;
-    private final String tableProperties;
-    private boolean deleteOrigin;
-    private Integer parallelism;
-
-    public MigrateFileAction(
-            String connector,
-            String sourceTable,
-            String targetTable,
-            boolean deleteOrigin,
-            Map<String, String> catalogConfig,
-            String tableProperties,
-            Integer parallelism) {
-        super(catalogConfig);
-        this.connector = connector;
-        this.sourceTable = sourceTable;
-        this.targetTable = targetTable;
-        this.deleteOrigin = deleteOrigin;
-        this.tableProperties = tableProperties;
-        this.parallelism = parallelism;
-    }
-
-    @Override
-    public void run() throws Exception {
-        MigrateFileProcedure migrateTableProcedure = new 
MigrateFileProcedure();
-        migrateTableProcedure.withCatalog(catalog);
-        migrateTableProcedure.call(
-                new DefaultProcedureContext(env),
-                connector,
-                sourceTable,
-                targetTable,
-                deleteOrigin,
-                parallelism);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
deleted file mode 100644
index b43fc67752..0000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action;
-
-import java.util.Map;
-import java.util.Optional;
-
-/** Action Factory for {@link MigrateFileAction}. */
-public class MigrateFileActionFactory implements ActionFactory {
-
-    public static final String IDENTIFIER = "migrate_file";
-
-    private static final String SOURCE_TYPE = "source_type";
-
-    private static final String SOURCE_TABLE = "source_table";
-
-    private static final String TARGET_TABLE = "target_table";
-
-    private static final String DELETE_ORIGIN = "delete_origin";
-
-    private static final String OPTIONS = "options";
-    private static final String PARALLELISM = "parallelism";
-
-    @Override
-    public String identifier() {
-        return IDENTIFIER;
-    }
-
-    @Override
-    public Optional<Action> create(MultipleParameterToolAdapter params) {
-        String connector = params.get(SOURCE_TYPE);
-        String sourceHiveTable = params.get(SOURCE_TABLE);
-        String targetTable = params.get(TARGET_TABLE);
-        boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
-        Map<String, String> catalogConfig = catalogConfigMap(params);
-        String tableConf = params.get(OPTIONS);
-        Integer parallelism = Integer.parseInt(params.get(PARALLELISM));
-
-        MigrateFileAction migrateFileAction =
-                new MigrateFileAction(
-                        connector,
-                        sourceHiveTable,
-                        targetTable,
-                        deleteOrigin,
-                        catalogConfig,
-                        tableConf,
-                        parallelism);
-        return Optional.of(migrateFileAction);
-    }
-
-    @Override
-    public void printHelp() {
-        System.out.println("Action \"migrate_file\" runs a migrating job from 
hive to paimon.");
-        System.out.println();
-
-        System.out.println("Syntax:");
-        System.out.println(
-                "  migrate_file \\\n"
-                        + "--warehouse <warehouse_path> \\\n"
-                        + "--source_type hive \\\n"
-                        + "--source_table <database.table_name> \\\n"
-                        + "--target_table <database.table_name> \\\n"
-                        + "--delete_origin true \\\n"
-                        + "[--catalog_conf <key>=<value] \\\n"
-                        + "[--options <key>=<value>,<key>=<value>,...]");
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
index b12cb5f862..9bf85ab870 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
@@ -53,7 +53,9 @@ public class MigrateTableAction extends ActionBase {
                 new DefaultProcedureContext(env),
                 connector,
                 hiveTableFullName,
+                null,
                 tableProperties,
-                parallelism);
+                parallelism,
+                null);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
deleted file mode 100644
index f2f10d0874..0000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.procedure;
-
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.migrate.Migrator;
-
-import org.apache.flink.table.annotation.ArgumentHint;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.ProcedureHint;
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import java.util.Collections;
-
-/** Add file procedure to add file from hive to paimon. */
-public class MigrateFileProcedure extends ProcedureBase {
-
-    @Override
-    public String identifier() {
-        return "migrate_file";
-    }
-
-    @ProcedureHint(
-            argument = {
-                @ArgumentHint(name = "connector", type = 
@DataTypeHint("STRING")),
-                @ArgumentHint(name = "source_table", type = 
@DataTypeHint("STRING")),
-                @ArgumentHint(name = "target_table", type = 
@DataTypeHint("STRING")),
-                @ArgumentHint(
-                        name = "delete_origin",
-                        type = @DataTypeHint("BOOLEAN"),
-                        isOptional = true),
-                @ArgumentHint(
-                        name = "parallelism",
-                        type = @DataTypeHint("Integer"),
-                        isOptional = true)
-            })
-    public String[] call(
-            ProcedureContext procedureContext,
-            String connector,
-            String sourceTablePath,
-            String targetPaimonTablePath,
-            Boolean deleteOrigin,
-            Integer parallelism)
-            throws Exception {
-        if (deleteOrigin == null) {
-            deleteOrigin = true;
-        }
-        Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism;
-        migrateHandle(connector, sourceTablePath, targetPaimonTablePath, 
deleteOrigin, p);
-        return new String[] {"Success"};
-    }
-
-    public void migrateHandle(
-            String connector,
-            String sourceTablePath,
-            String targetPaimonTablePath,
-            boolean deleteOrigin,
-            Integer parallelism)
-            throws Exception {
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
-
-        try {
-            catalog.getTable(targetTableId);
-        } catch (Catalog.TableNotExistException e) {
-            throw new IllegalArgumentException(
-                    "Target paimon table does not exist: " + 
targetPaimonTablePath);
-        }
-
-        Migrator importer =
-                TableMigrationUtils.getImporter(
-                        connector,
-                        catalog,
-                        sourceTableId.getDatabaseName(),
-                        sourceTableId.getObjectName(),
-                        targetTableId.getDatabaseName(),
-                        targetTableId.getObjectName(),
-                        parallelism,
-                        Collections.emptyMap());
-        importer.deleteOriginTable(deleteOrigin);
-        importer.executeMigrate();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 32a2a16dc5..98e907bcea 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
 import org.apache.paimon.migrate.Migrator;
-import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -30,6 +29,8 @@ import org.apache.flink.table.procedure.ProcedureContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
+
 /** Migrate procedure to migrate hive table to paimon table. */
 public class MigrateTableProcedure extends ProcedureBase {
 
@@ -46,25 +47,33 @@ public class MigrateTableProcedure extends ProcedureBase {
             argument = {
                 @ArgumentHint(name = "connector", type = 
@DataTypeHint("STRING")),
                 @ArgumentHint(name = "source_table", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "target_table",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
                 @ArgumentHint(name = "options", type = 
@DataTypeHint("STRING"), isOptional = true),
                 @ArgumentHint(
                         name = "parallelism",
                         type = @DataTypeHint("Integer"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "delete_origin",
+                        type = @DataTypeHint("BOOLEAN"),
                         isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
             String connector,
-            String sourceTablePath,
+            String sourceTable,
+            String targetTable,
             String properties,
-            Integer parallelism)
+            Integer parallelism,
+            Boolean deleteOrigin)
             throws Exception {
-        properties = notnull(properties);
-
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = 
Identifier.fromString(targetPaimonTablePath);
+        Identifier sourceTableId = Identifier.fromString(sourceTable);
+        Identifier targetTableId =
+                Identifier.fromString(
+                        targetTable == null ? sourceTable + PAIMON_SUFFIX : 
targetTable);
 
         Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism;
 
@@ -77,11 +86,16 @@ public class MigrateTableProcedure extends ProcedureBase {
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         p,
-                        
ParameterUtils.parseCommaSeparatedKeyValues(properties));
+                        parseCommaSeparatedKeyValues(notnull(properties)));
         LOG.info("create migrator success.");
+        if (deleteOrigin != null) {
+            migrator.deleteOriginTable(deleteOrigin);
+        }
         migrator.executeMigrate();
 
-        migrator.renameTable(false);
+        if (targetTable == null) {
+            migrator.renameTable(false);
+        }
         return new String[] {"Success"};
     }
 }
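
The branches added above give the procedure two modes: without target_table it migrates under a temporary name (source plus PAIMON_SUFFIX) and then renames the result, so the paimon table keeps the source name; with an explicit target_table the rename is skipped and the files land in the given table. A Flink SQL sketch of both modes (table names are placeholders):

```sql
-- No target_table: migrate in place; the result keeps the source name.
CALL sys.migrate_table(connector => 'hive', source_table => 'default.t1');

-- Explicit target_table: files go to the named paimon table, no rename;
-- delete_origin controls whether the hive metadata is dropped (default true).
CALL sys.migrate_table(
    connector => 'hive',
    source_table => 'default.t1',
    target_table => 'default.t1_pm',
    delete_origin => false);
```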
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index ae624f848d..f099e699d0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -30,7 +30,6 @@ org.apache.paimon.flink.action.ExpireTagsActionFactory
 org.apache.paimon.flink.action.ReplaceTagActionFactory
 org.apache.paimon.flink.action.ResetConsumerActionFactory
 org.apache.paimon.flink.action.MigrateTableActionFactory
-org.apache.paimon.flink.action.MigrateFileActionFactory
 org.apache.paimon.flink.action.MigrateDatabaseActionFactory
 org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
 org.apache.paimon.flink.action.QueryServiceActionFactory
@@ -67,7 +66,6 @@ org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
 org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
 org.apache.paimon.flink.procedure.MigrateTableProcedure
 org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
-org.apache.paimon.flink.procedure.MigrateFileProcedure
 org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
 org.apache.paimon.flink.procedure.QueryServiceProcedure
 org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 7ffc2ae54f..2ce9ede857 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -19,8 +19,6 @@
 package org.apache.paimon.hive.procedure;
 
 import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.flink.action.MigrateFileAction;
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
 import org.apache.paimon.hive.TestHiveMetastore;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -36,13 +34,11 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.stream.Stream;
 
-/** Tests for {@link MigrateFileProcedure}. */
+/** Tests for migrating files into an existing table via {@code migrate_table}. */
 public class MigrateFileProcedureITCase extends ActionITCaseBase {
 
     private static final TestHiveMetastore TEST_HIVE_METASTORE = new 
TestHiveMetastore();
@@ -60,23 +56,17 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
     }
 
     private static Stream<Arguments> testArguments() {
-        return Stream.of(
-                Arguments.of("orc", true),
-                Arguments.of("avro", true),
-                Arguments.of("parquet", true),
-                Arguments.of("orc", false),
-                Arguments.of("avro", false),
-                Arguments.of("parquet", false));
+        return Stream.of(Arguments.of("orc"), Arguments.of("avro"), 
Arguments.of("parquet"));
     }
 
     @ParameterizedTest
     @MethodSource("testArguments")
-    public void testMigrateFile(String format, boolean isNamedArgument) throws 
Exception {
-        test(format, isNamedArgument);
-        testMigrateFileAction(format, isNamedArgument);
+    public void testMigrateFile(String format) throws Exception {
+        test(format);
+        testMigrateFileAction(format);
     }
 
-    public void test(String format, boolean isNamedArgument) throws Exception {
+    public void test(String format) throws Exception {
         TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
@@ -101,21 +91,16 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
         tEnv.useCatalog("PAIMON");
         tEnv.executeSql(
                 "CREATE TABLE paimontable (id STRING, id2 INT, id3 INT) 
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
-        if (isNamedArgument) {
-            tEnv.executeSql(
-                            "CALL sys.migrate_file(connector => 'hive', 
source_table => 'default.hivetable', target_table => 'default.paimontable')")
-                    .await();
-        } else {
-            tEnv.executeSql(
-                            "CALL sys.migrate_file('hive', 
'default.hivetable', 'default.paimontable')")
-                    .await();
-        }
+        tEnv.executeSql(
+                        "CALL sys.migrate_table(connector => 'hive', 
source_table => 'default.hivetable', target_table => 'default.paimontable')")
+                .await();
+
         List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
paimontable").collect());
 
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
-    public void testMigrateFileAction(String format, boolean isNamedArgument) 
throws Exception {
+    public void testMigrateFileAction(String format) throws Exception {
         TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
@@ -144,41 +129,14 @@ public class MigrateFileProcedureITCase extends 
ActionITCaseBase {
         tEnv.useCatalog("PAIMON");
         tEnv.executeSql(
                 "CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT) 
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
-        tEnv.executeSql(
-                "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT) 
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
-
-        if (isNamedArgument) {
-            tEnv.executeSql(
-                            "CALL sys.migrate_file(connector => 'hive', 
source_table => 'default.hivetable01', target_table => 'default.paimontable01', 
delete_origin => false)")
-                    .await();
-        } else {
-            tEnv.executeSql(
-                            "CALL sys.migrate_file('hive', 
'default.hivetable01', 'default.paimontable01', false)")
-                    .await();
-        }
 
-        tEnv.useCatalog("PAIMON_GE");
-        Map<String, String> catalogConf = new HashMap<>();
-        catalogConf.put("metastore", "hive");
-        catalogConf.put("uri", "thrift://localhost:" + PORT);
-        catalogConf.put(
-                "warehouse", 
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
-        MigrateFileAction migrateFileAction =
-                new MigrateFileAction(
-                        "hive",
-                        "default.hivetable02",
-                        "default.paimontable02",
-                        false,
-                        catalogConf,
-                        "",
-                        6);
-        migrateFileAction.run();
+        tEnv.executeSql(
+                        "CALL sys.migrate_table(connector => 'hive', 
source_table => 'default.hivetable01', target_table => 'default.paimontable01', 
delete_origin => false)")
+                .await();
 
         tEnv.useCatalog("HIVE");
         List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable01").collect());
-        List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable02").collect());
         Assertions.assertThat(r1).isEmpty();
-        Assertions.assertThat(r2.size() == 0);
     }
 
     private String data(int i) {
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index 8d6ded69dc..66cb057abf 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.hive.procedure;
 
 import org.apache.paimon.flink.action.ActionITCaseBase;
 import org.apache.paimon.flink.action.MigrateTableAction;
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
 import org.apache.paimon.hive.TestHiveMetastore;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -43,7 +42,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.stream.Stream;
 
-/** Tests for {@link MigrateFileProcedure}. */
+/** Tests for {@code MigrateTableProcedure}. */
 public class MigrateTableProcedureITCase extends ActionITCaseBase {
 
     private static final TestHiveMetastore TEST_HIVE_METASTORE = new 
TestHiveMetastore();
@@ -61,21 +60,15 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
     }
 
     private static Stream<Arguments> testArguments() {
-        return Stream.of(
-                Arguments.of("orc", true),
-                Arguments.of("avro", true),
-                Arguments.of("parquet", true),
-                Arguments.of("orc", false),
-                Arguments.of("avro", false),
-                Arguments.of("parquet", false));
+        return Stream.of(Arguments.of("orc"), Arguments.of("avro"), 
Arguments.of("parquet"));
     }
 
     @ParameterizedTest
     @MethodSource("testArguments")
-    public void testMigrateProcedure(String format, boolean isNamedArgument) 
throws Exception {
-        testUpgradeNonPartitionTable(format, isNamedArgument);
+    public void testMigrateProcedure(String format) throws Exception {
+        testUpgradeNonPartitionTable(format);
         resetMetastore();
-        testUpgradePartitionTable(format, isNamedArgument);
+        testUpgradePartitionTable(format);
     }
 
     private void resetMetastore() throws Exception {
@@ -84,7 +77,7 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
         TEST_HIVE_METASTORE.start(PORT);
     }
 
-    public void testUpgradePartitionTable(String format, boolean 
isNamedArgument) throws Exception {
+    public void testUpgradePartitionTable(String format) throws Exception {
         TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
@@ -107,26 +100,17 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
                         + 
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
                         + "')");
         tEnv.useCatalog("PAIMON");
-        if (isNamedArgument) {
-            tEnv.executeSql(
-                            "CALL sys.migrate_table(connector => 'hive', 
source_table => 'default.hivetable', options => 'file.format="
-                                    + format
-                                    + "')")
-                    .await();
-        } else {
-            tEnv.executeSql(
-                            "CALL sys.migrate_table('hive', 
'default.hivetable', 'file.format="
-                                    + format
-                                    + "')")
-                    .await();
-        }
+        tEnv.executeSql(
+                        "CALL sys.migrate_table(connector => 'hive', 
source_table => 'default.hivetable', options => 'file.format="
+                                + format
+                                + "')")
+                .await();
         List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
 
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
     }
 
-    public void testUpgradeNonPartitionTable(String format, boolean 
isNamedArgument)
-            throws Exception {
+    public void testUpgradeNonPartitionTable(String format) throws Exception {
         TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
         tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
         tEnv.useCatalog("HIVE");
@@ -147,19 +131,11 @@ public class MigrateTableProcedureITCase extends 
ActionITCaseBase {
                         + 
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
                         + "')");
         tEnv.useCatalog("PAIMON");
-        if (isNamedArgument) {
-            tEnv.executeSql(
-                            "CALL sys.migrate_table(connector => 'hive', 
source_table => 'default.hivetable', options => 'file.format="
-                                    + format
-                                    + "')")
-                    .await();
-        } else {
-            tEnv.executeSql(
-                            "CALL sys.migrate_table('hive', 
'default.hivetable', 'file.format="
-                                    + format
-                                    + "')")
-                    .await();
-        }
+        tEnv.executeSql(
+                        "CALL sys.migrate_table(connector => 'hive', 
source_table => 'default.hivetable', options => 'file.format="
+                                + format
+                                + "')")
+                .await();
         List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM 
hivetable").collect());
 
         Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index eff62cad96..e1cf70d3bd 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -32,7 +32,6 @@ import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
 import org.apache.paimon.spark.procedure.FastForwardProcedure;
 import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
 import org.apache.paimon.spark.procedure.MigrateDatabaseProcedure;
-import org.apache.paimon.spark.procedure.MigrateFileProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
@@ -90,7 +89,6 @@ public class SparkProcedures {
         procedureBuilders.put("compact", CompactProcedure::builder);
         procedureBuilders.put("migrate_database", 
MigrateDatabaseProcedure::builder);
         procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
-        procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
         procedureBuilders.put("remove_orphan_files", 
RemoveOrphanFilesProcedure::builder);
         procedureBuilders.put("remove_unexisting_files", 
RemoveUnexistingFilesProcedure::builder);
         procedureBuilders.put("expire_snapshots", 
ExpireSnapshotsProcedure::builder);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
deleted file mode 100644
index 95d55df011..0000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.procedure;
-
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.migrate.Migrator;
-import org.apache.paimon.spark.catalog.WithPaimonCatalog;
-import org.apache.paimon.spark.utils.TableMigrationUtils;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Collections;
-
-import static org.apache.spark.sql.types.DataTypes.BooleanType;
-import static org.apache.spark.sql.types.DataTypes.IntegerType;
-import static org.apache.spark.sql.types.DataTypes.StringType;
-
-/**
- * Migrate file procedure. Usage:
- *
- * <pre><code>
- *  CALL sys.migrate_file(source_type => 'hive', source_table => 
'db.source_tbl', target_table => 'db.target_tbl')
- * </code></pre>
- */
-public class MigrateFileProcedure extends BaseProcedure {
-
-    private static final ProcedureParameter[] PARAMETERS =
-            new ProcedureParameter[] {
-                ProcedureParameter.required("source_type", StringType),
-                ProcedureParameter.required("source_table", StringType),
-                ProcedureParameter.required("target_table", StringType),
-                ProcedureParameter.optional("delete_origin", BooleanType),
-                ProcedureParameter.optional("parallelism", IntegerType)
-            };
-
-    private static final StructType OUTPUT_TYPE =
-            new StructType(
-                    new StructField[] {
-                        new StructField("result", BooleanType, true, 
Metadata.empty())
-                    });
-
-    protected MigrateFileProcedure(TableCatalog tableCatalog) {
-        super(tableCatalog);
-    }
-
-    @Override
-    public ProcedureParameter[] parameters() {
-        return PARAMETERS;
-    }
-
-    @Override
-    public StructType outputType() {
-        return OUTPUT_TYPE;
-    }
-
-    @Override
-    public InternalRow[] call(InternalRow args) {
-        String format = args.getString(0);
-        String sourceTable = args.getString(1);
-        String targetTable = args.getString(2);
-        boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
-        int parallelism =
-                args.isNullAt(4) ? Runtime.getRuntime().availableProcessors() 
: args.getInt(4);
-
-        Identifier sourceTableId = Identifier.fromString(sourceTable);
-        Identifier targetTableId = Identifier.fromString(targetTable);
-
-        Catalog paimonCatalog = ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog();
-
-        try {
-            paimonCatalog.getTable(targetTableId);
-        } catch (Catalog.TableNotExistException e) {
-            throw new IllegalArgumentException(
-                    "Target paimon table does not exist: " + targetTable);
-        }
-
-        try {
-            Migrator migrator =
-                    TableMigrationUtils.getImporter(
-                            format,
-                            paimonCatalog,
-                            sourceTableId.getDatabaseName(),
-                            sourceTableId.getObjectName(),
-                            targetTableId.getDatabaseName(),
-                            targetTableId.getObjectName(),
-                            parallelism,
-                            Collections.emptyMap());
-
-            migrator.deleteOriginTable(deleteNeed);
-            migrator.executeMigrate();
-        } catch (Exception e) {
-            throw new RuntimeException("Call migrate_file error", e);
-        }
-
-        return new InternalRow[] {newInternalRow(true)};
-    }
-
-    public static ProcedureBuilder builder() {
-        return new BaseProcedure.Builder<MigrateFileProcedure>() {
-            @Override
-            public MigrateFileProcedure doBuild() {
-                return new MigrateFileProcedure(tableCatalog());
-            }
-        };
-    }
-
-    @Override
-    public String description() {
-        return "MigrateFileProcedure";
-    }
-}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
index 043223d05a..4b658d1873 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
@@ -45,7 +45,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
           spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 
'd', 'p2')")
 
           spark.sql(
-            s"CALL sys.migrate_file(source_type => 'hive', source_table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
+            s"CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
 
           checkAnswer(
             spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
@@ -80,7 +80,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
           spark.sql(s"INSERT INTO paimon_tbl_02 VALUES ('3', 'c', 'p1'), ('4', 
'd', 'p2')")
 
           spark.sql(
-            s"CALL sys.migrate_file(source_type => 'hive', source_table => 
'$hiveDbName.hive_tbl_02', target_table => '$hiveDbName.paimon_tbl_02', 
parallelism => 6)")
+            s"CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl_02', target_table => '$hiveDbName.paimon_tbl_02', 
parallelism => 6)")
 
           checkAnswer(
             spark.sql("SELECT * FROM paimon_tbl_02 ORDER BY id"),
@@ -115,7 +115,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
           spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 
'd', 'p2')")
 
           spark.sql(
-            s"CALL sys.migrate_file(source_type => 'hive', source_table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin 
=> false)")
+            s"CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin 
=> false)")
 
           checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
 
@@ -153,7 +153,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
           spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 
'd', 'p2')")
 
           spark.sql(
-            s"CALL sys.migrate_file(source_type => 'hive', source_table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
+            s"CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
 
           checkAnswer(
             spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
@@ -190,7 +190,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
           spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4', 
'd', 'p2')")
 
           spark.sql(
-            s"CALL sys.migrate_file(source_type => 'hive', source_table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin 
=> false)")
+            s"CALL sys.migrate_table(source_type => 'hive', table => 
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin 
=> false)")
 
           checkAnswer(
             spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
