This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e01577c17 [clone][iceberg] support clone iceberg tables in hive to 
paimon (#5888)
6e01577c17 is described below

commit 6e01577c1788ee366ec5275e0040e90edd7f3129
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Jul 15 11:43:53 2025 +0800

    [clone][iceberg] support clone iceberg tables in hive to paimon (#5888)
---
 docs/content/migration/clone-to-paimon.md          |   6 +
 .../org/apache/paimon/migrate/FileMetaUtils.java   |  21 ++
 paimon-hive/paimon-hive-connector-common/pom.xml   |   1 -
 paimon-iceberg/pom.xml                             |  85 ++++++
 .../paimon/iceberg/IcebergHiveCloneExtractor.java  | 332 +++++++++++++++++++++
 ...org.apache.paimon.hive.clone.HiveCloneExtractor |  16 +
 .../paimon/flink/CloneActionForIcebergITCase.java  | 217 ++++++++++++++
 7 files changed, 677 insertions(+), 1 deletion(-)

diff --git a/docs/content/migration/clone-to-paimon.md 
b/docs/content/migration/clone-to-paimon.md
index 1bf6a6371d..493c63d28f 100644
--- a/docs/content/migration/clone-to-paimon.md
+++ b/docs/content/migration/clone-to-paimon.md
@@ -93,3 +93,9 @@ You can use excluded tables spec to specify the tables that 
don't need to be clo
 Clone Hudi needs dependency: 
[hudi-flink1.18-bundle-0.15.0.jar](https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink1.18-bundle/0.15.0/hudi-flink1.18-bundle-0.15.0.jar)
 
 The execution method is the same as the Hive table mentioned above.
+
+## Clone Iceberg Tables
+
+Clone Iceberg needs dependency: 
[iceberg-flink-runtime-1.20-1.8.1.jar](https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.8.1/iceberg-flink-runtime-1.20-1.8.1.jar)
+
+The execution method is the same as the Hive table mentioned above.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index e25957cac9..42b349d7d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -245,6 +245,27 @@ public class FileMetaUtils {
         return binaryRow;
     }
 
+    /**
+     * Builds a partition {@link BinaryRow} from partition values that are already available as
+     * Java objects.
+     *
+     * <p>The i-th value is written into the i-th field of the row through the i-th setter; a
+     * {@code null} value marks the field as null instead of invoking the setter.
+     *
+     * @param partitionRowType row type describing the partition fields
+     * @param partitionValues one value per partition field, in field order; elements may be null
+     * @param valueSetters one setter per partition field, in field order
+     * @return the completed partition row
+     * @throws IllegalArgumentException if fewer values or setters than partition fields are given
+     */
+    public static BinaryRow writePartitionValue(
+            RowType partitionRowType,
+            List<Object> partitionValues,
+            List<BinaryWriter.ValueSetter> valueSetters) {
+        int fieldCount = partitionRowType.getFieldCount();
+        // Fail fast with a descriptive message rather than with an IndexOutOfBoundsException
+        // raised after part of the row has already been written.
+        if (partitionValues.size() < fieldCount || valueSetters.size() < fieldCount) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Partition has %s fields but got %s values and %s value setters.",
+                            fieldCount, partitionValues.size(), valueSetters.size()));
+        }
+
+        BinaryRow binaryRow = new BinaryRow(fieldCount);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+
+        for (int i = 0; i < fieldCount; i++) {
+            Object value = partitionValues.get(i);
+            if (value == null) {
+                binaryRowWriter.setNullAt(i);
+            } else {
+                valueSetters.get(i).setValue(binaryRowWriter, i, value);
+            }
+        }
+        binaryRowWriter.complete();
+        return binaryRow;
+    }
+
     public static SimpleStatsExtractor createSimpleStatsExtractor(Table table, 
String format) {
         CoreOptions options = ((FileStoreTable) table).coreOptions();
         SimpleColStatsCollector.Factory[] factories =
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml 
b/paimon-hive/paimon-hive-connector-common/pom.xml
index 0ca7e3ef0a..c9d078a2ed 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -348,7 +348,6 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
index c3a7c3aa45..0ee1471127 100644
--- a/paimon-iceberg/pom.xml
+++ b/paimon-iceberg/pom.xml
@@ -36,6 +36,8 @@ under the License.
         <iceberg.version>1.8.1</iceberg.version>
         <flink.version>${paimon-flink-common.flink.version}</flink.version>
         <iceberg.flink.version>1.19</iceberg.flink.version>
+        <hive.version>2.3.10</hive.version>
+        
<iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version>
     </properties>
 
     <dependencies>
@@ -54,6 +56,21 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-catalog</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-hive-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-test-utils</artifactId>
@@ -127,6 +144,13 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-dropwizard</artifactId>
+            <version>${iceberg.flink.dropwizard.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- hadoop dependency -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
@@ -265,6 +289,67 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-druid</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite.avatica</groupId>
+                    <artifactId>avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.calcite</groupId>
+                    <artifactId>calcite-avatica</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.commons</groupId>
+                    <artifactId>commons-compress</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
new file mode 100644
index 0000000000..d7cffa4212
--- /dev/null
+++ 
b/paimon-iceberg/src/main/java/org/apache/paimon/iceberg/IcebergHiveCloneExtractor.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryWriter;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.clone.HiveCloneExtractor;
+import org.apache.paimon.hive.clone.HivePartitionFiles;
+import org.apache.paimon.hive.clone.HiveTableCloneExtractor;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SeekableInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE;
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_DEFAULT;
+import static org.apache.iceberg.TableProperties.CURRENT_SCHEMA;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.DEFAULT_PARTITION_SPEC;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
+import static 
org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0;
+import static org.apache.paimon.CoreOptions.FILE_COMPRESSION;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+
+/** A {@link HiveCloneExtractor} for Iceberg tables. */
+public class IcebergHiveCloneExtractor extends HiveTableCloneExtractor {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergHiveCloneExtractor.class);
+
+    @Override
+    public boolean matches(Table table) {
+        return table.getParameters()
+                .getOrDefault(TABLE_TYPE_PROP, "")
+                .equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE);
+    }
+
+    /**
+     * Lists the data files of the current snapshot of an Iceberg table registered in Hive,
+     * grouped by partition.
+     *
+     * <p>The Iceberg metadata JSON is read from the location stored in the {@code
+     * METADATA_LOCATION_PROP} table parameter. Tables with more than one partition spec, or with
+     * any delete manifest in the current snapshot, are rejected.
+     *
+     * @param catalogOptions not used by this implementation
+     * @param client not used by this implementation
+     * @param table Hive metastore table pointing at the Iceberg metadata
+     * @param fileIO file IO used to read the metadata JSON and the manifests
+     * @param identifier identifier of the source table, used in messages
+     * @param partitionRowType row type of the partition fields; zero fields means unpartitioned
+     * @param defaultPartitionName not used by this implementation
+     * @param predicate optional partition filter; partitions it rejects are skipped
+     * @return one entry per partition (a single entry with an empty partition row for an
+     *     unpartitioned table), or an empty list if the table has no snapshot yet
+     * @throws IllegalArgumentException if the metadata location is missing, or the table has
+     *     several partition specs or delete manifests
+     */
+    @Override
+    public List<HivePartitionFiles> extractFiles(
+            Map<String, String> catalogOptions,
+            IMetaStoreClient client,
+            Table table,
+            FileIO fileIO,
+            Identifier identifier,
+            RowType partitionRowType,
+            String defaultPartitionName,
+            @Nullable PartitionPredicate predicate)
+            throws Exception {
+        String metadataPath = table.getParameters().get(METADATA_LOCATION_PROP);
+        Preconditions.checkArgument(
+                metadataPath != null,
+                "cannot find '%s' in the parameters of iceberg table. table: %s",
+                METADATA_LOCATION_PROP,
+                identifier.toString());
+        String metadataJson = fileIO.readFileUtf8(new Path(metadataPath));
+        TableMetadata metadata = TableMetadataParser.fromJson(metadataJson);
+
+        if (metadata.schemas().size() > 1) {
+            LOG.warn("more than 1 schemas in iceberg!");
+        }
+        Preconditions.checkArgument(
+                metadata.specs().size() == 1,
+                "do not support clone iceberg table which had more than 1 partitionSpec. "
+                        + "table: %s, specs: %s",
+                identifier.toString(),
+                metadata.specs());
+
+        Snapshot currentSnapshot = metadata.currentSnapshot();
+        if (currentSnapshot == null) {
+            // A table that was created but never written to has no snapshot and thus no files.
+            LOG.info("iceberg table {} has no snapshot, there is no file to clone.", identifier);
+            return Collections.emptyList();
+        }
+
+        org.apache.iceberg.io.FileIO icebergFileIO = new IcebergFileIO(fileIO);
+        List<ManifestFile> dataManifests = currentSnapshot.dataManifests(icebergFileIO);
+        List<ManifestFile> deleteManifests = currentSnapshot.deleteManifests(icebergFileIO);
+        Preconditions.checkArgument(
+                deleteManifests.isEmpty(),
+                "do not support clone iceberg table which had 'DELETE' manifest file. "
+                        + "table: %s, size of deleteManifests: %s.",
+                identifier.toString(),
+                deleteManifests.size());
+
+        List<DataFile> dataFiles = readDataEntries(dataManifests, icebergFileIO);
+
+        if (partitionRowType.getFieldCount() == 0) {
+            // un-partition table
+            return Collections.singletonList(toHivePartitionFiles(dataFiles, BinaryRow.EMPTY_ROW));
+        }
+
+        // partition table
+        List<BinaryWriter.ValueSetter> valueSetters = new ArrayList<>();
+        partitionRowType
+                .getFieldTypes()
+                .forEach(type -> valueSetters.add(BinaryWriter.createValueSetter(type)));
+        Map<StructLike, List<DataFile>> groupedDataFiles =
+                dataFiles.stream().collect(Collectors.groupingBy(DataFile::partition));
+
+        List<HivePartitionFiles> results = new ArrayList<>();
+        for (Map.Entry<StructLike, List<DataFile>> entry : groupedDataFiles.entrySet()) {
+            List<Object> partitionValues =
+                    partitionToObjects(partitionRowType, (PartitionData) entry.getKey());
+            BinaryRow partitionRow =
+                    FileMetaUtils.writePartitionValue(
+                            partitionRowType, partitionValues, valueSetters);
+            // Honour the caller's partition filter instead of silently cloning every partition.
+            if (predicate != null && !predicate.test(partitionRow)) {
+                continue;
+            }
+            results.add(toHivePartitionFiles(entry.getValue(), partitionRow));
+        }
+        return results;
+    }
+
+    /**
+     * Derives the partition keys from the Iceberg partition spec stored in the Hive table
+     * parameters.
+     *
+     * @param table Hive metastore table carrying the {@code CURRENT_SCHEMA} and {@code
+     *     DEFAULT_PARTITION_SPEC} parameters
+     * @return the partition field names in spec order, or an empty list when no partition spec
+     *     parameter is present
+     * @throws IllegalArgumentException if the schema parameter is missing although a partition
+     *     spec is present, or if a partition field uses a non-identity transform
+     */
+    @Override
+    public List<String> extractPartitionKeys(Table table) {
+        String specJson = table.getParameters().getOrDefault(DEFAULT_PARTITION_SPEC, "");
+        if (specJson.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        String schemaJson = table.getParameters().get(CURRENT_SCHEMA);
+        Preconditions.checkArgument(
+                schemaJson != null,
+                "cannot find '%s' in the parameters of iceberg table.",
+                CURRENT_SCHEMA);
+        PartitionSpec spec =
+                PartitionSpecParser.fromJson(SchemaParser.fromJson(schemaJson), specJson);
+
+        List<String> partitionKeys = new ArrayList<>(spec.fields().size());
+        for (PartitionField field : spec.fields()) {
+            // A transformed partition field (bucket, truncate, day, ...) holds derived values and
+            // is not a table column, so it cannot be used as a partition key of the target table.
+            Preconditions.checkArgument(
+                    field.transform().isIdentity(),
+                    "do not support clone iceberg table which is partitioned by a non-identity "
+                            + "transform. partition field: %s, transform: %s",
+                    field.name(),
+                    field.transform());
+            partitionKeys.add(field.name());
+        }
+        return partitionKeys;
+    }
+
+    /**
+     * Translates the parameters of an Iceberg table registered in Hive into Paimon table options.
+     *
+     * <p>The result contains the table comment (under both {@code hive.comment} and {@code
+     * comment}), the file format, the options returned by {@code getIdentifierPrefixOptions} for
+     * the {@code write.<format>} prefix (serde parameters take precedence over table parameters),
+     * and the compression derived from them.
+     *
+     * @param table Hive metastore table
+     * @return a new mutable map of Paimon options
+     */
+    @Override
+    public Map<String, String> extractOptions(Table table) {
+        Map<String, String> hiveTableOptions = table.getParameters();
+        Map<String, String> paimonOptions = new HashMap<>();
+        String comment = hiveTableOptions.get("comment");
+        if (comment != null) {
+            paimonOptions.put("hive.comment", comment);
+            paimonOptions.put("comment", comment);
+        }
+
+        // Iceberg parses the format name case-insensitively, whereas the option key prefix and
+        // the comparisons in parseCompression are lower-case, so normalize before using it.
+        String format =
+                hiveTableOptions
+                        .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
+                        .toLowerCase(java.util.Locale.ROOT);
+        paimonOptions.put(FILE_FORMAT.key(), format);
+        Map<String, String> formatOptions =
+                getIdentifierPrefixOptions("write." + format, hiveTableOptions);
+        Map<String, String> sdFormatOptions =
+                getIdentifierPrefixOptions(
+                        "write." + format, table.getSd().getSerdeInfo().getParameters());
+        // TODO: iceberg format options should be transformed to paimon options
+        formatOptions.putAll(sdFormatOptions);
+        paimonOptions.putAll(formatOptions);
+
+        String compression = parseCompression(format, formatOptions);
+        if (compression != null) {
+            paimonOptions.put(FILE_COMPRESSION.key(), compression);
+        }
+        return paimonOptions;
+    }
+
+    /**
+     * Packs the Iceberg data files of one partition into a {@link HivePartitionFiles}.
+     *
+     * <p>The reported format is the format of the first file; it stays {@code null} when the list
+     * is empty.
+     */
+    private HivePartitionFiles toHivePartitionFiles(
+            List<DataFile> dataFiles, BinaryRow partition) {
+        int fileCount = dataFiles.size();
+        List<org.apache.paimon.fs.Path> locations = new ArrayList<>(fileCount);
+        List<Long> sizesInBytes = new ArrayList<>(fileCount);
+        String firstFormat = null;
+        for (DataFile dataFile : dataFiles) {
+            // note: file.path() will be deprecated in 2.0, file.location() was introduced in 1.7,
+            // here using file.path() to be compatible with 1.7-
+            locations.add(new org.apache.paimon.fs.Path(dataFile.path().toString()));
+            sizesInBytes.add(dataFile.fileSizeInBytes());
+            if (firstFormat == null) {
+                firstFormat = dataFile.format().toString();
+            }
+        }
+        return new HivePartitionFiles(partition, locations, sizesInBytes, firstFormat);
+    }
+
+    /**
+     * Reads every data file entry from the given data manifests.
+     *
+     * <p>Each manifest reader is closed as soon as its entries have been collected, so the
+     * underlying input streams are released.
+     *
+     * @param dataManifests manifests to read
+     * @param io Iceberg file IO used to open the manifests
+     * @return the data files of all manifests, in manifest order
+     */
+    private List<DataFile> readDataEntries(
+            List<ManifestFile> dataManifests, org.apache.iceberg.io.FileIO io) {
+        List<DataFile> dataEntries = new ArrayList<>();
+        for (ManifestFile dataManifest : dataManifests) {
+            try (org.apache.iceberg.io.CloseableIterable<DataFile> reader =
+                    ManifestFiles.read(dataManifest, io)) {
+                for (DataFile dataFile : reader) {
+                    dataEntries.add(dataFile);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Failed to read or close manifest file " + dataManifest.path(), e);
+            }
+        }
+        return dataEntries;
+    }
+
+    /**
+     * Copies the values of an Iceberg partition tuple into a list, in position order.
+     *
+     * <p>NOTE(review): values are passed on exactly as Iceberg returns them; whether every
+     * partition type is represented the way the Paimon value setters expect is not visible here
+     * and should be confirmed.
+     *
+     * @throws IllegalArgumentException if the tuple size differs from the partition field count
+     */
+    private List<Object> partitionToObjects(RowType partitionRowType, PartitionData partition) {
+        int arity = partition.size();
+        Preconditions.checkArgument(arity == partitionRowType.getFieldCount());
+        List<Object> values = new ArrayList<>();
+        int pos = 0;
+        while (pos < arity) {
+            values.add(partition.get(pos));
+            pos++;
+        }
+        return values;
+    }
+
+    /**
+     * Picks the compression codec for the given file format.
+     *
+     * @param format file format name; only {@code avro}, {@code parquet} and {@code orc} are
+     *     recognized
+     * @param formatOptions format options that may contain {@code write.<format>.compression-codec}
+     * @return the configured codec, the Iceberg default of the format when none is configured, or
+     *     {@code null} for an unrecognized format
+     */
+    private String parseCompression(String format, Map<String, String> formatOptions) {
+        if (Objects.equals(format, "avro")) {
+            return formatOptions.getOrDefault(
+                    "write.avro.compression-codec", AVRO_COMPRESSION_DEFAULT);
+        }
+        if (Objects.equals(format, "parquet")) {
+            return formatOptions.getOrDefault(
+                    "write.parquet.compression-codec", PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
+        }
+        if (Objects.equals(format, "orc")) {
+            return formatOptions.getOrDefault(
+                    "write.orc.compression-codec", ORC_COMPRESSION_DEFAULT);
+        }
+        return null;
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Iceberg Class Wrapper
+    // 
-------------------------------------------------------------------------------------
+
+    /**
+     * Read-only adapter exposing a Paimon {@link FileIO} through the Iceberg file IO interface.
+     *
+     * <p>Only {@link #newInputFile(String)} is supported; creating output files and deleting
+     * files throw {@link UnsupportedOperationException}.
+     */
+    private static class IcebergFileIO implements org.apache.iceberg.io.FileIO {
+        private final FileIO fileIO;
+
+        public IcebergFileIO(FileIO fileIO) {
+            this.fileIO = fileIO;
+        }
+
+        @Override
+        public InputFile newInputFile(String path) {
+
+            return new InputFile() {
+                @Override
+                public long getLength() {
+                    try {
+                        return fileIO.getFileSize(new Path(path));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                @Override
+                public SeekableInputStream newStream() {
+                    try {
+                        org.apache.paimon.fs.SeekableInputStream inputStream =
+                                fileIO.newInputStream(new Path(path));
+                        return new IcebergSeekableStreamAdapter(inputStream);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                @Override
+                public String location() {
+                    return path;
+                }
+
+                @Override
+                public boolean exists() {
+                    // Ask the file system instead of claiming that every path exists.
+                    try {
+                        return fileIO.exists(new Path(path));
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+        }
+
+        @Override
+        public OutputFile newOutputFile(String path) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void deleteFile(String path) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Adapts a Paimon seekable input stream to the Iceberg {@link SeekableInputStream} contract.
+     *
+     * <p>Every operation, including bulk reads and {@link #close()}, is forwarded to the wrapped
+     * stream, so closing the adapter releases the underlying resource.
+     */
+    private static class IcebergSeekableStreamAdapter extends SeekableInputStream {
+        private final org.apache.paimon.fs.SeekableInputStream paimonStream;
+
+        IcebergSeekableStreamAdapter(org.apache.paimon.fs.SeekableInputStream paimonStream) {
+            this.paimonStream = paimonStream;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return paimonStream.getPos();
+        }
+
+        @Override
+        public void seek(long newPos) throws IOException {
+            paimonStream.seek(newPos);
+        }
+
+        @Override
+        public int read() throws IOException {
+            return paimonStream.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            // Without this override InputStream falls back to one read() call per byte.
+            return paimonStream.read(b, off, len);
+        }
+
+        @Override
+        public void close() throws IOException {
+            // InputStream.close() is a no-op, so the wrapped stream must be closed explicitly.
+            paimonStream.close();
+        }
+    }
+}
diff --git 
a/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
 
b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
new file mode 100644
index 0000000000..3470ee3e6a
--- /dev/null
+++ 
b/paimon-iceberg/src/main/resources/META-INF/services/org.apache.paimon.hive.clone.HiveCloneExtractor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.iceberg.IcebergHiveCloneExtractor
\ No newline at end of file
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/flink/CloneActionForIcebergITCase.java
 
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/CloneActionForIcebergITCase.java
new file mode 100644
index 0000000000..e0b9e7a244
--- /dev/null
+++ 
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/CloneActionForIcebergITCase.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.CloneAction;
+import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Test clone Iceberg table. */
+public class CloneActionForIcebergITCase extends ActionITCaseBase {
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new 
TestHiveMetastore();
+
+    private static final int PORT = 9089;
+
+    @TempDir java.nio.file.Path iceTempDir;
+
+    /** Starts the shared test Hive metastore on the fixed {@code PORT} before any test runs. */
+    @BeforeAll
+    public static void beforeAll() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    /** Stops the shared test Hive metastore once all tests have finished. */
+    @AfterAll
+    public static void afterAll() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    /**
+     * Clones an unpartitioned Iceberg table (50 rows, randomly chosen file format) from the test
+     * Hive metastore into a Paimon table and checks that both tables return the same rows.
+     */
+    @Test
+    public void testUnPartitionedTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        // Random suffixes keep database and table names unique in the shared metastore.
+        String dbName = "icbergdb" + StringUtils.randomNumericString(10);
+        String tableName = "icebergtable" + 
StringUtils.randomNumericString(10);
+        String format = randomFormat();
+
+        // Register an Iceberg catalog backed by the test Hive metastore.
+        sql(
+                tEnv,
+                "CREATE CATALOG my_iceberg WITH "
+                        + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 
'uri' = 'thrift://localhost:%s', "
+                        + "'warehouse' = '%s', 'cache-enabled' = 'false')",
+                PORT,
+                iceTempDir);
+
+        sql(tEnv, "CREATE DATABASE my_iceberg.`%s`", dbName);
+
+        // Create the source Iceberg table without partitioning.
+        sql(
+                tEnv,
+                "CREATE TABLE my_iceberg.`%s`.`%s` ("
+                        + "  id STRING PRIMARY KEY NOT ENFORCED,"
+                        + "  name STRING,"
+                        + "  price INT"
+                        + ") WITH ("
+                        + "  'format-version'='2',"
+                        + "  'write.format.default'='%s'"
+                        + ")",
+                dbName,
+                tableName,
+                format);
+
+        // Insert 50 rows with distinct ids and prices.
+        List<String> insertValues = new ArrayList<>();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', %s)", i, "A", i));
+        }
+
+        sql(
+                tEnv,
+                "INSERT INTO my_iceberg.`%s`.`%s` VALUES %s",
+                dbName,
+                tableName,
+                String.join(",", insertValues));
+
+        // Rows as read from the Iceberg source table.
+        List<Row> r1 = sql(tEnv, "SELECT * FROM my_iceberg.`%s`.`%s`", dbName, 
tableName);
+
+        // Prepare the target Paimon catalog and database.
+        sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = 
'%s')", warehouse);
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        // Run the clone action from the Hive metastore table into test.test_table.
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        dbName,
+                        "--table",
+                        tableName,
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_database",
+                        "test",
+                        "--target_table",
+                        "test_table",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse)
+                .run();
+
+        // The cloned Paimon table must contain exactly the source rows, in any order.
+        List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
+    /**
+     * Clones an Iceberg table partitioned by {@code price} (50 rows spread over three partition
+     * values, randomly chosen file format) into a Paimon table and checks that both tables return
+     * the same rows.
+     */
+    @Test
+    public void testPartitionedTable() throws Exception {
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        // Random suffixes keep database and table names unique in the shared metastore.
+        String dbName = "icbergdb" + StringUtils.randomNumericString(10);
+        String tableName = "icebergtable" + 
StringUtils.randomNumericString(10);
+        String format = randomFormat();
+
+        // Register an Iceberg catalog backed by the test Hive metastore.
+        sql(
+                tEnv,
+                "CREATE CATALOG my_iceberg WITH "
+                        + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 
'uri' = 'thrift://localhost:%s', "
+                        + "'warehouse' = '%s', 'cache-enabled' = 'false')",
+                PORT,
+                iceTempDir);
+
+        sql(tEnv, "CREATE DATABASE my_iceberg.`%s`", dbName);
+
+        // Create the source Iceberg table, partitioned by the price column.
+        sql(
+                tEnv,
+                "CREATE TABLE my_iceberg.`%s`.`%s` ("
+                        + "  id STRING PRIMARY KEY NOT ENFORCED,"
+                        + "  name STRING,"
+                        + "  price INT"
+                        + ") PARTITIONED BY (price) WITH ("
+                        + "  'format-version'='2',"
+                        + "  'write.format.default'='%s'"
+                        + ")",
+                dbName,
+                tableName,
+                format);
+
+        // Insert 50 rows; price is i % 3, so the rows fall into three partitions.
+        List<String> insertValues = new ArrayList<>();
+        for (int i = 0; i < 50; i++) {
+            insertValues.add(String.format("('%s', '%s', %s)", i, "A", i % 3));
+        }
+        sql(
+                tEnv,
+                "INSERT INTO my_iceberg.`%s`.`%s` VALUES %s",
+                dbName,
+                tableName,
+                String.join(",", insertValues));
+
+        // Rows as read from the Iceberg source table.
+        List<Row> r1 = sql(tEnv, "SELECT * FROM my_iceberg.`%s`.`%s`", dbName, 
tableName);
+
+        // Prepare the target Paimon catalog and database.
+        sql(tEnv, "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse' = 
'%s')", warehouse);
+        tEnv.useCatalog("PAIMON");
+        tEnv.executeSql("CREATE DATABASE test");
+
+        // Run the clone action from the Hive metastore table into test.test_table.
+        createAction(
+                        CloneAction.class,
+                        "clone",
+                        "--database",
+                        dbName,
+                        "--table",
+                        tableName,
+                        "--catalog_conf",
+                        "metastore=hive",
+                        "--catalog_conf",
+                        "uri=thrift://localhost:" + PORT,
+                        "--target_database",
+                        "test",
+                        "--target_table",
+                        "test_table",
+                        "--target_catalog_conf",
+                        "warehouse=" + warehouse)
+                .run();
+
+        // The cloned Paimon table must contain exactly the source rows, in any order.
+        List<Row> r2 = sql(tEnv, "SELECT * FROM test.test_table");
+        Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2);
+    }
+
+    /**
+     * Formats {@code query} with {@code args}, executes it and returns all result rows.
+     *
+     * <p>Any failure is rethrown wrapped in a {@link RuntimeException}.
+     */
+    private List<Row> sql(TableEnvironment tEnv, String query, Object... args) {
+        try (CloseableIterator<Row> rows =
+                tEnv.executeSql(String.format(query, args)).collect()) {
+            List<Row> collected = ImmutableList.copyOf(rows);
+            return collected;
+        } catch (Exception failure) {
+            throw new RuntimeException(failure);
+        }
+    }
+
+    /** Returns one of {@code orc}, {@code parquet} or {@code avro}, chosen at random. */
+    private String randomFormat() {
+        String[] candidates = {"orc", "parquet", "avro"};
+        int pick = ThreadLocalRandom.current().nextInt(3);
+        return candidates[pick];
+    }
+}

Reply via email to