This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0197aab272 [procedure] Merge migrate_file to migrate_table (#5240)
0197aab272 is described below
commit 0197aab272e6d3fc4df6f065de48ef243251f0af
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 10 14:38:24 2025 +0800
[procedure] Merge migrate_file to migrate_table (#5240)
---
docs/content/flink/procedures.md | 34 +-----
docs/content/migration/migration-from-hive.md | 112 ++++++-----------
docs/content/spark/procedures.md | 12 --
.../java/org/apache/paimon/migrate/Migrator.java | 2 +-
.../flink/procedure/MigrateFileProcedure.java | 112 -----------------
.../ProcedurePositionalArgumentsITCase.java | 2 -
.../paimon/flink/action/MigrateFileAction.java | 66 -----------
.../flink/action/MigrateFileActionFactory.java | 83 -------------
.../paimon/flink/action/MigrateTableAction.java | 4 +-
.../flink/procedure/MigrateFileProcedure.java | 101 ----------------
.../flink/procedure/MigrateTableProcedure.java | 36 ++++--
.../services/org.apache.paimon.factories.Factory | 2 -
.../hive/procedure/MigrateFileProcedureITCase.java | 70 +++--------
.../procedure/MigrateTableProcedureITCase.java | 58 +++------
.../org/apache/paimon/spark/SparkProcedures.java | 2 -
.../spark/procedure/MigrateFileProcedure.java | 132 ---------------------
.../spark/procedure/MigrateFileProcedureTest.scala | 10 +-
17 files changed, 105 insertions(+), 733 deletions(-)
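For scripts that still call the removed procedure: a minimal sketch, assuming the paimon hive catalog setups shown in the docs below, of how a former migrate_file call maps onto the merged migrate_table procedure:

```sql
-- Before this commit (migrate_file, now removed):
-- CALL sys.migrate_file(connector => 'hive', source_table => 'default.hivetable',
--                       target_table => 'default.paimontable', delete_origin => false);

-- After this commit, the same migration goes through migrate_table:
CALL sys.migrate_table(
    connector => 'hive',
    source_table => 'default.hivetable',
    target_table => 'default.paimontable',
    delete_origin => false);
```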
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 84cbfe9068..495a96b074 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -532,10 +532,6 @@ All available procedures are listed below.
<tr>
<td>migrate_table</td>
<td>
- -- for Flink 1.18<br/>
- -- migrate hive table to a paimon table.<br/>
- CALL [catalog.]sys.migrate_table('connector', 'tableIdentifier',
'options'[, <parallelism>])<br/><br/>
- -- for Flink 1.19 and later<br/>
        -- migrate a hive table to a paimon table.<br/>
CALL [catalog.]sys.migrate_table(connector => 'connector',
source_table => 'tableIdentifier', options => 'options'[, <parallelism =>
parallelism>])<br/><br/>
</td>
@@ -543,39 +539,13 @@ All available procedures are listed below.
        To migrate a hive table to a paimon table. Arguments:
<li>connector: the origin table's type to be migrated, such as
hive. Cannot be empty.</li>
<li>source_table: name of the origin table to be migrated. Cannot
be empty.</li>
+                <li>target_table: name of the target paimon table to migrate to. If
not set, the origin table's name is kept.</li>
<li>options: the table options of the paimon table to migrate.</li>
                <li>parallelism: the parallelism for the migrate process; default is the
number of machine cores.</li>
- </td>
- <td>
- -- for Flink 1.18<br/>
- CALL sys.migrate_table('hive', 'db01.t1', 'file.format=parquet', 6)
- -- for Flink 1.19 and later<br/>
- CALL sys.migrate_table(connector => 'hive', source_table =>
'db01.t1', options => 'file.format=parquet', parallelism => 6)
- </td>
- </tr>
- <tr>
- <td>migrate_file</td>
- <td>
- -- for Flink 1.18<br/>
- -- migrate files from hive table to a paimon table.<br/>
- CALL [catalog.]sys.migrate_file('connector', 'srcTableIdentifier',
'destTableIdentifier', [, <delete_origin>, <parallelism>])<br/><br/>
- -- for Flink 1.19 and later<br/>
- -- migrate hive table to a paimon table.<br/>
- CALL [catalog.]sys.migrate_file(connector => 'connector',
source_table => 'srcTableIdentifier', target_table => 'destTableIdentifier'[,
<delete_origin => bool>, <parallelism => parallelism>])<br/><br/>
- </td>
- <td>
- To migrate files from hive table to a paimon table. Argument:
- <li>connector: the origin table's type to be migrated, such as
hive. Cannot be empty.</li>
- <li>source_table: name of the origin table to migrate. Cannot be
empty.</li>
- <li>target_table: name of the target table to be migrated. Cannot
be empty.</li>
                <li>delete_origin: If target_table is set, delete_origin decides whether
to delete the origin table's metadata from HMS after migration.
Default is true</li>
- <li>parallelism: the parallelism for migrate process, default is
core numbers of machine.</li>
</td>
<td>
- -- for Flink 1.18<br/>
- CALL sys.migrate_file('hive', 'default.T', 'default.T2', true, 6)
- -- for Flink 1.19 and later<br/>
- CALL sys.migrate_file(connector => 'hive', source_table =>
'default.T', target_table => 'default.T2', delete_origin => true, parallelism
=> 6)
+ CALL sys.migrate_table(connector => 'hive', source_table =>
'db01.t1', options => 'file.format=parquet', parallelism => 6)
</td>
</tr>
<tr>
diff --git a/docs/content/migration/migration-from-hive.md
b/docs/content/migration/migration-from-hive.md
index b30b2680cc..03ff208ee5 100644
--- a/docs/content/migration/migration-from-hive.md
+++ b/docs/content/migration/migration-from-hive.md
@@ -30,59 +30,41 @@ Apache Hive supports ORC, Parquet file formats that could
be migrated to Paimon.
When migrating data to a paimon table, the origin table permanently disappears,
so please back up your data if you still need the original table. The migrated
table will be an [append table]({{< ref "append-table/overview" >}}).
-Now, we can use paimon hive catalog with Migrate Table Procedure and Migrate
File Procedure to totally migrate a table from hive to paimon.
+Now, we can use the paimon hive catalog with the Migrate Table Procedure to fully
migrate a table from hive to paimon.
At the same time, you can use the paimon hive catalog with the Migrate Database
Procedure to fully synchronize all tables in the database to paimon.
* Migrate Table Procedure: the paimon table does not exist; use the procedure to
upgrade the hive table to a paimon table. The hive table will disappear after the action is done.
* Migrate Database Procedure: the paimon table does not exist; use the procedure to
upgrade all hive tables in the database to paimon tables. All hive tables will
disappear after the action is done.
-* Migrate File Procedure: Paimon table already exists, use the procedure to
migrate files from hive table to paimon table. **Notice that, Hive table will
also disappear after action done.**
These two procedures support the hive file formats "orc", "parquet" and
"avro".
<span style="color: red; "> **We highly recommend backing up hive table data
before migrating, because the migration is not atomic. If it is interrupted,
you may lose your data.** </span>
-## Example for Migration
-
-**Migrate Hive Table**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_table</font>('hive',
'<hive_database>.<hive_tablename>',
'<paimon_tableconf>');***
-
-**Example**
+## Migrate Hive Table
+{{< tabs "migrate table" >}}
+{{< tab "Flink SQL" >}}
```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' =
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+CREATE CATALOG PAIMON WITH (
+ 'type'='paimon',
+ 'metastore' = 'hive',
+ 'uri' = 'thrift://localhost:9083',
+ 'warehouse'='/path/to/warehouse/');
USE CATALOG PAIMON;
-CALL sys.migrate_table(connector => 'hive', source_table =>
'default.hivetable', options => 'file.format=orc');
-```
-After invoke, "hivetable" will totally convert to paimon format. Writing and
reading the table by old "hive way" will fail.
-We can add our table properties while importing by
sys.migrate_table('<database>.<tablename>', '<tableproperties>').
-<tableproperties> here should be separated by ",". For example:
-
-```sql
CALL sys.migrate_table(
- connector => 'hive',
- source_table => 'my_db.wait_to_upgrate',
- options => 'file.format=orc,read.batch-size=2096,write-only=true'
-);
+ connector => 'hive',
+ source_table => 'default.hivetable',
+    -- You can specify the target table; if the target table already exists,
+    -- the files will be migrated directly into it
+    -- target_table => 'default.paimontarget',
+    -- You can set delete_origin to false, which keeps the hive table
+    -- delete_origin => false,
+ options => 'file.format=orc');
```
-
-If your flink version is below 1.17, you can use flink action to achieve this:
-```bash
-<FLINK_HOME>/bin/flink run \
-/path/to/paimon-flink-action-{{< version >}}.jar \
-migrate_table \
---warehouse <warehouse-path> \
---source_type hive \
---table <database.table-name> \
-[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf>
...]] \
-[--options <paimon-table-conf [,paimon-table-conf ...]> ]
-```
-
-Example:
+{{< /tab >}}
+{{< tab "Flink Action" >}}
```bash
<FLINK_HOME>/flink run ./paimon-flink-action-{{< version >}}.jar \
migrate_table \
@@ -92,35 +74,31 @@ migrate_table \
--source_type hive \
--table default.hive_or_paimon
```
+{{< /tab >}}
+{{< /tabs >}}
-**Migrate Hive Database**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_database</font>('hive',
'<hive_database>', '<paimon_tableconf>');***
+After invocation, "hivetable" is fully converted to paimon format. Writing and
reading the table the old "hive way" will fail.
-**Example**
+## Migrate Hive Database
+{{< tabs "migrate database" >}}
+{{< tab "Flink SQL" >}}
```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' =
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
+CREATE CATALOG PAIMON WITH (
+ 'type'='paimon',
+ 'metastore' = 'hive',
+ 'uri' = 'thrift://localhost:9083',
+ 'warehouse'='/path/to/warehouse/');
USE CATALOG PAIMON;
-CALL sys.migrate_database(connector => 'hive', source_database => 'default',
options => 'file.format=orc');
-```
-After invoke, all tables in "default" database will totally convert to paimon
format. Writing and reading the table by old "hive way" will fail.
-We can add our table properties while importing by
sys.migrate_database('<database>', '<tableproperties>').
-<tableproperties> here should be separated by ",". For example:
-
-```sql
CALL sys.migrate_database(
- connector => 'hive',
- source_database => 'my_db',
- options => 'file.format=orc,read.batch-size=2096,write-only=true'
-);
+ connector => 'hive',
+ source_database => 'default',
+ options => 'file.format=orc');
```
-
-If your flink version is below 1.17, you can use flink action to achieve this:
+{{< /tab >}}
+{{< tab "Flink Action" >}}
```bash
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-{{< version >}}.jar \
@@ -141,21 +119,7 @@ Example:
--source_type hive \
--database default
```
+{{< /tab >}}
+{{< /tabs >}}
-**Migrate Hive File**
-
-Command: <br>
-
-***CALL <font color="green">sys.migrate_file</font>('hive',
'<hive_database>.<hive_table_name>',
'<paimon_database>.<paimon_tablename>');***
-
-**Example**
-
-```sql
-CREATE CATALOG PAIMON WITH ('type'='paimon', 'metastore' = 'hive', 'uri' =
'thrift://localhost:9083', 'warehouse'='/path/to/warehouse/');
-
-USE CATALOG PAIMON;
-
-CALL sys.migrate_file(connector => 'hive', source_table =>
'default.hivetable', target_table => 'default.paimontable');
-```
-After invoke, "hivetable" will disappear. And all files will be moved and
renamed to paimon directory. "paimontable" here must have the same
-partition keys with "hivetable", and "paimontable" should be in unaware-bucket
mode.
+After invocation, all tables in the "default" database are fully converted to
paimon format. Writing and reading the tables the old "hive way" will fail.
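The constraints from the removed "Migrate Hive File" section presumably still apply when target_table names an existing table: the paimon table must have the same partition keys as the hive table and be in unaware-bucket mode. A sketch reusing the table names from the removed example:

```sql
-- The target table must already exist, share the source's partition keys,
-- and use 'bucket' = '-1' (unaware-bucket mode).
CREATE TABLE paimontable (id STRING, id2 INT, id3 INT)
    PARTITIONED BY (id2, id3) WITH ('bucket' = '-1');

CALL sys.migrate_table(
    connector => 'hive',
    source_table => 'default.hivetable',
    target_table => 'default.paimontable');
```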
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 0dd009005b..b3eee78e8d 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -230,18 +230,6 @@ This section introduce all available spark procedures
about paimon.
</td>
<td>CALL sys.migrate_table(source_type => 'hive', table => 'default.T',
options => 'file.format=parquet', options_map => map('k1','v1'), parallelism =>
6)</td>
</tr>
- <tr>
- <td>migrate_file</td>
- <td>
- Migrate from hive table to a paimon table. Arguments:
- <li>source_type: the origin table's type to be migrated, such as
hive. Cannot be empty.</li>
- <li>source_table: name of the origin table to migrate. Cannot be
empty.</li>
- <li>target_table: name of the target table to be migrated. Cannot
be empty.</li>
- <li>delete_origin: If had set target_table, can set delete_origin
to decide whether delete the origin table metadata from hms after migrate.
Default is true</li>
- <li>parallelism: the parallelism for migrate process, default is
core numbers of machine.</li>
- </td>
- <td>CALL sys.migrate_file(connector => 'hive', source_table =>
'default.hivetable', target_table => 'default.paimontable', delete_origin =>
true, parallelism => 6)</td>
- </tr>
<tr>
<td>remove_orphan_files</td>
<td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
index ad6de123dc..aa611408a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/Migrator.java
@@ -25,5 +25,5 @@ public interface Migrator {
void renameTable(boolean ignoreIfNotExists) throws Exception;
- public void deleteOriginTable(boolean delete) throws Exception;
+ void deleteOriginTable(boolean delete) throws Exception;
}
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
deleted file mode 100644
index 1e581c38cb..0000000000
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.procedure;
-
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.migrate.Migrator;
-
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import java.util.Collections;
-
-/** Add file procedure to add file from hive to paimon. */
-public class MigrateFileProcedure extends ProcedureBase {
-
- @Override
- public String identifier() {
- return "migrate_file";
- }
-
- public String[] call(
- ProcedureContext procedureContext,
- String connector,
- String sourceTablePath,
- String targetPaimonTablePath)
- throws Exception {
- migrateHandle(
- connector,
- sourceTablePath,
- targetPaimonTablePath,
- true,
- Runtime.getRuntime().availableProcessors());
- return new String[] {"Success"};
- }
-
- public String[] call(
- ProcedureContext procedureContext,
- String connector,
- String sourceTablePath,
- String targetPaimonTablePath,
- boolean deleteOrigin)
- throws Exception {
- migrateHandle(
- connector,
- sourceTablePath,
- targetPaimonTablePath,
- deleteOrigin,
- Runtime.getRuntime().availableProcessors());
- return new String[] {"Success"};
- }
-
- public String[] call(
- ProcedureContext procedureContext,
- String connector,
- String sourceTablePath,
- String targetPaimonTablePath,
- boolean deleteOrigin,
- Integer parallelism)
- throws Exception {
- Integer p = parallelism == null ?
Runtime.getRuntime().availableProcessors() : parallelism;
- migrateHandle(connector, sourceTablePath, targetPaimonTablePath,
deleteOrigin, p);
- return new String[] {"Success"};
- }
-
- public void migrateHandle(
- String connector,
- String sourceTablePath,
- String targetPaimonTablePath,
- boolean deleteOrigin,
- Integer parallelism)
- throws Exception {
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
-
- try {
- catalog.getTable(targetTableId);
- } catch (Catalog.TableNotExistException e) {
- throw new IllegalArgumentException(
- "Target paimon table does not exist: " +
targetPaimonTablePath);
- }
-
- Migrator importer =
- TableMigrationUtils.getImporter(
- connector,
- catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- parallelism,
- Collections.emptyMap());
- importer.deleteOriginTable(deleteOrigin);
- importer.executeMigrate();
- }
-}
diff --git
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
index e4d1738667..2f32c326a7 100644
---
a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java
@@ -418,8 +418,6 @@ public class ProcedurePositionalArgumentsITCase extends
CatalogITCaseBase {
.hasMessageContaining("Only support Hive Catalog.");
assertThatThrownBy(() -> sql("CALL sys.migrate_table('hive',
'default.T', '')"))
.hasMessageContaining("Only support Hive Catalog.");
- assertThatThrownBy(() -> sql("CALL sys.migrate_file('hive',
'default.T', 'default.S')"))
- .hasMessageContaining("Only support Hive Catalog.");
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
deleted file mode 100644
index e874536e78..0000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action;
-
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
-
-import org.apache.flink.table.procedure.DefaultProcedureContext;
-
-import java.util.Map;
-
-/** Migrate from external hive table to paimon table. */
-public class MigrateFileAction extends ActionBase {
-
- private final String connector;
- private final String sourceTable;
- private final String targetTable;
- private final String tableProperties;
- private boolean deleteOrigin;
- private Integer parallelism;
-
- public MigrateFileAction(
- String connector,
- String sourceTable,
- String targetTable,
- boolean deleteOrigin,
- Map<String, String> catalogConfig,
- String tableProperties,
- Integer parallelism) {
- super(catalogConfig);
- this.connector = connector;
- this.sourceTable = sourceTable;
- this.targetTable = targetTable;
- this.deleteOrigin = deleteOrigin;
- this.tableProperties = tableProperties;
- this.parallelism = parallelism;
- }
-
- @Override
- public void run() throws Exception {
- MigrateFileProcedure migrateTableProcedure = new
MigrateFileProcedure();
- migrateTableProcedure.withCatalog(catalog);
- migrateTableProcedure.call(
- new DefaultProcedureContext(env),
- connector,
- sourceTable,
- targetTable,
- deleteOrigin,
- parallelism);
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
deleted file mode 100644
index b43fc67752..0000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileActionFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action;
-
-import java.util.Map;
-import java.util.Optional;
-
-/** Action Factory for {@link MigrateFileAction}. */
-public class MigrateFileActionFactory implements ActionFactory {
-
- public static final String IDENTIFIER = "migrate_file";
-
- private static final String SOURCE_TYPE = "source_type";
-
- private static final String SOURCE_TABLE = "source_table";
-
- private static final String TARGET_TABLE = "target_table";
-
- private static final String DELETE_ORIGIN = "delete_origin";
-
- private static final String OPTIONS = "options";
- private static final String PARALLELISM = "parallelism";
-
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
- @Override
- public Optional<Action> create(MultipleParameterToolAdapter params) {
- String connector = params.get(SOURCE_TYPE);
- String sourceHiveTable = params.get(SOURCE_TABLE);
- String targetTable = params.get(TARGET_TABLE);
- boolean deleteOrigin = Boolean.parseBoolean(params.get(DELETE_ORIGIN));
- Map<String, String> catalogConfig = catalogConfigMap(params);
- String tableConf = params.get(OPTIONS);
- Integer parallelism = Integer.parseInt(params.get(PARALLELISM));
-
- MigrateFileAction migrateFileAction =
- new MigrateFileAction(
- connector,
- sourceHiveTable,
- targetTable,
- deleteOrigin,
- catalogConfig,
- tableConf,
- parallelism);
- return Optional.of(migrateFileAction);
- }
-
- @Override
- public void printHelp() {
- System.out.println("Action \"migrate_file\" runs a migrating job from
hive to paimon.");
- System.out.println();
-
- System.out.println("Syntax:");
- System.out.println(
- " migrate_file \\\n"
- + "--warehouse <warehouse_path> \\\n"
- + "--source_type hive \\\n"
- + "--source_table <database.table_name> \\\n"
- + "--target_table <database.table_name> \\\n"
- + "--delete_origin true \\\n"
- + "[--catalog_conf <key>=<value] \\\n"
- + "[--options <key>=<value>,<key>=<value>,...]");
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
index b12cb5f862..9bf85ab870 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
@@ -53,7 +53,9 @@ public class MigrateTableAction extends ActionBase {
new DefaultProcedureContext(env),
connector,
hiveTableFullName,
+ null,
tableProperties,
- parallelism);
+ parallelism,
+ null);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
deleted file mode 100644
index f2f10d0874..0000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.procedure;
-
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.migrate.Migrator;
-
-import org.apache.flink.table.annotation.ArgumentHint;
-import org.apache.flink.table.annotation.DataTypeHint;
-import org.apache.flink.table.annotation.ProcedureHint;
-import org.apache.flink.table.procedure.ProcedureContext;
-
-import java.util.Collections;
-
-/** Add file procedure to add file from hive to paimon. */
-public class MigrateFileProcedure extends ProcedureBase {
-
- @Override
- public String identifier() {
- return "migrate_file";
- }
-
- @ProcedureHint(
- argument = {
- @ArgumentHint(name = "connector", type =
@DataTypeHint("STRING")),
- @ArgumentHint(name = "source_table", type =
@DataTypeHint("STRING")),
- @ArgumentHint(name = "target_table", type =
@DataTypeHint("STRING")),
- @ArgumentHint(
- name = "delete_origin",
- type = @DataTypeHint("BOOLEAN"),
- isOptional = true),
- @ArgumentHint(
- name = "parallelism",
- type = @DataTypeHint("Integer"),
- isOptional = true)
- })
- public String[] call(
- ProcedureContext procedureContext,
- String connector,
- String sourceTablePath,
- String targetPaimonTablePath,
- Boolean deleteOrigin,
- Integer parallelism)
- throws Exception {
- if (deleteOrigin == null) {
- deleteOrigin = true;
- }
- Integer p = parallelism == null ?
Runtime.getRuntime().availableProcessors() : parallelism;
- migrateHandle(connector, sourceTablePath, targetPaimonTablePath,
deleteOrigin, p);
- return new String[] {"Success"};
- }
-
- public void migrateHandle(
- String connector,
- String sourceTablePath,
- String targetPaimonTablePath,
- boolean deleteOrigin,
- Integer parallelism)
- throws Exception {
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
-
- try {
- catalog.getTable(targetTableId);
- } catch (Catalog.TableNotExistException e) {
- throw new IllegalArgumentException(
- "Target paimon table does not exist: " +
targetPaimonTablePath);
- }
-
- Migrator importer =
- TableMigrationUtils.getImporter(
- connector,
- catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- parallelism,
- Collections.emptyMap());
- importer.deleteOriginTable(deleteOrigin);
- importer.executeMigrate();
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 32a2a16dc5..98e907bcea 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
-import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -30,6 +29,8 @@ import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.paimon.utils.ParameterUtils.parseCommaSeparatedKeyValues;
+
/** Migrate procedure to migrate hive table to paimon table. */
public class MigrateTableProcedure extends ProcedureBase {
@@ -46,25 +47,33 @@ public class MigrateTableProcedure extends ProcedureBase {
argument = {
@ArgumentHint(name = "connector", type =
@DataTypeHint("STRING")),
@ArgumentHint(name = "source_table", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "target_table",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
@ArgumentHint(name = "options", type =
@DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(
name = "parallelism",
type = @DataTypeHint("Integer"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "delete_origin",
+ type = @DataTypeHint("BOOLEAN"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String connector,
- String sourceTablePath,
+ String sourceTable,
+ String targetTable,
String properties,
- Integer parallelism)
+ Integer parallelism,
+ Boolean deleteOrigin)
throws Exception {
- properties = notnull(properties);
-
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
+ Identifier sourceTableId = Identifier.fromString(sourceTable);
+ Identifier targetTableId =
+ Identifier.fromString(
+ targetTable == null ? sourceTable + PAIMON_SUFFIX :
targetTable);
Integer p = parallelism == null ?
Runtime.getRuntime().availableProcessors() : parallelism;
@@ -77,11 +86,16 @@ public class MigrateTableProcedure extends ProcedureBase {
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
p,
-
ParameterUtils.parseCommaSeparatedKeyValues(properties));
+ parseCommaSeparatedKeyValues(notnull(properties)));
LOG.info("create migrator success.");
+ if (deleteOrigin != null) {
+ migrator.deleteOriginTable(deleteOrigin);
+ }
migrator.executeMigrate();
- migrator.renameTable(false);
+ if (targetTable == null) {
+ migrator.renameTable(false);
+ }
return new String[] {"Success"};
}
}
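As the updated body shows, the rename step now runs only when target_table is omitted, and delete_origin is forwarded only when explicitly set. A sketch of the two resulting modes, with hypothetical table names:

```sql
-- In-place upgrade: the data lands in a temporary table named with
-- PAIMON_SUFFIX, which is then renamed over the source table.
CALL sys.migrate_table(connector => 'hive', source_table => 'default.t1');

-- Migration into an existing table: no rename, and the origin table's
-- metadata is kept because delete_origin is false.
CALL sys.migrate_table(
    connector => 'hive',
    source_table => 'default.t1',
    target_table => 'default.t1_target',
    delete_origin => false);
```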
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index ae624f848d..f099e699d0 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -30,7 +30,6 @@ org.apache.paimon.flink.action.ExpireTagsActionFactory
org.apache.paimon.flink.action.ReplaceTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
-org.apache.paimon.flink.action.MigrateFileActionFactory
org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory
@@ -67,7 +66,6 @@ org.apache.paimon.flink.procedure.RollbackToTimestampProcedure
org.apache.paimon.flink.procedure.RollbackToWatermarkProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateDatabaseProcedure
-org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
org.apache.paimon.flink.procedure.QueryServiceProcedure
org.apache.paimon.flink.procedure.ExpireSnapshotsProcedure
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
index 7ffc2ae54f..2ce9ede857 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateFileProcedureITCase.java
@@ -19,8 +19,6 @@
package org.apache.paimon.hive.procedure;
import org.apache.paimon.flink.action.ActionITCaseBase;
-import org.apache.paimon.flink.action.MigrateFileAction;
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -36,13 +34,11 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;
-/** Tests for {@link MigrateFileProcedure}. */
+/** Tests for {@code MigrateFileProcedure}. */
public class MigrateFileProcedureITCase extends ActionITCaseBase {
private static final TestHiveMetastore TEST_HIVE_METASTORE = new
TestHiveMetastore();
@@ -60,23 +56,17 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
}
private static Stream<Arguments> testArguments() {
- return Stream.of(
- Arguments.of("orc", true),
- Arguments.of("avro", true),
- Arguments.of("parquet", true),
- Arguments.of("orc", false),
- Arguments.of("avro", false),
- Arguments.of("parquet", false));
+ return Stream.of(Arguments.of("orc"), Arguments.of("avro"),
Arguments.of("parquet"));
}
@ParameterizedTest
@MethodSource("testArguments")
- public void testMigrateFile(String format, boolean isNamedArgument) throws
Exception {
- test(format, isNamedArgument);
- testMigrateFileAction(format, isNamedArgument);
+ public void testMigrateFile(String format) throws Exception {
+ test(format);
+ testMigrateFileAction(format);
}
- public void test(String format, boolean isNamedArgument) throws Exception {
+ public void test(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
@@ -101,21 +91,16 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
"CREATE TABLE paimontable (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
- if (isNamedArgument) {
- tEnv.executeSql(
- "CALL sys.migrate_file(connector => 'hive',
source_table => 'default.hivetable', target_table => 'default.paimontable')")
- .await();
- } else {
- tEnv.executeSql(
- "CALL sys.migrate_file('hive',
'default.hivetable', 'default.paimontable')")
- .await();
- }
+ tEnv.executeSql(
+ "CALL sys.migrate_table(connector => 'hive',
source_table => 'default.hivetable', target_table => 'default.paimontable')")
+ .await();
+
List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
paimontable").collect());
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
- public void testMigrateFileAction(String format, boolean isNamedArgument)
throws Exception {
+ public void testMigrateFileAction(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
@@ -144,41 +129,14 @@ public class MigrateFileProcedureITCase extends
ActionITCaseBase {
tEnv.useCatalog("PAIMON");
tEnv.executeSql(
"CREATE TABLE paimontable01 (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
- tEnv.executeSql(
- "CREATE TABLE paimontable02 (id STRING, id2 INT, id3 INT)
PARTITIONED BY (id2, id3) with ('bucket' = '-1');");
-
- if (isNamedArgument) {
- tEnv.executeSql(
- "CALL sys.migrate_file(connector => 'hive',
source_table => 'default.hivetable01', target_table => 'default.paimontable01',
delete_origin => false)")
- .await();
- } else {
- tEnv.executeSql(
- "CALL sys.migrate_file('hive',
'default.hivetable01', 'default.paimontable01', false)")
- .await();
- }
- tEnv.useCatalog("PAIMON_GE");
- Map<String, String> catalogConf = new HashMap<>();
- catalogConf.put("metastore", "hive");
- catalogConf.put("uri", "thrift://localhost:" + PORT);
- catalogConf.put(
- "warehouse",
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname));
- MigrateFileAction migrateFileAction =
- new MigrateFileAction(
- "hive",
- "default.hivetable02",
- "default.paimontable02",
- false,
- catalogConf,
- "",
- 6);
- migrateFileAction.run();
+ tEnv.executeSql(
+ "CALL sys.migrate_table(connector => 'hive',
source_table => 'default.hivetable01', target_table => 'default.paimontable01',
delete_origin => false)")
+ .await();
tEnv.useCatalog("HIVE");
List<Row> r1 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable01").collect());
- List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable02").collect());
        Assertions.assertThat(r1).isEmpty();
- Assertions.assertThat(r2.size() == 0);
}
private String data(int i) {
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index 8d6ded69dc..66cb057abf 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.hive.procedure;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.MigrateTableAction;
-import org.apache.paimon.flink.procedure.MigrateFileProcedure;
import org.apache.paimon.hive.TestHiveMetastore;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -43,7 +42,7 @@ import java.util.Map;
import java.util.Random;
import java.util.stream.Stream;
-/** Tests for {@link MigrateFileProcedure}. */
+/** Tests for {@code MigrateTableProcedure}. */
public class MigrateTableProcedureITCase extends ActionITCaseBase {
private static final TestHiveMetastore TEST_HIVE_METASTORE = new
TestHiveMetastore();
@@ -61,21 +60,15 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
}
private static Stream<Arguments> testArguments() {
- return Stream.of(
- Arguments.of("orc", true),
- Arguments.of("avro", true),
- Arguments.of("parquet", true),
- Arguments.of("orc", false),
- Arguments.of("avro", false),
- Arguments.of("parquet", false));
+ return Stream.of(Arguments.of("orc"), Arguments.of("avro"),
Arguments.of("parquet"));
}
@ParameterizedTest
@MethodSource("testArguments")
- public void testMigrateProcedure(String format, boolean isNamedArgument)
throws Exception {
- testUpgradeNonPartitionTable(format, isNamedArgument);
+ public void testMigrateProcedure(String format) throws Exception {
+ testUpgradeNonPartitionTable(format);
resetMetastore();
- testUpgradePartitionTable(format, isNamedArgument);
+ testUpgradePartitionTable(format);
}
private void resetMetastore() throws Exception {
@@ -84,7 +77,7 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
TEST_HIVE_METASTORE.start(PORT);
}
- public void testUpgradePartitionTable(String format, boolean
isNamedArgument) throws Exception {
+ public void testUpgradePartitionTable(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
@@ -107,26 +100,17 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ "')");
tEnv.useCatalog("PAIMON");
- if (isNamedArgument) {
- tEnv.executeSql(
- "CALL sys.migrate_table(connector => 'hive',
source_table => 'default.hivetable', options => 'file.format="
- + format
- + "')")
- .await();
- } else {
- tEnv.executeSql(
- "CALL sys.migrate_table('hive',
'default.hivetable', 'file.format="
- + format
- + "')")
- .await();
- }
+ tEnv.executeSql(
+ "CALL sys.migrate_table(connector => 'hive',
source_table => 'default.hivetable', options => 'file.format="
+ + format
+ + "')")
+ .await();
List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
}
- public void testUpgradeNonPartitionTable(String format, boolean
isNamedArgument)
- throws Exception {
+ public void testUpgradeNonPartitionTable(String format) throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.executeSql("CREATE CATALOG HIVE WITH ('type'='hive')");
tEnv.useCatalog("HIVE");
@@ -147,19 +131,11 @@ public class MigrateTableProcedureITCase extends
ActionITCaseBase {
+
System.getProperty(HiveConf.ConfVars.METASTOREWAREHOUSE.varname)
+ "')");
tEnv.useCatalog("PAIMON");
- if (isNamedArgument) {
- tEnv.executeSql(
- "CALL sys.migrate_table(connector => 'hive',
source_table => 'default.hivetable', options => 'file.format="
- + format
- + "')")
- .await();
- } else {
- tEnv.executeSql(
- "CALL sys.migrate_table('hive',
'default.hivetable', 'file.format="
- + format
- + "')")
- .await();
- }
+ tEnv.executeSql(
+ "CALL sys.migrate_table(connector => 'hive',
source_table => 'default.hivetable', options => 'file.format="
+ + format
+ + "')")
+ .await();
List<Row> r2 = ImmutableList.copyOf(tEnv.executeSql("SELECT * FROM
hivetable").collect());
Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index eff62cad96..e1cf70d3bd 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -32,7 +32,6 @@ import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
import org.apache.paimon.spark.procedure.FastForwardProcedure;
import org.apache.paimon.spark.procedure.MarkPartitionDoneProcedure;
import org.apache.paimon.spark.procedure.MigrateDatabaseProcedure;
-import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
@@ -90,7 +89,6 @@ public class SparkProcedures {
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("migrate_database",
MigrateDatabaseProcedure::builder);
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
- procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
procedureBuilders.put("remove_orphan_files",
RemoveOrphanFilesProcedure::builder);
procedureBuilders.put("remove_unexisting_files",
RemoveUnexistingFilesProcedure::builder);
procedureBuilders.put("expire_snapshots",
ExpireSnapshotsProcedure::builder);
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
deleted file mode 100644
index 95d55df011..0000000000
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.procedure;
-
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.migrate.Migrator;
-import org.apache.paimon.spark.catalog.WithPaimonCatalog;
-import org.apache.paimon.spark.utils.TableMigrationUtils;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.TableCatalog;
-import org.apache.spark.sql.types.Metadata;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
-
-import java.util.Collections;
-
-import static org.apache.spark.sql.types.DataTypes.BooleanType;
-import static org.apache.spark.sql.types.DataTypes.IntegerType;
-import static org.apache.spark.sql.types.DataTypes.StringType;
-
-/**
- * Migrate file procedure. Usage:
- *
- * <pre><code>
- * CALL sys.migrate_file(source_type => 'hive', source_table =>
'db.source_tbl', target_table => 'db.target_tbl')
- * </code></pre>
- */
-public class MigrateFileProcedure extends BaseProcedure {
-
- private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- ProcedureParameter.required("source_type", StringType),
- ProcedureParameter.required("source_table", StringType),
- ProcedureParameter.required("target_table", StringType),
- ProcedureParameter.optional("delete_origin", BooleanType),
- ProcedureParameter.optional("parallelism", IntegerType)
- };
-
- private static final StructType OUTPUT_TYPE =
- new StructType(
- new StructField[] {
- new StructField("result", BooleanType, true,
Metadata.empty())
- });
-
- protected MigrateFileProcedure(TableCatalog tableCatalog) {
- super(tableCatalog);
- }
-
- @Override
- public ProcedureParameter[] parameters() {
- return PARAMETERS;
- }
-
- @Override
- public StructType outputType() {
- return OUTPUT_TYPE;
- }
-
- @Override
- public InternalRow[] call(InternalRow args) {
- String format = args.getString(0);
- String sourceTable = args.getString(1);
- String targetTable = args.getString(2);
- boolean deleteNeed = args.isNullAt(3) ? true : args.getBoolean(3);
- int parallelism =
- args.isNullAt(4) ? Runtime.getRuntime().availableProcessors()
: args.getInt(4);
-
- Identifier sourceTableId = Identifier.fromString(sourceTable);
- Identifier targetTableId = Identifier.fromString(targetTable);
-
- Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
-
- try {
- paimonCatalog.getTable(targetTableId);
- } catch (Catalog.TableNotExistException e) {
- throw new IllegalArgumentException(
- "Target paimon table does not exist: " + targetTable);
- }
-
- try {
- Migrator migrator =
- TableMigrationUtils.getImporter(
- format,
- paimonCatalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- parallelism,
- Collections.emptyMap());
-
- migrator.deleteOriginTable(deleteNeed);
- migrator.executeMigrate();
- } catch (Exception e) {
- throw new RuntimeException("Call migrate_file error", e);
- }
-
- return new InternalRow[] {newInternalRow(true)};
- }
-
- public static ProcedureBuilder builder() {
- return new BaseProcedure.Builder<MigrateFileProcedure>() {
- @Override
- public MigrateFileProcedure doBuild() {
- return new MigrateFileProcedure(tableCatalog());
- }
- };
- }
-
- @Override
- public String description() {
- return "MigrateFileProcedure";
- }
-}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
index 043223d05a..4b658d1873 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MigrateFileProcedureTest.scala
@@ -45,7 +45,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
spark.sql(
- s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
+ s"CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
checkAnswer(
spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
@@ -80,7 +80,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
spark.sql(s"INSERT INTO paimon_tbl_02 VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
spark.sql(
- s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl_02', target_table => '$hiveDbName.paimon_tbl_02',
parallelism => 6)")
+ s"CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl_02', target_table => '$hiveDbName.paimon_tbl_02',
parallelism => 6)")
checkAnswer(
spark.sql("SELECT * FROM paimon_tbl_02 ORDER BY id"),
@@ -115,7 +115,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
spark.sql(
- s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin
=> false)")
+ s"CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin
=> false)")
checkAnswer(spark.sql("SELECT * FROM hive_tbl ORDER BY id"), Nil)
@@ -153,7 +153,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
spark.sql(
- s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
+ s"CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl')")
checkAnswer(
spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
@@ -190,7 +190,7 @@ class MigrateFileProcedureTest extends PaimonHiveTestBase {
spark.sql(s"INSERT INTO paimon_tbl VALUES ('3', 'c', 'p1'), ('4',
'd', 'p2')")
spark.sql(
- s"CALL sys.migrate_file(source_type => 'hive', source_table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin
=> false)")
+ s"CALL sys.migrate_table(source_type => 'hive', table =>
'$hiveDbName.hive_tbl', target_table => '$hiveDbName.paimon_tbl', delete_origin
=> false)")
checkAnswer(
spark.sql("SELECT * FROM paimon_tbl ORDER BY id"),
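Note that the Spark procedure keeps its own parameter names: the updated tests above call the merged procedure with source_type and table rather than Flink's connector and source_table. A sketch of the Spark form, with placeholder names:

```sql
CALL sys.migrate_table(
    source_type => 'hive',
    table => 'db.hive_tbl',
    target_table => 'db.paimon_tbl',
    delete_origin => false);
```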