This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6e01577c17 [clone][iceberg] support clone iceberg tables in hive to paimon (#5888)
6e01577c17 is described below
commit 6e01577c1788ee366ec5275e0040e90edd7f3129
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Jul 15 11:43:53 2025 +0800
[clone][iceberg] support clone iceberg tables in hive to paimon (#5888)
---
docs/content/migration/clone-to-paimon.md | 6 +
.../org/apache/paimon/migrate/FileMetaUtils.java | 21 ++
paimon-hive/paimon-hive-connector-common/pom.xml | 1 -
paimon-iceberg/pom.xml | 85 ++++++
.../paimon/iceberg/IcebergHiveCloneExtractor.java | 332 +++++++++++++++++++++
...org.apache.paimon.hive.clone.HiveCloneExtractor | 16 +
.../paimon/flink/CloneActionForIcebergITCase.java | 217 ++++++++++++++
7 files changed, 677 insertions(+), 1 deletion(-)
diff --git a/docs/content/migration/clone-to-paimon.md b/docs/content/migration/clone-to-paimon.md
index 1bf6a6371d..493c63d28f 100644
--- a/docs/content/migration/clone-to-paimon.md
+++ b/docs/content/migration/clone-to-paimon.md
@@ -93,3 +93,9 @@ You can use excluded tables spec to specify the tables that don't need to be clo
Clone Hudi needs dependency: [hudi-flink1.18-bundle-0.15.0.jar](https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.18-bundle/0.15.0/hudi-flink1.18-bundle-0.15.0.jar)
The execution method is the same as the Hive table mentioned above.
+
+## Clone Iceberg Tables
+
+Clone Iceberg needs dependency: [iceberg-flink-runtime-1.20-1.8.1.jar](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.8.1/iceberg-flink-runtime-1.20-1.8.1.jar)
+
+The execution method is the same as the Hive table mentioned above.
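As a reference for that execution method, below is a condensed Java sketch of how the new CloneActionForIcebergITCase (added at the end of this commit) drives the clone action against an Iceberg table registered in a Hive metastore. The database, table, URI and warehouse values are placeholders, and createAction comes from the test base class ActionITCaseBase, so this is a sketch rather than a standalone program:

    // Condensed from CloneActionForIcebergITCase; all literal values are placeholders.
    createAction(
            CloneAction.class,
            "clone",
            "--database", "iceberg_db",
            "--table", "iceberg_table",
            "--catalog_conf", "metastore=hive",
            "--catalog_conf", "uri=thrift://localhost:9083",
            "--target_database", "test",
            "--target_table", "test_table",
            "--target_catalog_conf", "warehouse=/path/to/paimon/warehouse")
            .run();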
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index e25957cac9..42b349d7d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -245,6 +245,27 @@ public class FileMetaUtils {
return binaryRow;
}
+ public static BinaryRow writePartitionValue(
+ RowType partitionRowType,
+ List<Object> partitionValues,
+ List<BinaryWriter.ValueSetter> valueSetters) {
+ BinaryRow binaryRow = new BinaryRow(partitionRowType.getFieldCount());
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+ List<DataField> fields = partitionRowType.getFields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ Object value = partitionValues.get(i);
+ if (value == null) {
+ binaryRowWriter.setNullAt(i);
+ } else {
+ valueSetters.get(i).setValue(binaryRowWriter, i, value);
+ }
+ }
+ binaryRowWriter.complete();
+ return binaryRow;
+ }
+
public static SimpleStatsExtractor createSimpleStatsExtractor(Table table, String format) {
CoreOptions options = ((FileStoreTable) table).coreOptions();
SimpleColStatsCollector.Factory[] factories =
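The new writePartitionValue helper above is what the Iceberg extractor (added below) uses to turn a list of partition values into a BinaryRow. A minimal self-contained sketch of that call pattern, assuming the usual Paimon factories RowType.of and DataTypes; the partition fields and values here are made up:

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.BinaryWriter;
    import org.apache.paimon.migrate.FileMetaUtils;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class WritePartitionValueSketch {
        public static void main(String[] args) {
            // Hypothetical partition row type: (dt STRING, bucket INT).
            RowType partitionRowType = RowType.of(DataTypes.STRING(), DataTypes.INT());

            // One ValueSetter per partition field, built the same way as in IcebergHiveCloneExtractor.
            List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
            partitionRowType
                    .getFieldTypes()
                    .forEach(type -> valueSetters.add(BinaryWriter.createValueSetter(type)));

            // Partition values use Paimon's internal representations (BinaryString for STRING).
            List<Object> partitionValues = Arrays.asList(BinaryString.fromString("2024-01-01"), 3);

            BinaryRow partitionRow =
                    FileMetaUtils.writePartitionValue(partitionRowType, partitionValues, valueSetters);
            System.out.println("partition fields written: " + partitionRow.getFieldCount());
        }
    }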
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 0ca7e3ef0a..c9d078a2ed 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -348,7 +348,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>
-
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
index c3a7c3aa45..0ee1471127 100644
--- a/paimon-iceberg/pom.xml
+++ b/paimon-iceberg/pom.xml
@@ -36,6 +36,8 @@ under the License.
<iceberg.version>1.8.1</iceberg.version>
<flink.version>${paimon-flink-common.flink.version}</flink.version>
<iceberg.flink.version>1.19</iceberg.flink.version>
+ <hive.version>2.3.10</hive.version>
+
<iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version>
</properties>
<dependencies>
@@ -54,6 +56,21 @@ under the License.
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-hive-catalog</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-hive-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-test-utils</artifactId>
@@ -127,6 +144,13 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${iceberg.flink.dropwizard.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- hadoop dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -265,6 +289,67 @@ under the License.
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-druid</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite.avatica</groupId>
+ <artifactId>avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.calcite</groupId>
+ <artifactId>calcite-avatica</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
</project>
\ No newline at end of file
diff --git a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
new file mode 100644
index 0000000000..d7cffa4212
--- /dev/null
+++ b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.clone.HiveCloneExtractor;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HiveTableCloneExtractor;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
+import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.CURRENT_SCHEMA;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DEFAULT_PARTITION_SPEC;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0;
+import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+
+/** A {@link HiveCloneExtractor} for Iceberg tables. */
+public class IcebergHiveCloneExtractor extends HiveTableCloneExtractor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergHiveCloneExtractor.class);
+
+ @Override
+ public boolean matches(Table table) {
+ return table.getParameters()
+ .getOrDefault(TABLE_TYPE_PROP, "")
+ .equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE);
+ }
+
+ @Override
+ public List<HivePartitionFiles> extractFiles(
+ Map<String, String> catalogOptions,
+ IMetaStoreClient client,
+ Table table,
+ FileIO fileIO,
+ Identifier identifier,
+ RowType partitionRowType,
+ String defaultPartitionName,
+ @Nullable PartitionPredicate predicate)
+ throws Exception {
+ String metadataPath = table.getParameters().get(METADATA_LOCATION_PROP);
+ String metadataJson = fileIO.readFileUtf8(new Path(metadataPath));
+ TableMetadata metadata = TableMetadataParser.fromJson(metadataJson);
+ Snapshot currentSnapshot = metadata.currentSnapshot();
+
+ if (metadata.schemas().size() > 1) {
+ LOG.warn("more than 1 schemas in iceberg!");
+ }
+ Preconditions.checkArgument(
+ metadata.specs().size() == 1,
+ "do not support cloning an Iceberg table with more than one partition spec. "
+ + "table: %s, specs: %s",
+ identifier.toString(),
+ metadata.specs());
+
+ org.apache.iceberg.io.FileIO icebergFileIO = new IcebergFileIO(fileIO);
+ List<ManifestFile> dataManifests = currentSnapshot.dataManifests(icebergFileIO);
+ List<ManifestFile> deleteManifests = currentSnapshot.deleteManifests(icebergFileIO);
+ Preconditions.checkArgument(
+ deleteManifests.isEmpty(),
+ "do not support cloning an Iceberg table with 'DELETE' manifest files. "
+ + "table: %s, size of deleteManifests: %s.",
+ identifier.toString(),
+ deleteManifests.size());
+
+ List<DataFile> dataFiles = readDataEntries(dataManifests, icebergFileIO);
+
+ if (partitionRowType.getFieldCount() == 0) {
+ // unpartitioned table
+ return Collections.singletonList(toHivePartitionFiles(dataFiles, BinaryRow.EMPTY_ROW));
+ } else {
+ // partitioned table
+ List<HivePartitionFiles> results = new ArrayList<>();
+ List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
+ partitionRowType
+ .getFieldTypes()
+ .forEach(type -> valueSetters.add(BinaryWriter.createValueSetter(type)));
+ Map<StructLike, List<DataFile>> groupedDataFiles =
+ dataFiles.stream().collect(Collectors.groupingBy(DataFile::partition));
+ for (Map.Entry<StructLike, List<DataFile>> entry : groupedDataFiles.entrySet()) {
+ List<Object> partitionValues =
+ partitionToObjects(partitionRowType, (PartitionData) entry.getKey());
+ BinaryRow partitionRow =
+ FileMetaUtils.writePartitionValue(
+ partitionRowType, partitionValues, valueSetters);
+ results.add(toHivePartitionFiles(entry.getValue(), partitionRow));
+ }
+ return results;
+ }
+ }
+
+ @Override
+ public List<String> extractPartitionKeys(Table table) {
+ String schemaJson = table.getParameters().get(CURRENT_SCHEMA);
+ String specJson = table.getParameters().getOrDefault(DEFAULT_PARTITION_SPEC, "");
+ if (specJson.isEmpty()) {
+ return Collections.emptyList();
+ } else {
+ PartitionSpec spec =
+ PartitionSpecParser.fromJson(SchemaParser.fromJson(schemaJson), specJson);
+ return spec.fields().stream().map(PartitionField::name).collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ public Map<String, String> extractOptions(Table table) {
+ Map<String, String> hiveTableOptions = table.getParameters();
+ Map<String, String> paimonOptions = new HashMap<>();
+ String comment = hiveTableOptions.get("comment");
+ if (comment != null) {
+ paimonOptions.put("hive.comment", comment);
+ paimonOptions.put("comment", comment);
+ }
+
+ String format =
+ table.getParameters()
+ .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ paimonOptions.put(FILE_FORMAT.key(), format);
+ Map<String, String> formatOptions =
+ getIdentifierPrefixOptions("write." + format, hiveTableOptions);
+ Map<String, String> sdFormatOptions =
+ getIdentifierPrefixOptions(
+ "write." + format,
table.getSd().getSerdeInfo().getParameters());
+ // TODO: iceberg format options should be transformed to paimon options
+ formatOptions.putAll(sdFormatOptions);
+ paimonOptions.putAll(formatOptions);
+
+ String compression = parseCompression(format, formatOptions);
+ if (compression != null) {
+ paimonOptions.put(FILE_COMPRESSION.key(), compression);
+ }
+ return paimonOptions;
+ }
+
+ private HivePartitionFiles toHivePartitionFiles(List<DataFile> dataFiles, BinaryRow partition) {
+ List<org.apache.paimon.fs.Path> paths = new ArrayList<>(dataFiles.size());
+ List<Long> fileSizes = new ArrayList<>(dataFiles.size());
+ String format = null;
+ for (DataFile file : dataFiles) {
+ // note: file.path() will be deprecated in Iceberg 2.0 and file.location() was only introduced in 1.7,
+ // so use file.path() here to stay compatible with Iceberg versions below 1.7
+ org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(file.path().toString());
+ if (format == null) {
+ format = file.format().toString();
+ }
+ long fileSize = file.fileSizeInBytes();
+ paths.add(path);
+ fileSizes.add(fileSize);
+ }
+ return new HivePartitionFiles(partition, paths, fileSizes, format);
+ }
+
+ private List<DataFile> readDataEntries(
+ List<ManifestFile> dataManifests, org.apache.iceberg.io.FileIO io) {
+ List<DataFile> dataEntries = new ArrayList<>();
+ for (ManifestFile dataManifest : dataManifests) {
+ for (DataFile dataFile : ManifestFiles.read(dataManifest, io)) {
+ dataEntries.add(dataFile);
+ }
+ }
+ return dataEntries;
+ }
+
+ private List<Object> partitionToObjects(RowType partitionRowType, PartitionData partition) {
+ Preconditions.checkArgument(partition.size() == partitionRowType.getFieldCount());
+ List<Object> objects = new ArrayList<>();
+ for (int i = 0; i < partition.size(); i++) {
+ objects.add(partition.get(i));
+ }
+ return objects;
+ }
+
+ private String parseCompression(String format, Map<String, String> formatOptions) {
+ String compression = null;
+ if (Objects.equals(format, "avro")) {
+ compression =
+ formatOptions.getOrDefault(
+ "write.avro.compression-codec",
AVRO_COMPRESSION_DEFAULT);
+ } else if (Objects.equals(format, "parquet")) {
+ compression =
+ formatOptions.getOrDefault(
+ "write.parquet.compression-codec",
+ PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
+ } else if (Objects.equals(format, "orc")) {
+ compression =
+ formatOptions.getOrDefault(
+ "write.orc.compression-codec",
ORC_COMPRESSION_DEFAULT);
+ }
+ return compression;
+ }
+
+ // -------------------------------------------------------------------------------------
+ // Iceberg Class Wrapper
+ // -------------------------------------------------------------------------------------
+
+ private static class IcebergFileIO implements org.apache.iceberg.io.FileIO {
+ private final FileIO fileIO;
+
+ public IcebergFileIO(FileIO fileIO) {
+ this.fileIO = fileIO;
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+
+ return new InputFile() {
+ @Override
+ public long getLength() {
+ try {
+ return fileIO.getFileSize(new Path(path));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ try {
+ org.apache.paimon.fs.SeekableInputStream inputStream =
+ fileIO.newInputStream(new Path(path));
+ return new IcebergSeekableStreamAdapter(inputStream);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String location() {
+ return path;
+ }
+
+ @Override
+ public boolean exists() {
+ return true;
+ }
+ };
+ }
+
+ @Override
+ public OutputFile newOutputFile(String path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class IcebergSeekableStreamAdapter extends SeekableInputStream {
+ private final org.apache.paimon.fs.SeekableInputStream paimonStream;
+
+ IcebergSeekableStreamAdapter(org.apache.paimon.fs.SeekableInputStream paimonStream) {
+ this.paimonStream = paimonStream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return paimonStream.getPos();
+ }
+
+ @Override
+ public void seek(long newPos) throws IOException {
+ paimonStream.seek(newPos);
+ }
+
+ @Override
+ public int read() throws IOException {
+ return paimonStream.read();
+ }
+ }
+}
diff --git a/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
new file mode 100644
index 0000000000..3470ee3e6a
--- /dev/null
+++ b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.iceberg.IcebergHiveCloneExtractor
\ No newline at end of file
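The service file above registers the new extractor for Java's ServiceLoader mechanism. As an illustration only (Paimon's actual lookup code for HiveCloneExtractor may be organized differently), here is a minimal sketch of how such a registration can be resolved by picking the first extractor whose matches() accepts the Hive table; for Iceberg tables that is the new IcebergHiveCloneExtractor, since it matches table_type=ICEBERG:

    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.paimon.hive.clone.HiveCloneExtractor;

    import java.util.ServiceLoader;

    public class ExtractorLookupSketch {
        // Illustrative only: resolve a HiveCloneExtractor via ServiceLoader and matches().
        static HiveCloneExtractor findExtractor(Table hiveTable) {
            for (HiveCloneExtractor extractor : ServiceLoader.load(HiveCloneExtractor.class)) {
                if (extractor.matches(hiveTable)) {
                    return extractor;
                }
            }
            throw new IllegalStateException(
                    "No HiveCloneExtractor matches table " + hiveTable.getTableName());
        }
    }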
diff --git a/paimon-iceberg/src/test/java/org/apache/paimon/flink/CloneActionForIcebergITCase.java b/paimon-iceberg/src/test/java/org/apache/paimon/flink/CloneActionForIcebergITCase.java
new file mode 100644
index 0000000000..e0b9e7a244
--- /dev/null
+++ b/paimon-iceberg/src/test/java/org/apache/paimon/flink/CloneActionForIcebergITCase.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Test clone Iceberg table. */
+public class CloneActionForIcebergITCase extends ActionITCaseBase {
+
+ private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+ private static final int PORT = 9089;
+
+ @TempDir java.nio.file.Path iceTempDir;
+
+ @BeforeAll
+ public static void beforeAll() {
+ TEST_HIVE_METASTORE.start(PORT);
+ }
+
+ @AfterAll
+ public static void afterAll() throws Exception {
+ TEST_HIVE_METASTORE.stop();
+ }
+
+ @Test
+ public void testUnPartitionedTable() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ String dbName = "icbergdb" + StringUtils.randomNumericString(10);
+ String tableName = "icebergtable" +
StringUtils.randomNumericString(10);
+ String format = randomFormat();
+
+ sql(
+ tEnv,
+ "CREATE CATALOG my_iceberg WITH "
+ + "( 'type' = 'iceberg', 'catalog-type' = 'hive',
'uri' = 'thrift://localhost:%s', "
+ + "'warehouse' = '%s', 'cache-enabled' = 'false')",
+ PORT,
+ iceTempDir);
+
+ sql(tEnv, "CREATE DATABASE my_iceberg.`%s`", dbName);
+
+ sql(
+ tEnv,
+ "CREATE TABLE my_iceberg.`%s`.`%s` ("
+ + " id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " price INT"
+ + ") WITH ("
+ + " 'format-version'='2',"
+ + " 'write.format.default'='%s'"
+ + ")",
+ dbName,
+ tableName,
+ format);
+
+ List<String> insertValues = new ArrayList<>();
+ for (int i = 0; i < 50; i++) {
+ insertValues.add(String.format("('%s', '%s', %s)", i, "A", i));
+ }
+
+ sql(
+ tEnv,
+ "INSERT INTO my_iceberg.`%s`.`%s` VALUES %s",
+ dbName,
+ tableName,
+ String.join(",", insertValues));
+
+ List<Row> r1 = sql(tEnv, "SELECT * FROM my_iceberg.`%s`.`%s`", dbName, tableName);
+
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql("CREATE DATABASE test");
+
+ createAction(
+ CloneAction.class,
+ "clone",
+ "--database",
+ dbName,
+ "--table",
+ tableName,
+ "--catalog_conf",
+ "metastore=hive",
+ "--catalog_conf",
+ "uri=thrift://localhost:" + PORT,
+ "--target_database",
+ "test",
+ "--target_table",
+ "test_table",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse)
+ .run();
+
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ @Test
+ public void testPartitionedTable() throws Exception {
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ String dbName = "icbergdb" + StringUtils.randomNumericString(10);
+ String tableName = "icebergtable" +
StringUtils.randomNumericString(10);
+ String format = randomFormat();
+
+ sql(
+ tEnv,
+ "CREATE CATALOG my_iceberg WITH "
+ + "( 'type' = 'iceberg', 'catalog-type' = 'hive',
'uri' = 'thrift://localhost:%s', "
+ + "'warehouse' = '%s', 'cache-enabled' = 'false')",
+ PORT,
+ iceTempDir);
+
+ sql(tEnv, "CREATE DATABASE my_iceberg.`%s`", dbName);
+
+ sql(
+ tEnv,
+ "CREATE TABLE my_iceberg.`%s`.`%s` ("
+ + " id STRING PRIMARY KEY NOT ENFORCED,"
+ + " name STRING,"
+ + " price INT"
+ + ") PARTITIONED BY (price) WITH ("
+ + " 'format-version'='2',"
+ + " 'write.format.default'='%s'"
+ + ")",
+ dbName,
+ tableName,
+ format);
+
+ List<String> insertValues = new ArrayList<>();
+ for (int i = 0; i < 50; i++) {
+ insertValues.add(String.format("('%s', '%s', %s)", i, "A", i % 3));
+ }
+ sql(
+ tEnv,
+ "INSERT INTO my_iceberg.`%s`.`%s` VALUES %s",
+ dbName,
+ tableName,
+ String.join(",", insertValues));
+
+ List<Row> r1 = sql(tEnv, "SELECT * FROM my_iceberg.`%s`.`%s`", dbName, tableName);
+
+ sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' =
'%s')", warehouse);
+ tEnv.useCatalog("PAIMON");
+ tEnv.executeSql("CREATE DATABASE test");
+
+ createAction(
+ CloneAction.class,
+ "clone",
+ "--database",
+ dbName,
+ "--table",
+ tableName,
+ "--catalog_conf",
+ "metastore=hive",
+ "--catalog_conf",
+ "uri=thrift://localhost:" + PORT,
+ "--target_database",
+ "test",
+ "--target_table",
+ "test_table",
+ "--target_catalog_conf",
+ "warehouse=" + warehouse)
+ .run();
+
+ List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+ Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+ }
+
+ private List<Row> sql(TableEnvironment tEnv, String query, Object... args) {
+ try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String randomFormat() {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int i = random.nextInt(3);
+ String[] formats = new String[] {"orc", "parquet", "avro"};
+ return formats[i];
+ }
+}