This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 183ca8f5de [iceberg] support migration for iceberg hive-catalog and introduce flink procedure and action (#4878)
183ca8f5de is described below
commit 183ca8f5de04d03db3fee133926a76f092282446
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Feb 13 13:51:01 2025 +0800
[iceberg] support migration for iceberg hive-catalog and introduce flink procedure and action (#4878)
This closes #4878.
---
...dure.java => MigrateIcebergTableProcedure.java} | 59 +++---
.../flink/procedure/MigrateTableProcedure.java | 40 ++--
.../flink/action/MigrateIcebergTableAction.java | 61 ++++++
.../action/MigrateIcebergTableActionFactory.java | 70 +++++++
...dure.java => MigrateIcebergTableProcedure.java} | 31 +--
.../flink/procedure/MigrateTableProcedure.java | 12 +-
.../paimon/flink/utils/TableMigrationUtils.java | 23 +++
.../services/org.apache.paimon.factories.Factory | 3 +-
.../migrate/IcebergMigrateHiveMetadata.java | 168 +++++++++++++++
.../migrate/IcebergMigrateHiveMetadataFactory.java | 36 ++++
.../services/org.apache.paimon.factories.Factory | 1 +
paimon-hive/paimon-hive-connector-common/pom.xml | 19 ++
.../MigrateIcebergTableProcedureITCase.java | 229 +++++++++++++++++++++
paimon-hive/pom.xml | 1 +
14 files changed, 675 insertions(+), 78 deletions(-)
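For readers following along, the new procedure is exercised from Flink SQL in the ITCase further down; a minimal sketch of the named-argument form, assuming a Hive metastore at thrift://localhost:9083 (host, port, and table name are placeholders):

    -- migrate an iceberg table registered in a hive catalog into paimon,
    -- keeping the original database and table name
    CALL sys.migrate_iceberg_table(
        source_table => 'default.my_iceberg_table',
        iceberg_options => 'metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:9083');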
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
similarity index 57%
copy from paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
copy to paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
index 196528d31c..0402d3e896 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
@@ -20,82 +20,73 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Migrate procedure to migrate hive table to paimon table. */
-public class MigrateTableProcedure extends ProcedureBase {
+/** Migrate procedure to migrate iceberg table to paimon table. */
+public class MigrateIcebergTableProcedure extends ProcedureBase {
- private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);
private static final String PAIMON_SUFFIX = "_paimon_";
@Override
public String identifier() {
- return "migrate_table";
+ return "migrate_iceberg_table";
}
public String[] call(
- ProcedureContext procedureContext, String connector, String sourceTablePath)
+ ProcedureContext procedureContext, String sourceTablePath, String icebergProperties)
throws Exception {
- return call(procedureContext, connector, sourceTablePath, "");
+
+ return call(procedureContext, sourceTablePath, icebergProperties, "");
}
public String[] call(
ProcedureContext procedureContext,
- String connector,
String sourceTablePath,
+ String icebergProperties,
String properties)
throws Exception {
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
-
- TableMigrationUtils.getImporter(
- connector,
- catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- Runtime.getRuntime().availableProcessors(),
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
- return new String[] {"Success"};
+ return call(
+ procedureContext,
+ sourceTablePath,
+ icebergProperties,
+ properties,
+ Runtime.getRuntime().availableProcessors());
}
public String[] call(
ProcedureContext procedureContext,
- String connector,
String sourceTablePath,
+ String icebergProperties,
String properties,
Integer parallelism)
throws Exception {
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+ String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+ Identifier targetTableId = Identifier.fromString(targetTablePath);
- TableMigrationUtils.getImporter(
- connector,
+ Migrator migrator =
+ TableMigrationUtils.getIcebergImporter(
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ ParameterUtils.parseCommaSeparatedKeyValues(properties),
+ ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
+ LOG.info("Created migrator successfully.");
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
}
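Since this Flink 1.18 variant has no @ProcedureHint, arguments are positional; a minimal sketch of the equivalent call (identifiers and URI are placeholders):

    CALL sys.migrate_iceberg_table(
        'default.my_iceberg_table',
        'metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:9083');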
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 196528d31c..8778b9d5e1 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -50,25 +51,13 @@ public class MigrateTableProcedure extends ProcedureBase {
String sourceTablePath,
String properties)
throws Exception {
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
-
- TableMigrationUtils.getImporter(
- connector,
- catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- Runtime.getRuntime().availableProcessors(),
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
-
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
- return new String[] {"Success"};
+ return call(
+ procedureContext,
+ connector,
+ sourceTablePath,
+ properties,
+ Runtime.getRuntime().availableProcessors());
}
public String[] call(
@@ -78,12 +67,13 @@ public class MigrateTableProcedure extends ProcedureBase {
String properties,
Integer parallelism)
throws Exception {
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+ String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+ Identifier targetTableId = Identifier.fromString(targetTablePath);
- TableMigrationUtils.getImporter(
+ Migrator migrator =
+ TableMigrationUtils.getImporter(
connector,
catalog,
sourceTableId.getDatabaseName(),
@@ -91,11 +81,11 @@ public class MigrateTableProcedure extends ProcedureBase {
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
+ LOG.info("Created migrator successfully.");
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java
new file mode 100644
index 0000000000..1b9fcb46a9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate from iceberg table to paimon table. */
+public class MigrateIcebergTableAction extends ActionBase {
+
+ private final String sourceTableFullName;
+ private final String tableProperties;
+ private final Integer parallelism;
+
+ private final String icebergProperties;
+
+ public MigrateIcebergTableAction(
+ String sourceTableFullName,
+ Map<String, String> catalogConfig,
+ String icebergProperties,
+ String tableProperties,
+ Integer parallelism) {
+ super(catalogConfig);
+ this.sourceTableFullName = sourceTableFullName;
+ this.tableProperties = tableProperties;
+ this.parallelism = parallelism;
+ this.icebergProperties = icebergProperties;
+ }
+
+ @Override
+ public void run() throws Exception {
+ MigrateIcebergTableProcedure migrateIcebergTableProcedure =
+ new MigrateIcebergTableProcedure();
+ migrateIcebergTableProcedure.withCatalog(catalog);
+ migrateIcebergTableProcedure.call(
+ new DefaultProcedureContext(env),
+ sourceTableFullName,
+ icebergProperties,
+ tableProperties,
+ parallelism);
+ }
+}
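The action is a thin wrapper over the procedure; a minimal programmatic sketch of driving it, assuming a hive-backed paimon catalog (all option values and names are placeholders):

    Map<String, String> catalogConf = new HashMap<>();
    catalogConf.put("warehouse", "/path/to/paimon/warehouse");
    catalogConf.put("metastore", "hive");
    catalogConf.put("uri", "thrift://localhost:9083");

    // the iceberg options string is parsed as comma-separated key=value pairs
    new MigrateIcebergTableAction(
                    "default.my_iceberg_table",
                    catalogConf,
                    "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:9083",
                    "", // extra paimon table options, comma-separated key=value pairs
                    4)  // parallelism
            .run();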
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java
new file mode 100644
index 0000000000..c85559d66b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateIcebergTableAction}. */
+public class MigrateIcebergTableActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "migrate_iceberg_table";
+
+ private static final String OPTIONS = "options";
+ private static final String PARALLELISM = "parallelism";
+
+ private static final String ICEBERG_OPTIONS = "iceberg_options";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+
+ String sourceTable = params.get(TABLE);
+ Map<String, String> catalogConfig = catalogConfigMap(params);
+ String tableConf = params.get(OPTIONS);
+ Integer parallelism =
+ params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));
+
+ String icebergOptions = params.get(ICEBERG_OPTIONS);
+
+ MigrateIcebergTableAction migrateIcebergTableAction =
+ new MigrateIcebergTableAction(
+ sourceTable, catalogConfig, icebergOptions, tableConf, parallelism);
+ return Optional.of(migrateIcebergTableAction);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"migrate_iceberg_table\" runs a job that migrates an iceberg table to paimon.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " migrate_iceberg_table "
+ + "--table <database.table_name> "
+ + "--iceberg_options <key>=<value>[,<key>=<value>,...] "
+ + "[--catalog_conf <key>=<value>] "
+ + "[--options <key>=<value>,<key>=<value>,...]");
+ }
+}
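Like the other action factories, this one is discovered through the Factory SPI by its identifier; a sketch of a command-line invocation following the syntax printed above (the action jar path is an assumption, matching how other paimon flink actions are launched):

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-<version>.jar \
        migrate_iceberg_table \
        --table default.my_iceberg_table \
        --iceberg_options metadata.iceberg.storage=hive-catalog,metadata.iceberg.uri=thrift://localhost:9083 \
        --catalog_conf warehouse=/path/to/paimon/warehouse \
        --parallelism 4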
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
similarity index 77%
copy from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
copy to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
index fff05a1a85..f43d29ed4f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.annotation.ArgumentHint;
@@ -29,22 +30,24 @@ import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Migrate procedure to migrate hive table to paimon table. */
-public class MigrateTableProcedure extends ProcedureBase {
-
- private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
+/** Migrate procedure to migrate iceberg table to paimon table. */
+public class MigrateIcebergTableProcedure extends ProcedureBase {
+ private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);
private static final String PAIMON_SUFFIX = "_paimon_";
@Override
public String identifier() {
- return "migrate_table";
+ return "migrate_iceberg_table";
}
@ProcedureHint(
argument = {
- @ArgumentHint(name = "connector", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "iceberg_options",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(
name = "parallelism",
@@ -53,12 +56,13 @@ public class MigrateTableProcedure extends ProcedureBase {
})
public String[] call(
ProcedureContext procedureContext,
- String connector,
String sourceTablePath,
+ String icebergProperties,
String properties,
Integer parallelism)
throws Exception {
properties = notnull(properties);
+ icebergProperties = notnull(icebergProperties);
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
@@ -67,19 +71,20 @@ public class MigrateTableProcedure extends ProcedureBase {
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
- TableMigrationUtils.getImporter(
- connector,
+ Migrator migrator =
+ TableMigrationUtils.getIcebergImporter(
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
p,
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ ParameterUtils.parseCommaSeparatedKeyValues(properties),
+ ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
+ LOG.info("Created migrator successfully.");
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index fff05a1a85..32a2a16dc5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.annotation.ArgumentHint;
@@ -67,7 +68,8 @@ public class MigrateTableProcedure extends ProcedureBase {
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
- TableMigrationUtils.getImporter(
+ Migrator migrator =
+ TableMigrationUtils.getImporter(
connector,
catalog,
sourceTableId.getDatabaseName(),
@@ -75,11 +77,11 @@ public class MigrateTableProcedure extends ProcedureBase {
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
p,
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
+ LOG.info("Created migrator successfully.");
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index b59c3592a9..4e7268c6f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -22,7 +22,9 @@ import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.migrate.HiveMigrator;
+import org.apache.paimon.iceberg.migrate.IcebergMigrator;
import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.options.Options;
import java.util.List;
import java.util.Map;
@@ -60,6 +62,27 @@ public class TableMigrationUtils {
}
}
+ public static Migrator getIcebergImporter(
+ Catalog catalog,
+ String sourceDatabase,
+ String sourceTableName,
+ String targetDatabase,
+ String targetTableName,
+ Integer parallelism,
+ Map<String, String> options,
+ Map<String, String> icebergOptions) {
+
+ Options icebergConf = new Options(icebergOptions);
+ return new IcebergMigrator(
+ catalog,
+ targetDatabase,
+ targetTableName,
+ sourceDatabase,
+ sourceTableName,
+ icebergConf,
+ parallelism);
+ }
+
public static List<Migrator> getImporters(
String connector,
Catalog catalog,
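Callers outside the procedures can use the new helper directly; a minimal sketch, assuming an open paimon Catalog and placeholder database/table names:

    Map<String, String> icebergOptions = new HashMap<>();
    icebergOptions.put("metadata.iceberg.storage", "hive-catalog");
    icebergOptions.put("metadata.iceberg.uri", "thrift://localhost:9083"); // placeholder uri

    // builds an IcebergMigrator that copies default.src into default.src_paimon_
    Migrator migrator =
            TableMigrationUtils.getIcebergImporter(
                    catalog,
                    "default", "src",          // source iceberg database / table
                    "default", "src_paimon_",  // temporary target paimon database / table
                    Runtime.getRuntime().availableProcessors(),
                    new HashMap<>(),           // extra paimon table options
                    icebergOptions);
    migrator.executeMigrate();   // commits the migrated paimon table
    migrator.renameTable(false); // renames the target back to the source name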
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6f6becf85f..efaa25627d 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -85,4 +85,5 @@ org.apache.paimon.flink.procedure.CloneProcedure
org.apache.paimon.flink.procedure.CompactManifestProcedure
org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
-org.apache.paimon.flink.procedure.ClearConsumersProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.ClearConsumersProcedure
+org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java
new file mode 100644
index 0000000000..3c0d7da024
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Gets the latest snapshot metadata of an iceberg table registered in the hive metastore. */
+public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class);
+
+ public static final String TABLE_TYPE_PROP = "table_type";
+ public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
+ private static final String ICEBERG_METADATA_LOCATION = "metadata_location";
+
+ private FileIO fileIO;
+ private final Options icebergOptions;
+ private final Identifier icebergIdentifier;
+
+ private final ClientPool<IMetaStoreClient, TException> clients;
+
+ private String metadataLocation = null;
+
+ private IcebergMetadata icebergMetadata;
+
+ public IcebergMigrateHiveMetadata(Identifier icebergIdentifier, Options icebergOptions) {
+
+ this.icebergIdentifier = icebergIdentifier;
+ this.icebergOptions = icebergOptions;
+
+ String uri = icebergOptions.get(IcebergOptions.URI);
+ String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR);
+ String hadoopConfDir = icebergOptions.get(IcebergOptions.HADOOP_CONF_DIR);
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.setClassLoader(IcebergMigrateHiveMetadata.class.getClassLoader());
+ HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf);
+
+ icebergOptions.toMap().forEach(hiveConf::set);
+ if (uri != null) {
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+ }
+
+ if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) {
+ LOG.error(
+ "Can't find hive metastore uri to connect: "
+ + "either set {} in iceberg options or set hive.metastore.uris "
+ + "in hive-site.xml or hadoop configurations. "
+ + "Will use empty metastore uris, which means we may use an embedded metastore. "
+ + "Please make sure the hive metastore uri for the iceberg table is correctly set as expected.",
+ IcebergOptions.URI.key());
+ }
+
+ this.clients =
+ new CachedClientPool(
+ hiveConf,
+ icebergOptions,
+ icebergOptions.getString(IcebergOptions.HIVE_CLIENT_CLASS));
+
+ @Override
+ public IcebergMetadata icebergMetadata() {
+ try {
+ boolean isExist = tableExists(icebergIdentifier);
+ if (!isExist) {
+ throw new RuntimeException(
+ String.format(
+ "iceberg table %s does not exist in hive metastore",
+ icebergIdentifier));
+ }
+ Table icebergHiveTable =
+ clients.run(
+ client ->
+ client.getTable(
+ icebergIdentifier.getDatabaseName(),
+ icebergIdentifier.getTableName()));
+ // check whether it is an iceberg table
+ String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP);
+ Preconditions.checkArgument(
+ tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE),
+ "not an iceberg table: %s (table-type=%s)",
+ icebergIdentifier.toString(),
+ tableType);
+
+ metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION);
+ LOG.info("iceberg latest metadata location: {}", metadataLocation);
+
+ fileIO = FileIO.get(new Path(metadataLocation), CatalogContext.create(icebergOptions));
+
+ icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation));
+ return icebergMetadata;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to read Iceberg metadata from path %s", metadataLocation),
+ e);
+ }
+ }
+
+ @Override
+ public String icebergLatestMetadataLocation() {
+ return metadataLocation;
+ }
+
+ @Override
+ public void deleteOriginTable() {
+ LOG.info("Iceberg table in hive to be deleted: {}", icebergIdentifier.toString());
+ try {
+ clients.run(
+ client -> {
+ client.dropTable(
+ icebergIdentifier.getDatabaseName(),
+ icebergIdentifier.getTableName(),
+ true,
+ true);
+ return null;
+ });
+
+ // iceberg table in hive is an external table; client.dropTable only deletes the metadata
+ // of the iceberg table in hive, so we manually delete the data files
+ Path icebergTablePath = new Path(icebergMetadata.location());
+
+ if (fileIO.exists(icebergTablePath) && fileIO.isDir(icebergTablePath)) {
+ fileIO.deleteDirectoryQuietly(icebergTablePath);
+ }
+ } catch (Exception e) {
+ LOG.warn("exception occurred when deleting origin table", e);
+ }
+ }
+
+ private boolean tableExists(Identifier identifier) throws Exception {
+ return clients.run(
+ client ->
+ client.tableExists(
+ identifier.getDatabaseName(), identifier.getTableName()));
+ }
+}
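This metadata reader is selected when iceberg_options sets metadata.iceberg.storage=hive-catalog, as exercised in the ITCase below; the minimal options it needs look like this (the port is a placeholder):

    metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:9083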
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java
new file mode 100644
index 0000000000..0a539cdec2
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHiveMetadata}. */
+public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
+ @Override
+ public String identifier() {
+ return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate";
+ }
+
+ @Override
+ public IcebergMigrateHiveMetadata create(Identifier icebergIdentifier, Options icebergOptions) {
+ return new IcebergMigrateHiveMetadata(icebergIdentifier, icebergOptions);
+ }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 26f0944d91..608f034659 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,3 +16,4 @@
org.apache.paimon.hive.HiveCatalogFactory
org.apache.paimon.hive.HiveCatalogLockFactory
org.apache.paimon.iceberg.IcebergHiveMetadataCommitterFactory
+org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadataFactory
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 397dfc9421..a79f2002ea 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -562,6 +562,25 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${iceberg.flink.dropwizard.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
new file mode 100644
index 0000000000..1875b08eba
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateIcebergTableAction;
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Tests for {@link MigrateIcebergTableProcedure}. */
+public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MigrateIcebergTableProcedureITCase.class);
+
+ private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+ private static final int PORT = 9087;
+
+ @TempDir java.nio.file.Path iceTempDir;
+ @TempDir java.nio.file.Path paiTempDir;
+
+ @BeforeEach
+ public void beforeEach() {
+ TEST_HIVE_METASTORE.start(PORT);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ TEST_HIVE_METASTORE.stop();
+ }
+
+ @Test
+ public void testMigrateIcebergTableProcedure() throws Exception {
+ TableEnvironment tEnv =
+ TableEnvironmentImpl.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ boolean isPartitioned = random.nextBoolean();
+ boolean icebergIsHive = random.nextBoolean();
+ boolean paimonIsHive = random.nextBoolean();
+ boolean isNamedArgument = random.nextBoolean();
+
+ // Logging the random arguments for debugging
+ LOG.info(
+ "isPartitioned:{}, icebergIsHive:{}, paimonIsHive:{}, isNamedArgument:{}",
+ isPartitioned,
+ icebergIsHive,
+ paimonIsHive,
+ isNamedArgument);
+
+ // create iceberg catalog, database, and table, and insert some data into the iceberg table
+ tEnv.executeSql(icebergCatalogDdl(icebergIsHive));
+
+ String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_");
+ tEnv.executeSql("USE CATALOG my_iceberg");
+ if (isPartitioned) {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)",
+ icebergTable));
+ } else {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')",
+ icebergTable));
+ }
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)",
+ icebergTable))
+ .await();
+
+ tEnv.executeSql(paimonCatalogDdl(paimonIsHive));
+ tEnv.executeSql("USE CATALOG my_paimon");
+
+ String icebergOptions =
+ icebergIsHive
+ ? "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:"
+ + PORT
+ : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir;
+ if (isNamedArgument) {
+ tEnv.executeSql(
+ String.format(
+ "CALL sys.migrate_iceberg_table(source_table => 'default.%s', "
+ + "iceberg_options => '%s')",
+ icebergTable, icebergOptions))
+ .await();
+ } else {
+ tEnv.executeSql(
+ String.format(
+ "CALL sys.migrate_iceberg_table('default.%s','%s')",
+ icebergTable, icebergOptions))
+ .await();
+ }
+
+ Assertions.assertThatList(
+ Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3)))
+ .containsExactlyInAnyOrderElementsOf(
+ ImmutableList.copyOf(
+ tEnv.executeSql(
+ String.format(
+ "SELECT * FROM `default`.`%s`",
+ icebergTable))
+ .collect()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception {
+ TableEnvironment tEnv =
+ TableEnvironmentImpl.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
+
+ // create iceberg catalog, database, and table, and insert some data into the iceberg table
+ tEnv.executeSql(icebergCatalogDdl(true));
+
+ String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_");
+ tEnv.executeSql("USE CATALOG my_iceberg");
+ if (isPartitioned) {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)",
+ icebergTable));
+ } else {
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')",
+ icebergTable));
+ }
+ tEnv.executeSql(
+ String.format(
+ "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)",
+ icebergTable))
+ .await();
+
+ String icebergOptions =
+ "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:"
+ + PORT;
+
+ Map<String, String> catalogConf = new HashMap<>();
+ catalogConf.put("warehouse", paiTempDir.toString());
+ catalogConf.put("metastore", "hive");
+ catalogConf.put("uri", "thrift://localhost:" + PORT);
+ catalogConf.put("cache-enabled", "false");
+
+ MigrateIcebergTableAction migrateIcebergTableAction =
+ new MigrateIcebergTableAction(
+ "default." + icebergTable, catalogConf, icebergOptions, "", 6);
+ migrateIcebergTableAction.run();
+
+ tEnv.executeSql(paimonCatalogDdl(true));
+ tEnv.executeSql("USE CATALOG my_paimon");
+ Assertions.assertThatList(
+ Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3)))
+ .containsExactlyInAnyOrderElementsOf(
+ ImmutableList.copyOf(
+ tEnv.executeSql(
+ String.format(
+ "SELECT * FROM `my_paimon`.`default`.`%s`",
+ icebergTable))
+ .collect()));
+ }
+
+ private String icebergCatalogDdl(boolean isHive) {
+ return isHive
+ ? String.format(
+ "CREATE CATALOG my_iceberg WITH "
+ + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', "
+ + "'warehouse' = '%s', 'cache-enabled' = 'false')",
+ PORT, iceTempDir)
+ : String.format(
+ "CREATE CATALOG my_iceberg WITH "
+ + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop',"
+ + "'warehouse' = '%s', 'cache-enabled' = 'false' )",
+ iceTempDir);
+ }
+
+ private String paimonCatalogDdl(boolean isHive) {
+ return isHive
+ ? String.format(
+ "CREATE CATALOG my_paimon WITH "
+ + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', "
+ + "'warehouse' = '%s', 'cache-enabled' = 'false' )",
+ PORT, iceTempDir)
+ : String.format(
+ "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')",
+ paiTempDir);
+ }
+}
diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml
index c97aceb1b8..92f32d1336 100644
--- a/paimon-hive/pom.xml
+++ b/paimon-hive/pom.xml
@@ -50,6 +50,7 @@ under the License.
<reflections.version>0.9.8</reflections.version>
<aws.version>1.12.319</aws.version>
<iceberg.flink.version>1.19</iceberg.flink.version>
+ <iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version>
</properties>
<dependencies>