This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ada017e7dc [iceberg] support deletion vector in Iceberg compatibility 
(#5670)
ada017e7dc is described below

commit ada017e7dc49d17364dc9ee5cbd83ca331d4d44a
Author: LsomeYeah <[email protected]>
AuthorDate: Fri May 30 13:07:28 2025 +0800

    [iceberg] support deletion vector in Iceberg compatibility (#5670)
---
 .github/workflows/check-licensing.yml              |   2 +-
 .github/workflows/utitcase-jdk11.yml               |  14 +-
 .github/workflows/utitcase-spark-3.x.yml           |   2 +-
 .github/workflows/utitcase.yml                     |   4 +-
 docs/content/migration/iceberg-compatibility.md    | 122 ++++++++
 .../generated/iceberg_configuration.html           |   6 +
 .../paimon/iceberg/IcebergCommitCallback.java      | 301 ++++++++++++++++++--
 .../org/apache/paimon/iceberg/IcebergOptions.java  |   8 +
 .../iceberg/manifest/IcebergDataFileMeta.java      |  88 +++++-
 .../manifest/IcebergDataFileMetaSerializer.java    |  10 +-
 .../iceberg/manifest/IcebergManifestFile.java      |  20 +-
 .../paimon/iceberg/metadata/IcebergMetadata.java   |   6 +-
 .../org/apache/paimon/index/IndexFileHandler.java  |   5 +-
 .../metastore/AddPartitionCommitCallback.java      |   6 +-
 .../paimon/metastore/TagPreviewCommitCallback.java |   6 +-
 .../paimon/operation/FileStoreCommitImpl.java      |   2 +-
 .../apache/paimon/table/sink/CommitCallback.java   |   6 +-
 .../paimon/iceberg/IcebergCompatibilityTest.java   |  50 ++++
 .../apache/paimon/table/sink/TableCommitTest.java  |   6 +-
 paimon-iceberg/pom.xml                             | 187 ++++++++++++
 .../paimon/core/IcebergDVCompatibilityTest.java    | 314 +++++++++++++++++++++
 .../apache/paimon/flink/FlinkIcebergITCase.java    | 128 +++++++++
 .../org/apache/paimon/spark/PaimonCommitTest.scala |   8 +-
 pom.xml                                            |   1 +
 24 files changed, 1239 insertions(+), 63 deletions(-)

diff --git a/.github/workflows/check-licensing.yml 
b/.github/workflows/check-licensing.yml
index 101d331e90..ab72928779 100644
--- a/.github/workflows/check-licensing.yml
+++ b/.github/workflows/check-licensing.yml
@@ -41,7 +41,7 @@ jobs:
         run: |
           set -o pipefail
 
-          mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests \
+          mvn clean deploy ${{ env.MVN_COMMON_OPTIONS }} -DskipTests -pl 
"!paimon-iceberg" \
             -DaltDeploymentRepository=validation_repository::default::file:${{ 
env.MVN_VALIDATION_DIR }} \
             | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
 
diff --git a/.github/workflows/utitcase-jdk11.yml 
b/.github/workflows/utitcase-jdk11.yml
index e640f0921d..26105a3b01 100644
--- a/.github/workflows/utitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -19,21 +19,17 @@
 name: UTCase and ITCase Others on JDK 11
 
 on:
-  issue_comment:
-    types: [created, edited, deleted]
-
-  # daily run
-  schedule:
-    - cron: "0 0 * * *"
+  push:
+  pull_request:
+    paths-ignore:
+      - 'docs/**'
+      - '**/*.md'
 
 env:
   JDK_VERSION: 11
 
 jobs:
   build:
-    if: |
-      github.event_name == 'schedule' ||
-      (contains(github.event.comment.html_url, '/pull/') && 
contains(github.event.comment.body, '/jdk11'))
     runs-on: ubuntu-latest
 
     steps:
diff --git a/.github/workflows/utitcase-spark-3.x.yml 
b/.github/workflows/utitcase-spark-3.x.yml
index d06554108a..06ca1d88d0 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -45,7 +45,7 @@ jobs:
           java-version: ${{ env.JDK_VERSION }}
           distribution: 'temurin'
       - name: Build Spark
-        run:  mvn -T 2C -B clean install -DskipTests
+        run:  mvn -T 2C -B clean install -DskipTests -pl "!paimon-iceberg"
       - name: Test Spark
         timeout-minutes: 60
         run: |
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index 012e6a68c8..1a1f927806 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -49,7 +49,7 @@ jobs:
       - name: Build Others
         run: |
           echo "Start compiling modules"
-          mvn -T 2C -B clean install -DskipTests -Pflink1,spark3
+          mvn -T 2C -B clean install -DskipTests -Pflink1,spark3 -pl 
"!paimon-iceberg"
 
       - name: Test Others
         timeout-minutes: 60
@@ -58,7 +58,7 @@ jobs:
           jvm_timezone=$(random_timezone)
           echo "JVM timezone is set to $jvm_timezone"
           
-          TEST_MODULES="!paimon-e2e-tests,"
+          TEST_MODULES="!paimon-e2e-tests,!paimon-iceberg,"
           for suffix in 3.5 3.4 3.3 3.2 ut; do
             TEST_MODULES+="!org.apache.paimon:paimon-spark-${suffix},"
           done
diff --git a/docs/content/migration/iceberg-compatibility.md 
b/docs/content/migration/iceberg-compatibility.md
index f624b6ff54..12d34c8dc6 100644
--- a/docs/content/migration/iceberg-compatibility.md
+++ b/docs/content/migration/iceberg-compatibility.md
@@ -348,6 +348,128 @@ You can configure the following table option, so that 
Paimon is forced to perfor
 Note that full compaction is a resource-consuming process, so the value of 
this table option should not be too small.
 We recommend full compaction to be performed once or twice per hour.
 
+## Deletion Vector Support
+
+[Deletion vectors]({{< ref "concepts/spec/tableindex#deletion-vectors" >}}) in 
Paimon are used to store deleted records for each file. 
+Under deletion-vector mode, paimon readers can directly filter out unnecessary 
records during reading phase without merging data.
+Fortunately, Iceberg has supported [deletion 
vectors](https://iceberg.apache.org/spec/?h=deletion#deletion-vectors) in 
[Version 3](https://iceberg.apache.org/spec/?h=deletion#version-3).
+This means that if the Iceberg reader can recognize Paimon's deletion vectors, 
it will be able to read all of Paimon's data, even without the ability to merge 
data files.
+With Paimon's deletion vectors synchronized to Iceberg, Iceberg reader and 
Paimon reader can achieve true real-time synchronization.
+
+
+If the following conditions are met, Paimon will construct metadata about its 
deletion vectors for Iceberg.
+* '`deletion-vectors.enabled`' and '`deletion-vectors.bitmap64`' should be set 
to true, because only the 64-bit bitmap implementation of deletion vectors in 
Paimon is compatible with Iceberg.
+* '`metadata.iceberg.format-version`'(default value is 2) should be set to 3, 
because Iceberg only supports deletion vectors in V3.
+* Version of Iceberg should be 1.8.0+.
+* JDK version should be 11+. Iceberg has stopped supporting JDK 8 since 
version 1.7.0.
+
+Here is an example:
+{{< tabs "deletion-vector-table" >}}
+
+{{< tab "Flink SQL" >}}
+```sql
+-- flink version: 1.20.1
+
+CREATE CATALOG paimon_catalog WITH (
+    'type' = 'paimon',
+    'warehouse' = '<path-to-warehouse>'
+);
+
+-- Create a paimon table with primary key and enable deletion vector
+CREATE TABLE paimon_catalog.`default`.T
+(
+    pt  INT
+    ,k  INT
+    ,v  INT
+    ,PRIMARY KEY (pt, k) NOT ENFORCED
+)PARTITIONED BY (pt)
+WITH (
+    'metadata.iceberg.storage' = 'hadoop-catalog'
+    ,'metadata.iceberg.format-version' = '3'
+    ,'deletion-vectors.enabled' = 'true'
+    ,'deletion-vectors.bitmap64' = 'true'
+);
+
+INSERT INTO paimon_catalog.`default`.T
+VALUES (1, 9, 90), (1, 10, 100), (1, 11, 110), (2, 20, 200)
+;
+
+-- iceberg version: 1.8.1
+CREATE CATALOG iceberg_catalog WITH (
+    'type' = 'iceberg',
+    'catalog-type' = 'hadoop',
+    'warehouse' = '<path-to-warehouse>/iceberg',
+    'cache-enabled' = 'false' -- disable iceberg catalog caching to quickly 
see the result
+);
+
+SELECT * FROM iceberg_catalog.`default`.T;
+/*
++------------+------------+------------+
+|         pt |          k |          v |
++------------+------------+------------+
+|          2 |         20 |        200 |
+|          1 |          9 |         90 |
+|          1 |         10 |        100 |
+|          1 |         11 |        110 |
++------------+------------+------------+
+*/
+
+-- insert some data again, this will generate deletion vectors
+INSERT INTO paimon_catalog.`default`.T
+VALUES (1, 10, 101), (2, 20, 201), (1, 12, 121)
+;
+
+-- select deletion-vector index in paimon
+SELECT * FROM paimon_catalog.`default`.`T$table_indexes` WHERE 
index_type='DELETION_VECTORS';
+/*
++------------+-----------+-------------------+------------------------   
-----+------------+------------+--------------------------------+
+|  partition |    bucket |        index_type |                      file_name 
|  file_size |  row_count |                      dv_ranges |
++------------+-----------+-------------------+------------------------   
-----+------------+------------+--------------------------------+
+|        {1} |         0 |  DELETION_VECTORS | index-4ae44c5d-2fc6-40b0-9ff0~ 
|         43 |          1 | [(data-968fdf3a-2f44-41df-89b~ |
++------------+-----------+-------------------+------------------------   
-----+------------+------------+--------------------------------+
+*/
+
+-- select in iceberg, the updates were successfully read by iceberg
+SELECT * FROM iceberg_catalog.`default`.T;
+/*
++------------+------------+------------+
+|         pt |          k |          v |
++------------+------------+------------+
+|          1 |          9 |         90 |
+|          1 |         11 |        110 |
+|          2 |         20 |        201 |
+|          1 |         10 |        101 |
+|          1 |         12 |        121 |
++------------+------------+------------+
+*/
+
+```
+{{< /tab >}}
+
+{{< /tabs >}}
+
+{{< hint info >}}
+
+note1: Upgrade the implementation of deletion vector to 64-bit bitmap if 
necessary.
+
+{{< /hint >}}
+
+If your paimon table is already in deletion-vector mode but uses the 32-bit 
bitmap for deletion vectors, 
+you need to upgrade the implementation of deletion vector to 64-bit bitmap if 
you want to synchronize deletion-vector metadata to iceberg.
+You can follow the following steps to upgrade to 64-bit deletion-vector:
+1. stop all the writing jobs of your paimon table.
+2. perform a [full compaction]({{< ref 
"maintenance/dedicated-compaction#dedicated-compaction-job" >}}) to your paimon 
table.
+3. run `ALTER TABLE tableName SET ('deletion-vectors.bitmap64' = 'true')` to 
upgrade to 64-bit deletion vector.
+4. restart your writing job. If all the conditions mentioned above are met, 
deletion vector metadata will be synchronized to iceberg.
+
+{{< hint info >}}
+
+note2: Upgrade the format version of iceberg to 3 if necessary.
+
+{{< /hint >}}
+You can upgrade the format version of iceberg from 2 to 3 by setting 
`'metadata.iceberg.format-version' = '3'`. 
+This will recreate the iceberg metadata without using the base metadata.
+
 ## Hive Catalog
 
 When creating Paimon table, set `'metadata.iceberg.storage' = 'hive-catalog'`.
diff --git a/docs/layouts/shortcodes/generated/iceberg_configuration.html 
b/docs/layouts/shortcodes/generated/iceberg_configuration.html
index 7eef90574c..634f622d71 100644
--- a/docs/layouts/shortcodes/generated/iceberg_configuration.html
+++ b/docs/layouts/shortcodes/generated/iceberg_configuration.html
@@ -50,6 +50,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to delete old metadata files after each table 
commit</td>
         </tr>
+        <tr>
+            <td><h5>metadata.iceberg.format-version</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>The format version of iceberg table, the value can be 2 or 3. 
Note that only version 3 supports deletion vector.</td>
+        </tr>
         <tr>
             <td><h5>metadata.iceberg.glue.skip-archive</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index dc45fbe4f5..053e6206f1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -41,8 +41,12 @@ import org.apache.paimon.iceberg.metadata.IcebergRef;
 import org.apache.paimon.iceberg.metadata.IcebergSchema;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
@@ -52,6 +56,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -61,6 +66,7 @@ import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
@@ -72,6 +78,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -83,6 +90,9 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 
 /**
  * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg 
readers can read
@@ -95,6 +105,8 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
     // see org.apache.iceberg.hadoop.Util
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
 
+    private static final String PUFFIN_FORMAT = "puffin";
+
     private final FileStoreTable table;
     private final String commitUser;
 
@@ -104,6 +116,10 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
     private final FileStorePathFactory fileStorePathFactory;
     private final IcebergManifestFile manifestFile;
     private final IcebergManifestList manifestList;
+    private final int formatVersion;
+
+    private final IndexFileHandler indexFileHandler;
+    private final boolean needAddDvToIceberg;
 
     // 
-------------------------------------------------------------------------------------
     // Public interface
@@ -133,6 +149,17 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         this.fileStorePathFactory = table.store().pathFactory();
         this.manifestFile = IcebergManifestFile.create(table, pathFactory);
         this.manifestList = IcebergManifestList.create(table, pathFactory);
+
+        this.formatVersion =
+                
table.coreOptions().toConfiguration().get(IcebergOptions.FORMAT_VERSION);
+        Preconditions.checkArgument(
+                formatVersion == IcebergMetadata.FORMAT_VERSION_V2
+                        || formatVersion == IcebergMetadata.FORMAT_VERSION_V3,
+                "Unsupported iceberg format version! Only version 2 or version 
3 is valid, but current version is ",
+                formatVersion);
+
+        this.indexFileHandler = table.store().newIndexFileHandler();
+        this.needAddDvToIceberg = needAddDvToIceberg();
     }
 
     public static Path catalogTableMetadataPath(FileStoreTable table) {
@@ -192,23 +219,26 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
     public void close() throws Exception {}
 
     @Override
-    public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+    public void call(
+            List<ManifestEntry> committedEntries,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
         createMetadata(
-                snapshot.id(),
+                snapshot,
                 (removedFiles, addedFiles) ->
-                        collectFileChanges(committedEntries, removedFiles, 
addedFiles));
+                        collectFileChanges(committedEntries, removedFiles, 
addedFiles),
+                indexFiles);
     }
 
     @Override
     public void retry(ManifestCommittable committable) {
         SnapshotManager snapshotManager = table.snapshotManager();
-        long snapshotId =
+        Snapshot snapshot =
                 snapshotManager
                         .findSnapshotsForIdentifiers(
                                 commitUser, 
Collections.singletonList(committable.identifier()))
                         .stream()
-                        .mapToLong(Snapshot::id)
-                        .max()
+                        .max(Comparator.comparingLong(Snapshot::id))
                         .orElseThrow(
                                 () ->
                                         new RuntimeException(
@@ -217,13 +247,19 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                                                         + " and identifier "
                                                         + 
committable.identifier()
                                                         + ". This is 
unexpected."));
+        long snapshotId = snapshot.id();
         createMetadata(
-                snapshotId,
+                snapshot,
                 (removedFiles, addedFiles) ->
-                        collectFileChanges(snapshotId, removedFiles, 
addedFiles));
+                        collectFileChanges(snapshotId, removedFiles, 
addedFiles),
+                indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX));
     }
 
-    private void createMetadata(long snapshotId, FileChangesCollector 
fileChangesCollector) {
+    private void createMetadata(
+            Snapshot snapshot,
+            FileChangesCollector fileChangesCollector,
+            List<IndexManifestEntry> indexFiles) {
+        long snapshotId = snapshot.id();
         try {
             if (snapshotId == Snapshot.FIRST_SNAPSHOT_ID) {
                 // If Iceberg metadata is stored separately in another 
directory, dropping the table
@@ -237,8 +273,19 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
             }
 
             Path baseMetadataPath = pathFactory.toMetadataPath(snapshotId - 1);
+
             if (table.fileIO().exists(baseMetadataPath)) {
-                createMetadataWithBase(fileChangesCollector, snapshotId, 
baseMetadataPath);
+                createMetadataWithBase(
+                        fileChangesCollector,
+                        indexFiles.stream()
+                                .filter(
+                                        index ->
+                                                index.indexFile()
+                                                        .indexType()
+                                                        
.equals(DELETION_VECTORS_INDEX))
+                                .collect(Collectors.toList()),
+                        snapshot,
+                        baseMetadataPath);
             } else {
                 createMetadataWithoutBase(snapshotId);
             }
@@ -254,16 +301,31 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
     private void createMetadataWithoutBase(long snapshotId) throws IOException 
{
         SnapshotReader snapshotReader = 
table.newSnapshotReader().withSnapshot(snapshotId);
         SchemaCache schemaCache = new SchemaCache();
-        Iterator<IcebergManifestEntry> entryIterator =
+        List<IcebergManifestEntry> dataFileEntries = new ArrayList<>();
+        List<IcebergManifestEntry> dvFileEntries = new ArrayList<>();
+
+        List<DataSplit> filteredDataSplits =
                 snapshotReader.read().dataSplits().stream()
                         .filter(DataSplit::rawConvertible)
-                        .flatMap(
-                                s ->
-                                        dataSplitToManifestEntries(s, 
snapshotId, schemaCache)
-                                                .stream())
-                        .iterator();
-        List<IcebergManifestFileMeta> manifestFileMetas =
-                manifestFile.rollingWrite(entryIterator, snapshotId);
+                        .collect(Collectors.toList());
+        for (DataSplit dataSplit : filteredDataSplits) {
+            dataSplitToManifestEntries(
+                    dataSplit, snapshotId, schemaCache, dataFileEntries, 
dvFileEntries);
+        }
+
+        List<IcebergManifestFileMeta> manifestFileMetas = new ArrayList<>();
+        if (!dataFileEntries.isEmpty()) {
+            manifestFileMetas.addAll(
+                    manifestFile.rollingWrite(dataFileEntries.iterator(), 
snapshotId));
+        }
+        if (!dvFileEntries.isEmpty()) {
+            manifestFileMetas.addAll(
+                    manifestFile.rollingWrite(
+                            dvFileEntries.iterator(),
+                            snapshotId,
+                            IcebergManifestFileMeta.Content.DELETES));
+        }
+
         String manifestListFileName = 
manifestList.writeWithoutRolling(manifestFileMetas);
 
         int schemaId = (int) table.schema().id();
@@ -289,6 +351,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         String tableUuid = UUID.randomUUID().toString();
         IcebergMetadata metadata =
                 new IcebergMetadata(
+                        formatVersion,
                         tableUuid,
                         table.location().toString(),
                         snapshotId,
@@ -320,10 +383,14 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         }
     }
 
-    private List<IcebergManifestEntry> dataSplitToManifestEntries(
-            DataSplit dataSplit, long snapshotId, SchemaCache schemaCache) {
-        List<IcebergManifestEntry> result = new ArrayList<>();
+    private void dataSplitToManifestEntries(
+            DataSplit dataSplit,
+            long snapshotId,
+            SchemaCache schemaCache,
+            List<IcebergManifestEntry> dataFileEntries,
+            List<IcebergManifestEntry> dvFileEntries) {
         List<RawFile> rawFiles = dataSplit.convertToRawFiles().get();
+
         for (int i = 0; i < dataSplit.dataFiles().size(); i++) {
             DataFileMeta paimonFileMeta = dataSplit.dataFiles().get(i);
             RawFile rawFile = rawFiles.get(i);
@@ -338,15 +405,52 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                             schemaCache.get(paimonFileMeta.schemaId()),
                             paimonFileMeta.valueStats(),
                             paimonFileMeta.valueStatsCols());
-            result.add(
+            dataFileEntries.add(
                     new IcebergManifestEntry(
                             IcebergManifestEntry.Status.ADDED,
                             snapshotId,
                             snapshotId,
                             snapshotId,
                             fileMeta));
+
+            if (needAddDvToIceberg
+                    && dataSplit.deletionFiles().isPresent()
+                    && dataSplit.deletionFiles().get().get(i) != null) {
+                DeletionFile deletionFile = 
dataSplit.deletionFiles().get().get(i);
+
+                // Iceberg will check the cardinality between deserialized dv 
and iceberg deletion
+                // file, so if deletionFile.cardinality() is null, we should 
stop synchronizing all
+                // dvs.
+                Preconditions.checkState(
+                        deletionFile.cardinality() != null,
+                        "cardinality in DeletionFile is null, stop generating 
dv for iceberg. "
+                                + "dataFile path is {}, deletionFile is {}",
+                        rawFile.path(),
+                        deletionFile);
+
+                // We can not get the file size of the complete DV index file 
from the DeletionFile,
+                // so we set 'fileSizeInBytes' to -1(default in iceberg)
+                IcebergDataFileMeta deleteFileMeta =
+                        IcebergDataFileMeta.createForDeleteFile(
+                                IcebergDataFileMeta.Content.POSITION_DELETES,
+                                deletionFile.path(),
+                                PUFFIN_FORMAT,
+                                dataSplit.partition(),
+                                deletionFile.cardinality(),
+                                -1,
+                                rawFile.path(),
+                                deletionFile.offset(),
+                                deletionFile.length());
+
+                dvFileEntries.add(
+                        new IcebergManifestEntry(
+                                IcebergManifestEntry.Status.ADDED,
+                                snapshotId,
+                                snapshotId,
+                                snapshotId,
+                                deleteFileMeta));
+            }
         }
-        return result;
     }
 
     private List<IcebergPartitionField> getPartitionFields(
@@ -370,12 +474,35 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
     // 
-------------------------------------------------------------------------------------
 
     private void createMetadataWithBase(
-            FileChangesCollector fileChangesCollector, long snapshotId, Path 
baseMetadataPath)
+            FileChangesCollector fileChangesCollector,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot,
+            Path baseMetadataPath)
             throws IOException {
+        long snapshotId = snapshot.id();
         IcebergMetadata baseMetadata = 
IcebergMetadata.fromPath(table.fileIO(), baseMetadataPath);
+
+        if (!isSameFormatVersion(baseMetadata.formatVersion())) {
+            // we need to recreate iceberg metadata if format version changed
+            createMetadataWithoutBase(snapshot.id());
+            return;
+        }
+
         List<IcebergManifestFileMeta> baseManifestFileMetas =
                 
manifestList.read(baseMetadata.currentSnapshot().manifestList());
 
+        // base manifest file for data files
+        List<IcebergManifestFileMeta> baseDataManifestFileMetas =
+                baseManifestFileMetas.stream()
+                        .filter(meta -> meta.content() == 
IcebergManifestFileMeta.Content.DATA)
+                        .collect(Collectors.toList());
+
+        // base manifest file for deletion vector index files
+        List<IcebergManifestFileMeta> baseDVManifestFileMetas =
+                baseManifestFileMetas.stream()
+                        .filter(meta -> meta.content() == 
IcebergManifestFileMeta.Content.DELETES)
+                        .collect(Collectors.toList());
+
         Map<String, BinaryRow> removedFiles = new LinkedHashMap<>();
         Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles = new 
LinkedHashMap<>();
         boolean isAddOnly = fileChangesCollector.collect(removedFiles, 
addedFiles);
@@ -388,13 +515,14 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         // because if a file's level is changed, it will first be removed and 
then added.
         // In this case, if `baseMetadata` already contains this file, we 
should not add a
         // duplicate.
-        List<IcebergManifestFileMeta> newManifestFileMetas;
+        List<IcebergManifestFileMeta> newDataManifestFileMetas;
         IcebergSnapshotSummary snapshotSummary;
         if (isAddOnly) {
             // Fast case. We don't need to remove files from `baseMetadata`. 
We only need to append
             // new metadata files.
-            newManifestFileMetas = new ArrayList<>(baseManifestFileMetas);
-            
newManifestFileMetas.addAll(createNewlyAddedManifestFileMetas(addedFiles, 
snapshotId));
+            newDataManifestFileMetas = new 
ArrayList<>(baseDataManifestFileMetas);
+            newDataManifestFileMetas.addAll(
+                    createNewlyAddedManifestFileMetas(addedFiles, snapshotId));
             snapshotSummary = IcebergSnapshotSummary.APPEND;
         } else {
             Pair<List<IcebergManifestFileMeta>, IcebergSnapshotSummary> result 
=
@@ -402,14 +530,32 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
                             removedFiles,
                             addedFiles,
                             modifiedPartitions,
-                            baseManifestFileMetas,
+                            baseDataManifestFileMetas,
                             snapshotId);
-            newManifestFileMetas = result.getLeft();
+            newDataManifestFileMetas = result.getLeft();
             snapshotSummary = result.getRight();
         }
+
+        List<IcebergManifestFileMeta> newDVManifestFileMetas = new 
ArrayList<>();
+        if (needAddDvToIceberg) {
+            if (!indexFiles.isEmpty()) {
+                // reconstruct the dv index
+                
newDVManifestFileMetas.addAll(createDvManifestFileMetas(snapshot));
+            } else {
+                // no new dv index, reuse the old one
+                newDVManifestFileMetas.addAll(baseDVManifestFileMetas);
+            }
+        }
+
+        // compact data manifest file if needed
+        newDataManifestFileMetas = 
compactMetadataIfNeeded(newDataManifestFileMetas, snapshotId);
+
         String manifestListFileName =
                 manifestList.writeWithoutRolling(
-                        compactMetadataIfNeeded(newManifestFileMetas, 
snapshotId));
+                        Stream.concat(
+                                        newDataManifestFileMetas.stream(),
+                                        newDVManifestFileMetas.stream())
+                                .collect(Collectors.toList()));
 
         // add new schema if needed
         SchemaCache schemaCache = new SchemaCache();
@@ -445,6 +591,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
         IcebergMetadata metadata =
                 new IcebergMetadata(
+                        baseMetadata.formatVersion(),
                         baseMetadata.tableUuid(),
                         baseMetadata.location(),
                         snapshotId,
@@ -532,6 +679,9 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         if (table.primaryKeys().isEmpty()) {
             return true;
         } else {
+            if (needAddDvToIceberg) {
+                return meta.level() > 0;
+            }
             int maxLevel = table.coreOptions().numLevels() - 1;
             return meta.level() == maxLevel;
         }
@@ -849,6 +999,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
             IcebergMetadata metadata =
                     new IcebergMetadata(
+                            baseMetadata.formatVersion(),
                             baseMetadata.tableUuid(),
                             baseMetadata.location(),
                             baseMetadata.currentSnapshotId(),
@@ -906,6 +1057,7 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
 
             IcebergMetadata metadata =
                     new IcebergMetadata(
+                            baseMetadata.formatVersion(),
                             baseMetadata.tableUuid(),
                             baseMetadata.location(),
                             baseMetadata.currentSnapshotId(),
@@ -935,10 +1087,99 @@ public class IcebergCommitCallback implements 
CommitCallback, TagCallback {
         }
     }
 
+    // -------------------------------------------------------------------------------------
+    // Deletion vectors
+    // -------------------------------------------------------------------------------------
+
+    /**
+     * Tells whether deletion vectors have to be added to the Iceberg metadata.
+     *
+     * <p>All of the following must hold: deletion vectors are enabled on the table, the
+     * 64-bit bitmap option for deletion vectors is on, and the configured Iceberg format
+     * version is V3.
+     *
+     * @return true only when all three conditions are satisfied
+     */
+    private boolean needAddDvToIceberg() {
+        // Index files may still contain bitmap32 deletion vectors even when
+        // 'deletion-vectors.bitmap64' is true. Analyzing every deletion vector is very costly,
+        // so the exact encoding is not checked here for now.
+        CoreOptions coreOptions = table.coreOptions();
+        if (!coreOptions.deletionVectorsEnabled()) {
+            return false;
+        }
+        if (!coreOptions.deletionVectorBitmap64()) {
+            return false;
+        }
+        return formatVersion == IcebergMetadata.FORMAT_VERSION_V3;
+    }
+
+    /**
+     * Builds Iceberg delete manifests from the deletion vector index of the given snapshot.
+     *
+     * <p>Each deletion vector found in the snapshot's deletion-vector index files becomes one
+     * {@code POSITION_DELETES} entry in Puffin format. The entry points at the index file and
+     * records the referenced data file together with the offset and length of the vector.
+     *
+     * @param snapshot snapshot whose deletion vector index is scanned; its id is stamped on
+     *     every created entry and passed to the manifest writer
+     * @return metas of the written delete manifest files, or an empty list when the snapshot
+     *     has no deletion vectors
+     * @throws IllegalStateException if a deletion vector carries no cardinality
+     */
+    private List<IcebergManifestFileMeta> createDvManifestFileMetas(Snapshot snapshot) {
+        long snapshotId = snapshot.id();
+        List<IndexManifestEntry> newIndexes =
+                indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX);
+        if (newIndexes.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<IcebergManifestEntry> icebergDvEntries = new ArrayList<>();
+        for (IndexManifestEntry entry : newIndexes) {
+            IndexFileMeta indexFileMeta = entry.indexFile();
+            LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+                    indexFileMeta.deletionVectorMetas();
+            if (dvMetas == null) {
+                continue;
+            }
+
+            // Both values are identical for every deletion vector stored in this index file,
+            // so resolve them once per index file instead of once per vector.
+            Path bucketPath = fileStorePathFactory.bucketPath(entry.partition(), entry.bucket());
+            String indexFilePath = indexFileHandler.filePath(indexFileMeta).toString();
+
+            for (DeletionVectorMeta dvMeta : dvMetas.values()) {
+                String dataFilePath = new Path(bucketPath, dvMeta.dataFileName()).toString();
+
+                // Iceberg will check the cardinality between deserialized dv and iceberg
+                // deletion file, so if the cardinality is null, we should stop synchronizing
+                // all dvs. The message template uses '%s' because Preconditions does not
+                // substitute '{}' placeholders.
+                Preconditions.checkState(
+                        dvMeta.cardinality() != null,
+                        "cardinality in DeletionVector is null, stop generate dv for iceberg. "
+                                + "dataFile path is %s, indexFile path is %s",
+                        dataFilePath,
+                        indexFilePath);
+
+                IcebergDataFileMeta deleteFileMeta =
+                        IcebergDataFileMeta.createForDeleteFile(
+                                IcebergDataFileMeta.Content.POSITION_DELETES,
+                                indexFilePath,
+                                PUFFIN_FORMAT,
+                                entry.partition(),
+                                dvMeta.cardinality(),
+                                indexFileMeta.fileSize(),
+                                dataFilePath,
+                                (long) dvMeta.offset(),
+                                (long) dvMeta.length());
+
+                icebergDvEntries.add(
+                        new IcebergManifestEntry(
+                                IcebergManifestEntry.Status.ADDED,
+                                snapshotId,
+                                snapshotId,
+                                snapshotId,
+                                deleteFileMeta));
+            }
+        }
+
+        if (icebergDvEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        return manifestFile.rollingWrite(
+                icebergDvEntries.iterator(), snapshotId, IcebergManifestFileMeta.Content.DELETES);
+    }
+
     // -------------------------------------------------------------------------------------
     // Utils
     // -------------------------------------------------------------------------------------
 
+    /**
+     * Compares the format version of the base metadata with the configured format version.
+     *
+     * @param baseFormatVersion format version found in the base Iceberg metadata
+     * @return true if both versions are equal, false if the configured version is higher
+     *     (an info message is logged in that case)
+     * @throws IllegalArgumentException if the base metadata has a higher format version than
+     *     the configured one
+     */
+    private boolean isSameFormatVersion(int baseFormatVersion) {
+        if (baseFormatVersion == formatVersion) {
+            return true;
+        }
+
+        // Going back to a lower format version is rejected. The template uses '%s' and passes
+        // both versions explicitly so that the error message actually reports them.
+        Preconditions.checkArgument(
+                formatVersion > baseFormatVersion,
+                "format version in base metadata is %s, and it's bigger than the current format version %s, "
+                        + "this is not allowed!",
+                baseFormatVersion,
+                formatVersion);
+
+        LOG.info(
+                "format version in base metadata is {}, and it's different from the current format version {}. "
+                        + "New metadata will be recreated using format version {}.",
+                baseFormatVersion,
+                formatVersion,
+                formatVersion);
+        return false;
+    }
+
     private class SchemaCache {
 
         SchemaManager schemaManager = new SchemaManager(table.fileIO(), 
table.location());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
index e5166b9394..a817364973 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java
@@ -44,6 +44,14 @@ public class IcebergOptions {
                     .withDescription(
                             "To store Iceberg metadata in a separate directory 
or under table location");
 
+    public static final ConfigOption<Integer> FORMAT_VERSION =
+            ConfigOptions.key("metadata.iceberg.format-version")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription(
+                            "The format version of iceberg table, the value 
can be 2 or 3. "
+                                    + "Note that only version 3 supports 
deletion vector.");
+
     public static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
             ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
                     .intType()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
index 590359aba3..9167497257 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java
@@ -83,6 +83,10 @@ public class IcebergDataFileMeta {
     private final InternalMap lowerBounds;
     private final InternalMap upperBounds;
 
+    private final String referencedDataFile;
+    private final Long contentOffset;
+    private final Long contentSizeInBytes;
+
     // only used for iceberg migrate
     private long schemaId = 0;
 
@@ -96,6 +100,34 @@ public class IcebergDataFileMeta {
             InternalMap nullValueCounts,
             InternalMap lowerBounds,
             InternalMap upperBounds) {
+        this(
+                content,
+                filePath,
+                fileFormat,
+                partition,
+                recordCount,
+                fileSizeInBytes,
+                nullValueCounts,
+                lowerBounds,
+                upperBounds,
+                null,
+                null,
+                null);
+    }
+
+    IcebergDataFileMeta(
+            Content content,
+            String filePath,
+            String fileFormat,
+            BinaryRow partition,
+            long recordCount,
+            long fileSizeInBytes,
+            InternalMap nullValueCounts,
+            InternalMap lowerBounds,
+            InternalMap upperBounds,
+            String referencedDataFile,
+            Long contentOffset,
+            Long contentSizeInBytes) {
         this.content = content;
         this.filePath = filePath;
         this.fileFormat = fileFormat;
@@ -105,6 +137,10 @@ public class IcebergDataFileMeta {
         this.nullValueCounts = nullValueCounts;
         this.lowerBounds = lowerBounds;
         this.upperBounds = upperBounds;
+
+        this.referencedDataFile = referencedDataFile;
+        this.contentOffset = contentOffset;
+        this.contentSizeInBytes = contentSizeInBytes;
     }
 
     public static IcebergDataFileMeta create(
@@ -168,6 +204,33 @@ public class IcebergDataFileMeta {
                 new GenericMap(upperBounds));
     }
 
+    /**
+     * Creates the meta of a delete file.
+     *
+     * <p>The null value counts, lower bounds and upper bounds are left unset (null); the
+     * delete-file specific fields are taken from the arguments. See
+     * org.apache.iceberg.deletes.BaseDVFileWriter#createDV for the Iceberg side reference.
+     *
+     * @param content content type of the file
+     * @param filePath path of the delete file
+     * @param fileFormat format name of the delete file
+     * @param partition partition the file belongs to
+     * @param recordCount record count stored in the meta
+     * @param fileSizeInBytes size of the delete file in bytes
+     * @param referencedDataFile data file referenced by this delete file
+     * @param contentOffset value for the {@code content_offset} field
+     * @param contentSizeInBytes value for the {@code content_size_in_bytes} field
+     * @return the newly created meta
+     */
+    public static IcebergDataFileMeta createForDeleteFile(
+            Content content,
+            String filePath,
+            String fileFormat,
+            BinaryRow partition,
+            long recordCount,
+            long fileSizeInBytes,
+            String referencedDataFile,
+            Long contentOffset,
+            Long contentSizeInBytes) {
+        // No column statistics are recorded for a delete file.
+        InternalMap noColumnStats = null;
+        return new IcebergDataFileMeta(
+                content,
+                filePath,
+                fileFormat,
+                partition,
+                recordCount,
+                fileSizeInBytes,
+                noColumnStats,
+                noColumnStats,
+                noColumnStats,
+                referencedDataFile,
+                contentOffset,
+                contentSizeInBytes);
+    }
+
     public Content content() {
         return content;
     }
@@ -204,6 +267,18 @@ public class IcebergDataFileMeta {
         return upperBounds;
     }
 
+    /**
+     * Returns the data file referenced by this file, or null when none was supplied (the
+     * constructor without delete-file arguments passes null).
+     */
+    public String referencedDataFile() {
+        return referencedDataFile;
+    }
+
+    /** Returns the value kept for the {@code content_offset} field; may be null. */
+    public Long contentOffset() {
+        return contentOffset;
+    }
+
+    /** Returns the value kept for the {@code content_size_in_bytes} field; may be null. */
+    public Long contentSizeInBytes() {
+        return contentSizeInBytes;
+    }
+
     public long schemaId() {
         return schemaId;
     }
@@ -236,6 +311,9 @@ public class IcebergDataFileMeta {
                         128,
                         "upper_bounds",
                         DataTypes.MAP(DataTypes.INT().notNull(), 
DataTypes.BYTES().notNull())));
+        fields.add(new DataField(143, "referenced_data_file", 
DataTypes.STRING()));
+        fields.add(new DataField(144, "content_offset", DataTypes.BIGINT()));
+        fields.add(new DataField(145, "content_size_in_bytes", 
DataTypes.BIGINT()));
         return new RowType(false, fields);
     }
 
@@ -256,7 +334,10 @@ public class IcebergDataFileMeta {
                 && Objects.equals(partition, that.partition)
                 && Objects.equals(nullValueCounts, that.nullValueCounts)
                 && Objects.equals(lowerBounds, that.lowerBounds)
-                && Objects.equals(upperBounds, that.upperBounds);
+                && Objects.equals(upperBounds, that.upperBounds)
+                && Objects.equals(referencedDataFile, that.referencedDataFile)
+                && Objects.equals(contentOffset, that.contentOffset)
+                && Objects.equals(contentSizeInBytes, that.contentSizeInBytes);
     }
 
     @Override
@@ -270,6 +351,9 @@ public class IcebergDataFileMeta {
                 fileSizeInBytes,
                 nullValueCounts,
                 lowerBounds,
-                upperBounds);
+                upperBounds,
+                referencedDataFile,
+                contentOffset,
+                contentSizeInBytes);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
index 9b97425936..020b5e1b44 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMetaSerializer.java
@@ -57,7 +57,10 @@ public class IcebergDataFileMetaSerializer extends 
ObjectSerializer<IcebergDataF
                 file.fileSizeInBytes(),
                 file.nullValueCounts(),
                 file.lowerBounds(),
-                file.upperBounds());
+                file.upperBounds(),
+                BinaryString.fromString(file.referencedDataFile()),
+                file.contentOffset(),
+                file.contentSizeInBytes());
     }
 
     @Override
@@ -71,6 +74,9 @@ public class IcebergDataFileMetaSerializer extends 
ObjectSerializer<IcebergDataF
                 row.getLong(5),
                 nullValueCountsSerializer.copy(row.getMap(6)),
                 lowerBoundsSerializer.copy(row.getMap(7)),
-                upperBoundsSerializer.copy(row.getMap(8)));
+                upperBoundsSerializer.copy(row.getMap(8)),
+                row.isNullAt(9) ? null : row.getString(9).toString(),
+                row.isNullAt(10) ? null : row.getLong(10),
+                row.isNullAt(11) ? null : row.getLong(11));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 6e69602eb8..43ce1016a9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -161,9 +161,15 @@ public class IcebergManifestFile extends 
ObjectsFile<IcebergManifestEntry> {
 
+    /**
+     * Writes the entries with {@code Content.DATA} as the manifest content type by delegating
+     * to the three-argument overload.
+     */
     public List<IcebergManifestFileMeta> rollingWrite(
             Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
+
+        return rollingWrite(entries, sequenceNumber, Content.DATA);
+    }
+
+    public List<IcebergManifestFileMeta> rollingWrite(
+            Iterator<IcebergManifestEntry> entries, long sequenceNumber, 
Content content) {
         RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> 
writer =
                 new RollingFileWriter<>(
-                        () -> createWriter(sequenceNumber), 
targetFileSize.getBytes());
+                        () -> createWriter(sequenceNumber, content), 
targetFileSize.getBytes());
         try {
             writer.write(entries);
             writer.close();
@@ -174,9 +180,9 @@ public class IcebergManifestFile extends 
ObjectsFile<IcebergManifestEntry> {
     }
 
     public SingleFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> 
createWriter(
-            long sequenceNumber) {
+            long sequenceNumber, Content content) {
         return new IcebergManifestEntryWriter(
-                writerFactory, pathFactory.newPath(), compression, 
sequenceNumber);
+                writerFactory, pathFactory.newPath(), compression, 
sequenceNumber, content);
     }
 
     private class IcebergManifestEntryWriter
@@ -193,11 +199,14 @@ public class IcebergManifestFile extends 
ObjectsFile<IcebergManifestEntry> {
         private long deletedRowsCount = 0;
         private Long minSequenceNumber = null;
 
+        private final Content content;
+
         IcebergManifestEntryWriter(
                 FormatWriterFactory factory,
                 Path path,
                 String fileCompression,
-                long sequenceNumber) {
+                long sequenceNumber,
+                Content content) {
             super(
                     IcebergManifestFile.this.fileIO,
                     factory,
@@ -207,6 +216,7 @@ public class IcebergManifestFile extends 
ObjectsFile<IcebergManifestEntry> {
                     false);
             this.partitionStatsCollector = new 
SimpleStatsCollector(partitionType);
             this.sequenceNumber = sequenceNumber;
+            this.content = content;
         }
 
         @Override
@@ -253,7 +263,7 @@ public class IcebergManifestFile extends 
ObjectsFile<IcebergManifestEntry> {
                     path.toString(),
                     outputBytes,
                     IcebergPartitionSpec.SPEC_ID,
-                    Content.DATA,
+                    content,
                     sequenceNumber,
                     minSequenceNumber != null ? minSequenceNumber : 
UNASSIGNED_SEQ,
                     sequenceNumber,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index 00c4c3de61..c755e32c7d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -44,7 +44,8 @@ import java.util.Objects;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class IcebergMetadata {
 
-    public static final int CURRENT_FORMAT_VERSION = 2;
+    public static final int FORMAT_VERSION_V2 = 2;
+    public static final int FORMAT_VERSION_V3 = 3;
 
     private static final String FIELD_FORMAT_VERSION = "format-version";
     private static final String FIELD_TABLE_UUID = "table-uuid";
@@ -118,6 +119,7 @@ public class IcebergMetadata {
     private final Map<String, IcebergRef> refs;
 
     public IcebergMetadata(
+            int formatVersion,
             String tableUuid,
             String location,
             long lastSequenceNumber,
@@ -130,7 +132,7 @@ public class IcebergMetadata {
             long currentSnapshotId,
             @Nullable Map<String, IcebergRef> refs) {
         this(
-                CURRENT_FORMAT_VERSION,
+                formatVersion,
                 tableUuid,
                 location,
                 lastSequenceNumber,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index f04b40b2ff..efd081c0bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -117,7 +117,10 @@ public class IndexFileHandler {
     }
 
     public List<IndexManifestEntry> scan(String indexType) {
-        Snapshot snapshot = snapshotManager.latestSnapshot();
+        return scan(snapshotManager.latestSnapshot(), indexType);
+    }
+
+    public List<IndexManifestEntry> scan(Snapshot snapshot, String indexType) {
         if (snapshot == null) {
             return Collections.emptyList();
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
index dbd42d5466..cb6edc9d6d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/AddPartitionCommitCallback.java
@@ -21,6 +21,7 @@ package org.apache.paimon.metastore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.PartitionHandler;
@@ -59,7 +60,10 @@ public class AddPartitionCommitCallback implements 
CommitCallback {
     }
 
     @Override
-    public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+    public void call(
+            List<ManifestEntry> committedEntries,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
         Set<BinaryRow> partitions =
                 committedEntries.stream()
                         .filter(e -> FileKind.ADD.equals(e.kind()))
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
index 7e2e00dce3..2d9e44c39a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/metastore/TagPreviewCommitCallback.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.metastore;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.table.sink.CommitCallback;
@@ -39,7 +40,10 @@ public class TagPreviewCommitCallback implements 
CommitCallback {
     }
 
     @Override
-    public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
+    public void call(
+            List<ManifestEntry> committedEntries,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
         long currentMillis = System.currentTimeMillis();
         Optional<String> tagOptional = tagPreview.extractTag(currentMillis, 
snapshot.watermark());
         tagOptional.ifPresent(tagCallback::notifyCreation);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7046d970a4..f85bf6b2c7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -1053,7 +1053,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     commitUser,
                     identifier,
                     commitKind.name());
-            commitCallbacks.forEach(callback -> callback.call(deltaFiles, 
newSnapshot));
+            commitCallbacks.forEach(callback -> callback.call(deltaFiles, 
indexFiles, newSnapshot));
             return new SuccessResult();
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
index c4b606441d..40e615198e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitCallback.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 
@@ -37,7 +38,10 @@ import java.util.List;
  */
 public interface CommitCallback extends AutoCloseable {
 
-    void call(List<ManifestEntry> committedEntries, Snapshot snapshot);
+    void call(
+            List<ManifestEntry> committedEntries,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot);
 
     void retry(ManifestCommittable committable);
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
index 7b0105fa85..668894f23d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java
@@ -25,10 +25,12 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.fs.FileIO;
@@ -41,6 +43,7 @@ import org.apache.paimon.iceberg.metadata.IcebergMetadata;
 import org.apache.paimon.iceberg.metadata.IcebergRef;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -48,6 +51,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeRoot;
@@ -67,6 +72,8 @@ import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -1159,4 +1166,47 @@ public class IcebergCompatibilityTest {
         result.close();
         return actual;
     }
+
+    /**
+     * Loads table {@code mydb.db.t} from a Hadoop catalog rooted at the temp directory, reads
+     * all of its records and asserts that they equal the expected rows (compared as sets).
+     *
+     * @param expected expected rows, each given as the field values in schema order
+     */
+    private void validateIcebergResult(List<Object[]> expected) throws Exception {
+        org.apache.iceberg.Table icebergTable =
+                new HadoopCatalog(new Configuration(), tempDir.toString())
+                        .loadTable(TableIdentifier.of("mydb.db", "t"));
+        Types.StructType structType = icebergTable.schema().asStruct();
+
+        StructLikeSet actualSet = StructLikeSet.create(structType);
+        StructLikeSet expectSet = StructLikeSet.create(structType);
+
+        try (CloseableIterable<Record> reader = IcebergGenerics.read(icebergTable).build()) {
+            for (Record record : reader) {
+                actualSet.add(record);
+            }
+        }
+
+        List<org.apache.iceberg.data.GenericRecord> expectedRecords = new ArrayList<>();
+        for (Object[] row : expected) {
+            expectedRecords.add(icebergRecord(structType, row));
+        }
+        expectSet.addAll(expectedRecords);
+
+        assertThat(actualSet).isEqualTo(expectSet);
+    }
+
+    /**
+     * Creates an Iceberg generic record of the given struct type and fills it positionally
+     * with the values of {@code row}.
+     */
+    private org.apache.iceberg.data.GenericRecord icebergRecord(
+            Types.StructType type, Object[] row) {
+        org.apache.iceberg.data.GenericRecord filled =
+                org.apache.iceberg.data.GenericRecord.create(type);
+        int position = 0;
+        for (Object value : row) {
+            filled.set(position, value);
+            position++;
+        }
+        return filled;
+    }
+
+    /**
+     * Plans a scan of the Paimon table, reads every split and returns each row rendered by
+     * {@code DataFormatTestUtil.toStringWithRowKind}.
+     */
+    private List<String> getPaimonResult(FileStoreTable paimonTable) throws Exception {
+        List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
+        TableRead read = paimonTable.newReadBuilder().newRead();
+        List<String> rows = new ArrayList<>();
+        try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
+            recordReader.forEachRemaining(
+                    row -> {
+                        String rendered =
+                                DataFormatTestUtil.toStringWithRowKind(
+                                        row, paimonTable.rowType());
+                        rows.add(rendered);
+                    });
+        }
+        return rows;
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 227a3b58ee..6dc77799ae 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
@@ -179,7 +180,10 @@ public class TableCommitTest {
         }
 
         @Override
-        public void call(List<ManifestEntry> entries, Snapshot snapshot) {
+        public void call(
+                List<ManifestEntry> entries,
+                List<IndexManifestEntry> indexFiles,
+                Snapshot snapshot) {
             commitCallbackResult.get(testId).add(snapshot.commitIdentifier());
         }
 
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
new file mode 100644
index 0000000000..61f0f45ff1
--- /dev/null
+++ b/paimon-iceberg/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-parent</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.2-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-iceberg</artifactId>
+    <name>Paimon : Iceberg</name>
+
+    <properties>
+        <target.java.version>11</target.java.version>
+        <iceberg.version>1.8.1</iceberg.version>
+        <flink.version>${paimon-flink-common.flink.version}</flink.version>
+        <iceberg.flink.version>1.19</iceberg.flink.version>
+    </properties>
+
+    <dependencies>
+
+        <!-- paimon test dependency -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-flink-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- hadoop dependency -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-compress</artifactId>
+            <version>1.21</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- iceberg dependency -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>parquet-hadoop</artifactId>
+                    <groupId>org.apache.parquet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- flink dependency -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/core/IcebergDVCompatibilityTest.java
 
b/paimon-iceberg/src/test/java/org/apache/paimon/core/IcebergDVCompatibilityTest.java
new file mode 100644
index 0000000000..e24109371e
--- /dev/null
+++ 
b/paimon-iceberg/src/test/java/org/apache/paimon/core/IcebergDVCompatibilityTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.core;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructLikeSet;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for Iceberg DV compatibility. */
+public class IcebergDVCompatibilityTest {
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testSyncDVWithoutBase() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"k", "v"});
+        Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        // must set deletion-vectors.bitmap64 = true
+        customOptions.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), "true");
+        // must set metadata.iceberg.format-version = 3
+        customOptions.put(IcebergOptions.FORMAT_VERSION.key(), "3");
+
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        customOptions);
+
+        // disable iceberg metadata storage
+        Map<String, String> options = new HashMap<>();
+        options.put(
+                IcebergOptions.METADATA_ICEBERG_STORAGE.key(),
+                IcebergOptions.StorageType.DISABLED.toString());
+        table = table.copy(options);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + 
"/tmp"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 10));
+        write.write(GenericRow.of(2, 20));
+        commit.commit(1, write.prepareCommit(false, 1));
+
+        write.write(GenericRow.ofKind(RowKind.DELETE, 2, 20));
+        commit.commit(2, write.prepareCommit(false, 2));
+
+        // produce dv
+        write.compact(BinaryRow.EMPTY_ROW, 0, false);
+        commit.commit(3, write.prepareCommit(true, 3));
+
+        assertThat(
+                        table.store()
+                                .newIndexFileHandler()
+                                
.scan(DeletionVectorsIndexFile.DELETION_VECTORS_INDEX)
+                                .size())
+                .isGreaterThan(0);
+        // enable iceberg metadata storage and commit changes to iceberg
+        options.put(
+                IcebergOptions.METADATA_ICEBERG_STORAGE.key(),
+                IcebergOptions.StorageType.TABLE_LOCATION.toString());
+        table = table.copy(options);
+        write =
+                table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + 
"/tmp"));
+        commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        commit.commit(4, write.prepareCommit(false, 4));
+
+        validateIcebergResult(Collections.singletonList(new Object[] {1, 10}));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testSyncDVWithBase() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"k", "v"});
+        Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        // must set deletion-vectors.bitmap64 = true
+        customOptions.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), "true");
+        // must set metadata.iceberg.format-version = 3
+        customOptions.put(IcebergOptions.FORMAT_VERSION.key(), "3");
+
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        customOptions);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + 
"/tmp"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 1));
+        write.write(GenericRow.of(2, 2));
+        write.write(GenericRow.of(3, 3));
+        commit.commit(1, write.prepareCommit(false, 1));
+        validateIcebergResult(
+                Arrays.asList(new Object[] {1, 1}, new Object[] {2, 2}, new 
Object[] {3, 3}));
+
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(4, 4));
+        // compact to generate dv index
+        write.compact(BinaryRow.EMPTY_ROW, 0, false);
+        commit.commit(2, write.prepareCommit(true, 2));
+        validateIcebergResult(
+                Arrays.asList(
+                        new Object[] {1, 11},
+                        new Object[] {2, 2},
+                        new Object[] {3, 3},
+                        new Object[] {4, 4}));
+
+        // level-0 file will not be added to iceberg
+        write.write(GenericRow.of(2, 22));
+        write.write(GenericRow.of(5, 5));
+        commit.commit(3, write.prepareCommit(false, 3));
+        validateIcebergResult(
+                Arrays.asList(
+                        new Object[] {1, 11},
+                        new Object[] {2, 2},
+                        new Object[] {3, 3},
+                        new Object[] {4, 4}));
+
+        // compact to generate dv index
+        write.compact(BinaryRow.EMPTY_ROW, 0, false);
+        commit.commit(4, write.prepareCommit(true, 4));
+        validateIcebergResult(
+                Arrays.asList(
+                        new Object[] {1, 11},
+                        new Object[] {2, 22},
+                        new Object[] {3, 3},
+                        new Object[] {4, 4},
+                        new Object[] {5, 5}));
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testUpgradeFormatVersion() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"k", "v"});
+        Map<String, String> customOptions = new HashMap<>();
+        customOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
+        // must set deletion-vectors.bitmap64 = true
+        customOptions.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), "true");
+
+        FileStoreTable table =
+                createPaimonTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        1,
+                        customOptions);
+
+        String commitUser = UUID.randomUUID().toString();
+        TableWriteImpl<?> write =
+                table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + 
"/tmp"));
+        TableCommitImpl commit = table.newCommit(commitUser);
+
+        write.write(GenericRow.of(1, 1));
+        write.write(GenericRow.of(2, 2));
+        commit.commit(1, write.prepareCommit(false, 1));
+        validateIcebergResult(Arrays.asList(new Object[] {1, 1}, new Object[] 
{2, 2}));
+
+        // upgrade format version to 3
+        Map<String, String> options = new HashMap<>();
+        options.put(IcebergOptions.FORMAT_VERSION.key(), "3");
+        table = table.copy(options);
+        write =
+                table.newWrite(commitUser)
+                        .withIOManager(new IOManagerImpl(tempDir.toString() + 
"/tmp"));
+        commit = table.newCommit(commitUser);
+        write.write(GenericRow.of(1, 11));
+        write.write(GenericRow.of(3, 3));
+        // compact to generate dv index
+        write.compact(BinaryRow.EMPTY_ROW, 0, false);
+        commit.commit(2, write.prepareCommit(true, 2));
+        validateIcebergResult(
+                Arrays.asList(new Object[] {1, 11}, new Object[] {2, 2}, new 
Object[] {3, 3}));
+
+        write.close();
+        commit.close();
+    }
+
+    /**
+     * Creates table {@code mydb.t} in a file-system catalog rooted at {@code tempDir} and returns
+     * it.
+     *
+     * <p>On top of {@code customOptions} this sets the bucket count, avro file format, small
+     * target file sizes, Iceberg metadata storage at the table location, and the Iceberg manifest
+     * compaction / metadata retention options.
+     *
+     * @param rowType fields of the table
+     * @param partitionKeys partition key names, may be empty
+     * @param primaryKeys primary key names
+     * @param numBuckets value for the {@code bucket} option
+     * @param customOptions caller-supplied table options used as the base
+     * @return the created table
+     * @throws Exception if the database or table cannot be created or loaded
+     */
+    private FileStoreTable createPaimonTable(
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            int numBuckets,
+            Map<String, String> customOptions)
+            throws Exception {
+        LocalFileIO fileIO = LocalFileIO.create();
+        Path path = new Path(tempDir.toString());
+
+        Options options = new Options(customOptions);
+        options.set(CoreOptions.BUCKET, numBuckets);
+        options.set(
+                IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.TABLE_LOCATION);
+        options.set(CoreOptions.FILE_FORMAT, "avro");
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
+        // Previously COMPACT_MIN_FILE_NUM was set twice (4, then 8), so the 4 was silently
+        // overwritten and no max was configured; the second call is meant for the max option.
+        options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
+        options.set(IcebergOptions.COMPACT_MAX_FILE_NUM, 8);
+        options.set(IcebergOptions.METADATA_DELETE_AFTER_COMMIT, true);
+        options.set(IcebergOptions.METADATA_PREVIOUS_VERSIONS_MAX, 1);
+        options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
+        Schema schema =
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
+
+        try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+            paimonCatalog.createDatabase("mydb", false);
+            Identifier paimonIdentifier = Identifier.create("mydb", "t");
+            paimonCatalog.createTable(paimonIdentifier, schema, false);
+            return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+        }
+    }
+
+    /**
+     * Loads Iceberg table {@code mydb.db.t} from a Hadoop catalog rooted at {@code tempDir},
+     * reads all of its records and asserts that they equal {@code expected} as a set.
+     *
+     * @param expected expected rows, each an array of column values in schema order
+     * @throws Exception if the table cannot be loaded or read, or a resource fails to close
+     */
+    private void validateIcebergResult(List<Object[]> expected) throws Exception {
+        // HadoopCatalog is Closeable; keep it open until the read has finished, then release it
+        // so that repeated validations do not accumulate open catalogs.
+        try (HadoopCatalog icebergCatalog =
+                new HadoopCatalog(new Configuration(), tempDir.toString())) {
+            TableIdentifier icebergIdentifier = TableIdentifier.of("mydb.db", "t");
+            org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(icebergIdentifier);
+
+            Types.StructType type = icebergTable.schema().asStruct();
+
+            StructLikeSet actualSet = StructLikeSet.create(type);
+            StructLikeSet expectSet = StructLikeSet.create(type);
+
+            try (CloseableIterable<Record> reader = IcebergGenerics.read(icebergTable).build()) {
+                reader.forEach(actualSet::add);
+            }
+            expectSet.addAll(
+                    expected.stream()
+                            .map(r -> icebergRecord(type, r))
+                            .collect(Collectors.toList()));
+
+            assertThat(actualSet).isEqualTo(expectSet);
+        }
+    }
+
+    /**
+     * Builds an Iceberg generic record of the given struct type whose fields are filled
+     * positionally from {@code row}.
+     *
+     * @param type struct type of the record to create
+     * @param row field values; element {@code i} is stored at position {@code i}
+     * @return the populated record
+     */
+    private org.apache.iceberg.data.GenericRecord icebergRecord(
+            Types.StructType type, Object[] row) {
+        org.apache.iceberg.data.GenericRecord filled =
+                org.apache.iceberg.data.GenericRecord.create(type);
+        int position = 0;
+        for (Object value : row) {
+            filled.set(position, value);
+            position++;
+        }
+        return filled;
+    }
+}
diff --git 
a/paimon-iceberg/src/test/java/org/apache/paimon/flink/FlinkIcebergITCase.java 
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/FlinkIcebergITCase.java
new file mode 100644
index 0000000000..a71ee78d1e
--- /dev/null
+++ 
b/paimon-iceberg/src/test/java/org/apache/paimon/flink/FlinkIcebergITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for Paimon Iceberg compatibility. */
+public class FlinkIcebergITCase extends AbstractTestBase {
+
+    @ParameterizedTest
+    @ValueSource(strings = {"avro"})
+    public void testDeletionVector(String format) throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv = 
tableEnvironmentBuilder().batchMode().parallelism(2).build();
+        tEnv.executeSql(
+                "CREATE CATALOG paimon WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                "CREATE TABLE paimon.`default`.T (\n"
+                        + "  pt INT,\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (pt, k) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'metadata.iceberg.storage' = 'hadoop-catalog',\n"
+                        + "  'metadata.iceberg.format-version' = '3',\n"
+                        + "  'deletion-vectors.enabled' = 'true',\n"
+                        + "  'deletion-vectors.bitmap64' = 'true',\n"
+                        + "  'file.format' = '"
+                        + format
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 9, 90), "
+                                + "(1, 10, 100), "
+                                + "(1, 11, 110), "
+                                + "(2, 20, 200)")
+                .await();
+
+        tEnv.executeSql(
+                "CREATE CATALOG iceberg WITH (\n"
+                        + "  'type' = 'iceberg',\n"
+                        + "  'catalog-type' = 'hadoop',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "/iceberg',\n"
+                        + "  'cache-enabled' = 'false'\n"
+                        + ")");
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v, k, pt FROM 
iceberg.`default`.T ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(90, 9, 1),
+                        Row.of(100, 10, 1),
+                        Row.of(110, 11, 1),
+                        Row.of(200, 20, 2));
+
+        tEnv.executeSql(
+                        "INSERT INTO paimon.`default`.T VALUES "
+                                + "(1, 10, 101), "
+                                + "(2, 20, 201), "
+                                + "(1, 12, 121)")
+                .await();
+
+        // make sure that there are dv indexes generated
+        CloseableIterator<Row> iter =
+                tEnv.executeSql(
+                                "SELECT * FROM 
paimon.`default`.`T$table_indexes` WHERE index_type='DELETION_VECTORS'")
+                        .collect();
+        assertThat(ImmutableList.copyOf(iter).size()).isGreaterThan(0);
+        assertThat(
+                        collect(
+                                tEnv.executeSql(
+                                        "SELECT v, k, pt FROM 
iceberg.`default`.T ORDER BY pt, k")))
+                .containsExactly(
+                        Row.of(90, 9, 1),
+                        Row.of(101, 10, 1),
+                        Row.of(110, 11, 1),
+                        Row.of(121, 12, 1),
+                        Row.of(201, 20, 2));
+    }
+
+    /**
+     * Drains {@code result} into a list and closes its iterator.
+     *
+     * @param result table result to consume
+     * @return all rows in iteration order
+     * @throws Exception if iterating or closing the iterator fails
+     */
+    private List<Row> collect(TableResult result) throws Exception {
+        try (CloseableIterator<Row> rowIterator = result.collect()) {
+            List<Row> collected = new ArrayList<>();
+            rowIterator.forEachRemaining(collected::add);
+            return collected;
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
index c93e10ef90..d44164da3c 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonCommitTest.scala
@@ -19,12 +19,11 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.{CoreOptions, Snapshot}
-import org.apache.paimon.manifest.{ManifestCommittable, ManifestEntry}
+import org.apache.paimon.manifest.{IndexManifestEntry, ManifestCommittable, 
ManifestEntry}
 import org.apache.paimon.table.sink.CommitCallback
 
 import org.junit.jupiter.api.Assertions
 
-import java.lang
 import java.util.List
 
 class PaimonCommitTest extends PaimonSparkTestBase {
@@ -64,7 +63,10 @@ object PaimonCommitTest {
 
 case class CustomCommitCallback(testId: String) extends CommitCallback {
 
-  override def call(committedEntries: List[ManifestEntry], snapshot: 
Snapshot): Unit = {
+  override def call(
+      committedEntries: List[ManifestEntry],
+      indexFiles: List[IndexManifestEntry],
+      snapshot: Snapshot): Unit = {
     PaimonCommitTest.id = testId
   }
 
diff --git a/pom.xml b/pom.xml
index 28bb674e11..da22b98e8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,6 +71,7 @@ under the License.
         <module>tools/ci/paimon-ci-tools</module>
         <module>paimon-hudi</module>
         <module>paimon-api</module>
+        <module>paimon-iceberg</module>
     </modules>
 
     <properties>


Reply via email to