This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 429827ea8631b25a224910760b99575b5dd0f0ba
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Sep 19 14:36:07 2025 +0800

    [core] Separate indexIncrement into dataIncrement and compactIncrement (#6285)
---
 .../apache/paimon/append/AppendCompactTask.java    |  22 +--
 .../org/apache/paimon/io/CompactIncrement.java     |  42 ++++-
 .../java/org/apache/paimon/io/DataIncrement.java   |  48 +++++-
 .../java/org/apache/paimon/io/IndexIncrement.java  |  88 -----------
 .../paimon/operation/AbstractFileStoreWrite.java   |  20 ++-
 .../paimon/operation/FileStoreCommitImpl.java      |  93 +++++------
 .../paimon/table/sink/CommitMessageImpl.java       |  40 +----
 .../sink/CommitMessageLegacyV2Serializer.java      |  21 ++-
 .../paimon/table/sink/CommitMessageSerializer.java |  68 ++++++--
 .../apache/paimon/table/sink/TableCommitImpl.java  |  10 +-
 .../org/apache/paimon/TestAppendFileStore.java     |  21 ++-
 .../test/java/org/apache/paimon/TestFileStore.java |   9 +-
 .../paimon/append/AppendCompactTaskTest.java       |   8 +-
 .../deletionvectors/BucketedDvMaintainerTest.java  |  37 +++--
 .../append/AppendDeletionFileMaintainerTest.java   |   8 +-
 .../index/DynamicBucketIndexMaintainerTest.java    |   2 +-
 .../paimon/index/HashBucketAssignerTest.java       |  10 +-
 ...festCommittableSerializerCompatibilityTest.java | 175 ++++++++++++++++++---
 .../paimon/operation/FileStoreCommitTest.java      |   4 +-
 .../paimon/table/DynamicBucketTableTest.java       |   2 +-
 .../table/sink/CommitMessageSerializerTest.java    |  26 +--
 .../apache/paimon/utils/CompatibilityUtils.java    |  37 +++++
 .../compatibility/manifest-committable-v10         | Bin 0 -> 3978 bytes
 .../AppendPreCommitCompactCoordinatorOperator.java |   7 +-
 .../ChangelogCompactCoordinateOperator.java        |   9 +-
 .../changelog/ChangelogCompactSortOperator.java    |  14 +-
 .../PostponeBucketCommittableRewriter.java         |  13 +-
 .../sink/listener/PartitionMarkDoneListener.java   |   8 +-
 ...endPreCommitCompactCoordinatorOperatorTest.java |   3 +-
 .../paimon/flink/sink/WriterOperatorTest.java      |   4 +-
 .../flink/sink/listener/ListenerTestUtils.java     |   7 +-
 .../spark/commands/PaimonRowLevelCommand.scala     |   6 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  12 +-
 33 files changed, 541 insertions(+), 333 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
index b3b9a0cb17..42c4548842 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
@@ -25,7 +25,6 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
@@ -73,7 +72,10 @@ public class AppendCompactTask {
         Preconditions.checkArgument(
                 dvEnabled || compactBefore.size() > 1,
                 "AppendOnlyCompactionTask need more than one file input.");
-        IndexIncrement indexIncrement;
+        // If compact task didn't compact all files, the remain deletion files 
will be written into
+        // new deletion files.
+        List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+        List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
         if (dvEnabled) {
             AppendDeleteFileMaintainer dvIndexFileMaintainer =
                     BaseAppendDeleteFileMaintainer.forUnawareAppend(
@@ -90,10 +92,6 @@ public class AppendCompactTask {
             compactBefore.forEach(
                     f -> 
dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));
             List<IndexManifestEntry> indexEntries = 
dvIndexFileMaintainer.persist();
-            // If compact task didn't compact all files, the remain deletion 
files will be written
-            // into new deletion files.
-            List<IndexFileMeta> newIndexFiles = new ArrayList<>();
-            List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
             for (IndexManifestEntry entry : indexEntries) {
                 if (entry.kind() == FileKind.ADD) {
                     newIndexFiles.add(entry.indexFile());
@@ -101,15 +99,18 @@ public class AppendCompactTask {
                     deletedIndexFiles.add(entry.indexFile());
                 }
             }
-            indexIncrement = new IndexIncrement(newIndexFiles, 
deletedIndexFiles);
         } else {
             compactAfter.addAll(
                     write.compactRewrite(partition, UNAWARE_BUCKET, null, 
compactBefore));
-            indexIncrement = new IndexIncrement(Collections.emptyList());
         }
 
         CompactIncrement compactIncrement =
-                new CompactIncrement(compactBefore, compactAfter, 
Collections.emptyList());
+                new CompactIncrement(
+                        compactBefore,
+                        compactAfter,
+                        Collections.emptyList(),
+                        newIndexFiles,
+                        deletedIndexFiles);
         return new CommitMessageImpl(
                 partition,
                 // bucket 0 is bucket for unaware-bucket table
@@ -117,8 +118,7 @@ public class AppendCompactTask {
                 0,
                 table.coreOptions().bucket(),
                 DataIncrement.emptyIncrement(),
-                compactIncrement,
-                indexIncrement);
+                compactIncrement);
     }
 
     public int hashCode() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
index ad51642a2a..6454c99ef8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -29,14 +32,27 @@ public class CompactIncrement {
     private final List<DataFileMeta> compactBefore;
     private final List<DataFileMeta> compactAfter;
     private final List<DataFileMeta> changelogFiles;
+    private final List<IndexFileMeta> newIndexFiles;
+    private final List<IndexFileMeta> deletedIndexFiles;
 
     public CompactIncrement(
             List<DataFileMeta> compactBefore,
             List<DataFileMeta> compactAfter,
             List<DataFileMeta> changelogFiles) {
+        this(compactBefore, compactAfter, changelogFiles, new ArrayList<>(), 
new ArrayList<>());
+    }
+
+    public CompactIncrement(
+            List<DataFileMeta> compactBefore,
+            List<DataFileMeta> compactAfter,
+            List<DataFileMeta> changelogFiles,
+            List<IndexFileMeta> newIndexFiles,
+            List<IndexFileMeta> deletedIndexFiles) {
         this.compactBefore = compactBefore;
         this.compactAfter = compactAfter;
         this.changelogFiles = changelogFiles;
+        this.newIndexFiles = newIndexFiles;
+        this.deletedIndexFiles = deletedIndexFiles;
     }
 
     public List<DataFileMeta> compactBefore() {
@@ -51,8 +67,20 @@ public class CompactIncrement {
         return changelogFiles;
     }
 
+    public List<IndexFileMeta> newIndexFiles() {
+        return newIndexFiles;
+    }
+
+    public List<IndexFileMeta> deletedIndexFiles() {
+        return deletedIndexFiles;
+    }
+
     public boolean isEmpty() {
-        return compactBefore.isEmpty() && compactAfter.isEmpty() && 
changelogFiles.isEmpty();
+        return compactBefore.isEmpty()
+                && compactAfter.isEmpty()
+                && changelogFiles.isEmpty()
+                && newIndexFiles.isEmpty()
+                && deletedIndexFiles.isEmpty();
     }
 
     @Override
@@ -67,7 +95,9 @@ public class CompactIncrement {
         CompactIncrement that = (CompactIncrement) o;
         return Objects.equals(compactBefore, that.compactBefore)
                 && Objects.equals(compactAfter, that.compactAfter)
-                && Objects.equals(changelogFiles, that.changelogFiles);
+                && Objects.equals(changelogFiles, that.changelogFiles)
+                && Objects.equals(newIndexFiles, that.newIndexFiles)
+                && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
     }
 
     @Override
@@ -78,10 +108,14 @@ public class CompactIncrement {
     @Override
     public String toString() {
         return String.format(
-                "CompactIncrement {compactBefore = %s, compactAfter = %s, 
changelogFiles = %s}",
+                "CompactIncrement {compactBefore = %s, compactAfter = %s, 
changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}",
                 
compactBefore.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
                 
compactAfter.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
-                
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()));
+                
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
+                
newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
+                deletedIndexFiles.stream()
+                        .map(IndexFileMeta::fileName)
+                        .collect(Collectors.toList()));
     }
 
     public static CompactIncrement emptyIncrement() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
index b7860fb15e..6049c4dbbb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
@@ -18,25 +18,41 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-/** Newly created data files and changelog files. */
+/** Increment of data files, changelog files and index files. */
 public class DataIncrement {
 
     private final List<DataFileMeta> newFiles;
     private final List<DataFileMeta> deletedFiles;
     private final List<DataFileMeta> changelogFiles;
+    private final List<IndexFileMeta> newIndexFiles;
+    private final List<IndexFileMeta> deletedIndexFiles;
 
     public DataIncrement(
             List<DataFileMeta> newFiles,
             List<DataFileMeta> deletedFiles,
             List<DataFileMeta> changelogFiles) {
+        this(newFiles, deletedFiles, changelogFiles, new ArrayList<>(), new 
ArrayList<>());
+    }
+
+    public DataIncrement(
+            List<DataFileMeta> newFiles,
+            List<DataFileMeta> deletedFiles,
+            List<DataFileMeta> changelogFiles,
+            List<IndexFileMeta> newIndexFiles,
+            List<IndexFileMeta> deletedIndexFiles) {
         this.newFiles = newFiles;
         this.deletedFiles = deletedFiles;
         this.changelogFiles = changelogFiles;
+        this.newIndexFiles = newIndexFiles;
+        this.deletedIndexFiles = deletedIndexFiles;
     }
 
     public static DataIncrement emptyIncrement() {
@@ -56,8 +72,20 @@ public class DataIncrement {
         return changelogFiles;
     }
 
+    public List<IndexFileMeta> newIndexFiles() {
+        return newIndexFiles;
+    }
+
+    public List<IndexFileMeta> deletedIndexFiles() {
+        return deletedIndexFiles;
+    }
+
     public boolean isEmpty() {
-        return newFiles.isEmpty() && changelogFiles.isEmpty();
+        return newFiles.isEmpty()
+                && deletedFiles.isEmpty()
+                && changelogFiles.isEmpty()
+                && newIndexFiles.isEmpty()
+                && deletedIndexFiles.isEmpty();
     }
 
     @Override
@@ -71,20 +99,28 @@ public class DataIncrement {
 
         DataIncrement that = (DataIncrement) o;
         return Objects.equals(newFiles, that.newFiles)
-                && Objects.equals(changelogFiles, that.changelogFiles);
+                && Objects.equals(deletedFiles, that.deletedFiles)
+                && Objects.equals(changelogFiles, that.changelogFiles)
+                && Objects.equals(newIndexFiles, that.newIndexFiles)
+                && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(newFiles, changelogFiles);
+        return Objects.hash(
+                newFiles, deletedFiles, changelogFiles, newIndexFiles, 
deletedIndexFiles);
     }
 
     @Override
     public String toString() {
         return String.format(
-                "DataIncrement {newFiles = %s, deletedFiles = %s, 
changelogFiles = %s}",
+                "DataIncrement {newFiles = %s, deletedFiles = %s, 
changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}",
                 
newFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
                 
deletedFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
-                
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()));
+                
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
+                
newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
+                deletedIndexFiles.stream()
+                        .map(IndexFileMeta::fileName)
+                        .collect(Collectors.toList()));
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
deleted file mode 100644
index 9f985f54ed..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.index.IndexFileMeta;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/** Incremental index files. */
-public class IndexIncrement {
-
-    private final List<IndexFileMeta> newIndexFiles;
-
-    private final List<IndexFileMeta> deletedIndexFiles;
-
-    public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
-        this.newIndexFiles = newIndexFiles;
-        this.deletedIndexFiles = Collections.emptyList();
-    }
-
-    public IndexIncrement(
-            List<IndexFileMeta> newIndexFiles, List<IndexFileMeta> 
deletedIndexFiles) {
-        this.newIndexFiles = newIndexFiles;
-        this.deletedIndexFiles = deletedIndexFiles;
-    }
-
-    public List<IndexFileMeta> newIndexFiles() {
-        return newIndexFiles;
-    }
-
-    public List<IndexFileMeta> deletedIndexFiles() {
-        return deletedIndexFiles;
-    }
-
-    public boolean isEmpty() {
-        return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        IndexIncrement that = (IndexIncrement) o;
-        return Objects.equals(newIndexFiles, that.newIndexFiles)
-                && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
-    }
-
-    @Override
-    public int hashCode() {
-        List<IndexFileMeta> all = new ArrayList<>(newIndexFiles);
-        all.addAll(deletedIndexFiles);
-        return Objects.hash(all);
-    }
-
-    @Override
-    public String toString() {
-        return String.format(
-                "IndexIncrement {newIndexFiles = %s, deletedIndexFiles = %s}",
-                
newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
-                deletedIndexFiles.stream()
-                        .map(IndexFileMeta::fileName)
-                        .collect(Collectors.toList()));
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 7b77fb179f..ddf2addf53 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -28,9 +28,9 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
 import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -213,22 +213,26 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
                 WriterContainer<T> writerContainer = entry.getValue();
 
                 CommitIncrement increment = 
writerContainer.writer.prepareCommit(waitCompaction);
-                List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+                DataIncrement newFilesIncrement = 
increment.newFilesIncrement();
+                CompactIncrement compactIncrement = 
increment.compactIncrement();
                 if (writerContainer.dynamicBucketMaintainer != null) {
-                    
newIndexFiles.addAll(writerContainer.dynamicBucketMaintainer.prepareCommit());
+                    newFilesIncrement
+                            .newIndexFiles()
+                            
.addAll(writerContainer.dynamicBucketMaintainer.prepareCommit());
                 }
                 CompactDeletionFile compactDeletionFile = 
increment.compactDeletionFile();
                 if (compactDeletionFile != null) {
-                    
compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
+                    compactDeletionFile
+                            .getOrCompute()
+                            .ifPresent(compactIncrement.newIndexFiles()::add);
                 }
                 CommitMessageImpl committable =
                         new CommitMessageImpl(
                                 partition,
                                 bucket,
                                 writerContainer.totalBuckets,
-                                increment.newFilesIncrement(),
-                                increment.compactIncrement(),
-                                new IndexIncrement(newIndexFiles));
+                                newFilesIncrement,
+                                compactIncrement);
                 result.add(committable);
 
                 if (committable.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 1c859fb693..a11339fe7d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
@@ -701,51 +702,53 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     .compactIncrement()
                     .changelogFiles()
                     .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, 
commitMessage, m)));
-            commitMessage
-                    .indexIncrement()
-                    .newIndexFiles()
-                    .forEach(
-                            f -> {
-                                switch (f.indexType()) {
-                                    case HASH_INDEX:
-                                        appendHashIndexFiles.add(
-                                                new IndexManifestEntry(
-                                                        FileKind.ADD,
-                                                        
commitMessage.partition(),
-                                                        commitMessage.bucket(),
-                                                        f));
-                                        break;
-                                    case DELETION_VECTORS_INDEX:
-                                        compactDvIndexFiles.add(
-                                                new IndexManifestEntry(
-                                                        FileKind.ADD,
-                                                        
commitMessage.partition(),
-                                                        commitMessage.bucket(),
-                                                        f));
-                                        break;
-                                    default:
-                                        throw new RuntimeException(
-                                                "Unknown index type: " + 
f.indexType());
-                                }
-                            });
-            commitMessage
-                    .indexIncrement()
-                    .deletedIndexFiles()
-                    .forEach(
-                            f -> {
-                                if 
(f.indexType().equals(DELETION_VECTORS_INDEX)) {
-                                    compactDvIndexFiles.add(
-                                            new IndexManifestEntry(
-                                                    FileKind.DELETE,
-                                                    commitMessage.partition(),
-                                                    commitMessage.bucket(),
-                                                    f));
-                                } else {
-                                    throw new RuntimeException(
-                                            "This index type is not supported 
to delete: "
-                                                    + f.indexType());
-                                }
-                            });
+
+            // todo: split them
+            List<IndexFileMeta> newIndexFiles =
+                    new 
ArrayList<>(commitMessage.newFilesIncrement().newIndexFiles());
+            
newIndexFiles.addAll(commitMessage.compactIncrement().newIndexFiles());
+            newIndexFiles.forEach(
+                    f -> {
+                        switch (f.indexType()) {
+                            case HASH_INDEX:
+                                appendHashIndexFiles.add(
+                                        new IndexManifestEntry(
+                                                FileKind.ADD,
+                                                commitMessage.partition(),
+                                                commitMessage.bucket(),
+                                                f));
+                                break;
+                            case DELETION_VECTORS_INDEX:
+                                compactDvIndexFiles.add(
+                                        new IndexManifestEntry(
+                                                FileKind.ADD,
+                                                commitMessage.partition(),
+                                                commitMessage.bucket(),
+                                                f));
+                                break;
+                            default:
+                                throw new RuntimeException("Unknown index 
type: " + f.indexType());
+                        }
+                    });
+
+            // todo: split them
+            List<IndexFileMeta> deletedIndexFiles =
+                    new 
ArrayList<>(commitMessage.newFilesIncrement().deletedIndexFiles());
+            
deletedIndexFiles.addAll(commitMessage.compactIncrement().deletedIndexFiles());
+            deletedIndexFiles.forEach(
+                    f -> {
+                        if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
+                            compactDvIndexFiles.add(
+                                    new IndexManifestEntry(
+                                            FileKind.DELETE,
+                                            commitMessage.partition(),
+                                            commitMessage.bucket(),
+                                            f));
+                        } else {
+                            throw new RuntimeException(
+                                    "This index type is not supported to 
delete: " + f.indexType());
+                        }
+                    });
         }
         if (!commitMessages.isEmpty()) {
             List<String> msg = new ArrayList<>();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
index 5831066816..8e715462f5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
@@ -18,20 +18,17 @@
 
 package org.apache.paimon.table.sink;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.DataInputViewStreamWrapper;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
-import org.apache.paimon.io.IndexIncrement;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.Collections;
 import java.util.Objects;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializedBytes;
@@ -50,37 +47,18 @@ public class CommitMessageImpl implements CommitMessage {
     private transient @Nullable Integer totalBuckets;
     private transient DataIncrement dataIncrement;
     private transient CompactIncrement compactIncrement;
-    private transient IndexIncrement indexIncrement;
 
-    @VisibleForTesting
     public CommitMessageImpl(
             BinaryRow partition,
             int bucket,
             @Nullable Integer totalBuckets,
             DataIncrement dataIncrement,
             CompactIncrement compactIncrement) {
-        this(
-                partition,
-                bucket,
-                totalBuckets,
-                dataIncrement,
-                compactIncrement,
-                new IndexIncrement(Collections.emptyList()));
-    }
-
-    public CommitMessageImpl(
-            BinaryRow partition,
-            int bucket,
-            @Nullable Integer totalBuckets,
-            DataIncrement dataIncrement,
-            CompactIncrement compactIncrement,
-            IndexIncrement indexIncrement) {
         this.partition = partition;
         this.bucket = bucket;
         this.totalBuckets = totalBuckets;
         this.dataIncrement = dataIncrement;
         this.compactIncrement = compactIncrement;
-        this.indexIncrement = indexIncrement;
     }
 
     @Override
@@ -106,12 +84,8 @@ public class CommitMessageImpl implements CommitMessage {
         return compactIncrement;
     }
 
-    public IndexIncrement indexIncrement() {
-        return indexIncrement;
-    }
-
     public boolean isEmpty() {
-        return dataIncrement.isEmpty() && compactIncrement.isEmpty() && 
indexIncrement.isEmpty();
+        return dataIncrement.isEmpty() && compactIncrement.isEmpty();
     }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
@@ -131,7 +105,6 @@ public class CommitMessageImpl implements CommitMessage {
         this.totalBuckets = message.totalBuckets;
         this.dataIncrement = message.dataIncrement;
         this.compactIncrement = message.compactIncrement;
-        this.indexIncrement = message.indexIncrement;
     }
 
     @Override
@@ -148,14 +121,12 @@ public class CommitMessageImpl implements CommitMessage {
                 && Objects.equals(partition, that.partition)
                 && Objects.equals(totalBuckets, that.totalBuckets)
                 && Objects.equals(dataIncrement, that.dataIncrement)
-                && Objects.equals(compactIncrement, that.compactIncrement)
-                && Objects.equals(indexIncrement, that.indexIncrement);
+                && Objects.equals(compactIncrement, that.compactIncrement);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(
-                partition, bucket, totalBuckets, dataIncrement, 
compactIncrement, indexIncrement);
+        return Objects.hash(partition, bucket, totalBuckets, dataIncrement, 
compactIncrement);
     }
 
     @Override
@@ -166,8 +137,7 @@ public class CommitMessageImpl implements CommitMessage {
                         + "bucket = %d, "
                         + "totalBuckets = %s, "
                         + "newFilesIncrement = %s, "
-                        + "compactIncrement = %s, "
-                        + "indexIncrement = %s}",
-                partition, bucket, totalBuckets, dataIncrement, 
compactIncrement, indexIncrement);
+                        + "compactIncrement = %s}",
+                partition, bucket, totalBuckets, dataIncrement, 
compactIncrement);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 88e9e3513d..5c3976af81 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -26,7 +27,6 @@ import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.io.DataInputView;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -64,23 +64,28 @@ public class CommitMessageLegacyV2Serializer {
     }
 
     public CommitMessage deserialize(DataInputView view) throws IOException {
+        BinaryRow partition = deserializeBinaryRow(view);
+        int bucket = view.readInt();
         if (dataFileSerializer == null) {
             dataFileSerializer = new DataFileMetaLegacyV2Serializer();
             indexEntrySerializer = new IndexFileMetaLegacyV2Serializer();
         }
-        return new CommitMessageImpl(
-                deserializeBinaryRow(view),
-                view.readInt(),
-                null,
+        DataIncrement dataIncrement =
                 new DataIncrement(
                         dataFileSerializer.deserializeList(view),
                         Collections.emptyList(),
-                        dataFileSerializer.deserializeList(view)),
+                        dataFileSerializer.deserializeList(view));
+        CompactIncrement compactIncrement =
                 new CompactIncrement(
                         dataFileSerializer.deserializeList(view),
                         dataFileSerializer.deserializeList(view),
-                        dataFileSerializer.deserializeList(view)),
-                new 
IndexIncrement(indexEntrySerializer.deserializeList(view)));
+                        dataFileSerializer.deserializeList(view));
+        if (compactIncrement.isEmpty()) {
+            
dataIncrement.newIndexFiles().addAll(indexEntrySerializer.deserializeList(view));
+        } else {
+            
compactIncrement.newIndexFiles().addAll(indexEntrySerializer.deserializeList(view));
+        }
+        return new CommitMessageImpl(partition, bucket, null, dataIncrement, 
compactIncrement);
     }
 
     private static RowType legacyDataFileSchema() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 529b0cb369..0a34434028 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.serializer.VersionedSerializer;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.index.IndexFileMetaSerializer;
@@ -36,13 +37,11 @@ import org.apache.paimon.io.DataInputDeserializer;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.io.DataOutputViewStreamWrapper;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.utils.IOExceptionSupplier;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -51,7 +50,7 @@ import static 
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 9;
+    public static final int CURRENT_VERSION = 10;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
@@ -103,14 +102,19 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
             view.writeBoolean(false);
         }
 
+        // data increment
         
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
         
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), 
view);
         
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), 
view);
+        
indexEntrySerializer.serializeList(message.newFilesIncrement().newIndexFiles(), 
view);
+        
indexEntrySerializer.serializeList(message.newFilesIncrement().deletedIndexFiles(),
 view);
+
+        // compact increment
         
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), 
view);
         
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), 
view);
         
dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(), 
view);
-        
indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(), 
view);
-        
indexEntrySerializer.serializeList(message.indexIncrement().deletedIndexFiles(),
 view);
+        
indexEntrySerializer.serializeList(message.compactIncrement().newIndexFiles(), 
view);
+        
indexEntrySerializer.serializeList(message.compactIncrement().deletedIndexFiles(),
 view);
     }
 
     @Override
@@ -132,18 +136,48 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
         IOExceptionSupplier<List<DataFileMeta>> fileDeserializer = 
fileDeserializer(version, view);
         IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer =
                 indexEntryDeserializer(version, view);
-
-        return new CommitMessageImpl(
-                deserializeBinaryRow(view),
-                view.readInt(),
-                version >= 7 && view.readBoolean() ? view.readInt() : null,
-                new DataIncrement(
-                        fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
-                new CompactIncrement(
-                        fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
-                new IndexIncrement(
-                        indexEntryDeserializer.get(),
-                        version <= 2 ? Collections.emptyList() : 
indexEntryDeserializer.get()));
+        if (version >= 10) {
+            return new CommitMessageImpl(
+                    deserializeBinaryRow(view),
+                    view.readInt(),
+                    view.readBoolean() ? view.readInt() : null,
+                    new DataIncrement(
+                            fileDeserializer.get(),
+                            fileDeserializer.get(),
+                            fileDeserializer.get(),
+                            indexEntryDeserializer.get(),
+                            indexEntryDeserializer.get()),
+                    new CompactIncrement(
+                            fileDeserializer.get(),
+                            fileDeserializer.get(),
+                            fileDeserializer.get(),
+                            indexEntryDeserializer.get(),
+                            indexEntryDeserializer.get()));
+        } else {
+            BinaryRow partition = deserializeBinaryRow(view);
+            int bucket = view.readInt();
+            Integer totalBuckets = version >= 7 && view.readBoolean() ? 
view.readInt() : null;
+            DataIncrement dataIncrement =
+                    new DataIncrement(
+                            fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get());
+            CompactIncrement compactIncrement =
+                    new CompactIncrement(
+                            fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get());
+            if (compactIncrement.isEmpty()) {
+                
dataIncrement.newIndexFiles().addAll(indexEntryDeserializer.get());
+            } else {
+                
compactIncrement.newIndexFiles().addAll(indexEntryDeserializer.get());
+            }
+            if (version > 2) {
+                if (compactIncrement.isEmpty()) {
+                    
dataIncrement.deletedIndexFiles().addAll(indexEntryDeserializer.get());
+                } else {
+                    
compactIncrement.deletedIndexFiles().addAll(indexEntryDeserializer.get());
+                }
+            }
+            return new CommitMessageImpl(
+                    partition, bucket, totalBuckets, dataIncrement, 
compactIncrement);
+        }
     }
 
     private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 95b2df34f5..e549bd2e2c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -290,12 +290,18 @@ public class TableCommitImpl implements InnerTableCommit {
                 Consumer<DataFileMeta> collector = f -> 
files.addAll(f.collectFiles(pathFactory));
                 msg.newFilesIncrement().newFiles().forEach(collector);
                 msg.newFilesIncrement().changelogFiles().forEach(collector);
+                msg.newFilesIncrement().newIndexFiles().stream()
+                        .map(indexFileFactory::toPath)
+                        .forEach(files::add);
+                msg.newFilesIncrement().deletedIndexFiles().stream()
+                        .map(indexFileFactory::toPath)
+                        .forEach(files::add);
                 msg.compactIncrement().compactBefore().forEach(collector);
                 msg.compactIncrement().compactAfter().forEach(collector);
-                msg.indexIncrement().newIndexFiles().stream()
+                msg.compactIncrement().newIndexFiles().stream()
                         .map(indexFileFactory::toPath)
                         .forEach(files::add);
-                msg.indexIncrement().deletedIndexFiles().stream()
+                msg.compactIncrement().deletedIndexFiles().stream()
                         .map(indexFileFactory::toPath)
                         .forEach(files::add);
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 60a00ab567..3ca33a8222 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -30,7 +30,6 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.operation.FileStoreCommitImpl;
 import org.apache.paimon.schema.Schema;
@@ -115,9 +114,13 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
                 partition,
                 bucket,
                 options().bucket(),
-                DataIncrement.emptyIncrement(),
-                CompactIncrement.emptyIncrement(),
-                new IndexIncrement(Collections.emptyList(), indexFileMetas));
+                new DataIncrement(
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        indexFileMetas),
+                CompactIncrement.emptyIncrement());
     }
 
     public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int 
bucket) {
@@ -153,9 +156,13 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
                 partition,
                 bucket,
                 options().bucket(),
-                DataIncrement.emptyIncrement(),
-                CompactIncrement.emptyIncrement(),
-                new IndexIncrement(indexFiles));
+                new DataIncrement(
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        indexFiles,
+                        Collections.emptyList()),
+                CompactIncrement.emptyIncrement());
     }
 
     public static TestAppendFileStore createAppendStore(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 8e4cafc7de..dc1702c264 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -26,7 +26,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.FileSource;
@@ -346,14 +346,15 @@ public class TestFileStore extends KeyValueFileStore {
                     entryWithPartition.getValue().entrySet()) {
                 CommitIncrement increment =
                         
entryWithBucket.getValue().prepareCommit(ignorePreviousFiles);
+                DataIncrement dataIncrement = increment.newFilesIncrement();
+                dataIncrement.newIndexFiles().addAll(indexFiles);
                 committable.addFileCommittable(
                         new CommitMessageImpl(
                                 entryWithPartition.getKey(),
                                 entryWithBucket.getKey(),
                                 options().bucket(),
-                                increment.newFilesIncrement(),
-                                increment.compactIncrement(),
-                                new IndexIncrement(indexFiles)));
+                                dataIncrement,
+                                increment.compactIncrement()));
             }
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
index e03dd2cee2..0e95135873 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.RawFileSplitRead;
@@ -105,12 +104,11 @@ public class AppendCompactTaskTest {
 
         CommitMessageImpl compactMessage = (CommitMessageImpl) 
compactTask.doCompact(table, write);
 
-        IndexIncrement indexIncrement = compactMessage.indexIncrement();
-        assertThat(indexIncrement.deletedIndexFiles()).isNotEmpty();
+        
assertThat(compactMessage.compactIncrement().deletedIndexFiles()).isNotEmpty();
         if (compactBeforeAllFiles) {
-            assertThat(indexIncrement.newIndexFiles()).isEmpty();
+            
assertThat(compactMessage.compactIncrement().newIndexFiles()).isEmpty();
         } else {
-            assertThat(indexIncrement.newIndexFiles()).isNotEmpty();
+            
assertThat(compactMessage.compactIncrement().newIndexFiles()).isNotEmpty();
         }
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index c195517731..10c6f45200 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -104,9 +103,13 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
                         EMPTY_ROW,
                         0,
                         1,
-                        DataIncrement.emptyIncrement(),
-                        CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(Collections.singletonList(file)));
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(file),
+                                Collections.emptyList()),
+                        CompactIncrement.emptyIncrement());
         BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
         commit.commit(Collections.singletonList(commitMessage));
 
@@ -127,9 +130,13 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
                         EMPTY_ROW,
                         0,
                         1,
-                        DataIncrement.emptyIncrement(),
-                        CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(Collections.singletonList(file)));
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(file),
+                                Collections.emptyList()),
+                        CompactIncrement.emptyIncrement());
         commit = table.newBatchWriteBuilder().newCommit();
         commit.commit(Collections.singletonList(commitMessage));
 
@@ -202,8 +209,12 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
-                        CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(Collections.singletonList(file)));
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(file),
+                                Collections.emptyList()));
         BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit();
         commit1.commit(Collections.singletonList(commitMessage1));
 
@@ -235,8 +246,12 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
-                        CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(Collections.singletonList(file)));
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(file),
+                                Collections.emptyList()));
         BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit();
         commit2.commit(Collections.singletonList(commitMessage2));
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 3cd9308d79..3a03985c88 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -71,10 +71,10 @@ class AppendDeletionFileMaintainerTest {
         Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
         dataFileToDeletionFiles.putAll(
                 createDeletionFileMapFromIndexFileMetas(
-                        indexPathFactory, 
commitMessage1.indexIncrement().newIndexFiles()));
+                        indexPathFactory, 
commitMessage1.newFilesIncrement().newIndexFiles()));
         dataFileToDeletionFiles.putAll(
                 createDeletionFileMapFromIndexFileMetas(
-                        indexPathFactory, 
commitMessage2.indexIncrement().newIndexFiles()));
+                        indexPathFactory, 
commitMessage2.newFilesIncrement().newIndexFiles()));
 
         AppendDeleteFileMaintainer dvIFMaintainer =
                 store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 
dataFileToDeletionFiles);
@@ -108,7 +108,7 @@ class AppendDeletionFileMaintainerTest {
                         .findAny()
                         .get();
         assertThat(entry.indexFile())
-                
.isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0));
+                
.isEqualTo(commitMessage1.newFilesIncrement().newIndexFiles().get(0));
         entry =
                 res.stream()
                         .filter(file -> file.kind() == FileKind.DELETE)
@@ -116,7 +116,7 @@ class AppendDeletionFileMaintainerTest {
                         .findAny()
                         .get();
         assertThat(entry.indexFile())
-                
.isEqualTo(commitMessage2.indexIncrement().newIndexFiles().get(0));
+                
.isEqualTo(commitMessage2.newFilesIncrement().newIndexFiles().get(0));
     }
 
     private Map<String, DeletionFile> createDeletionFileMapFromIndexFileMetas(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
index 6f5e0d2ecd..b4f5363a06 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
@@ -75,7 +75,7 @@ public class DynamicBucketIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
         Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>();
         for (CommitMessage commitMessage : messages) {
             CommitMessageImpl message = (CommitMessageImpl) commitMessage;
-            List<IndexFileMeta> files = 
message.indexIncrement().newIndexFiles();
+            List<IndexFileMeta> files = 
message.newFilesIncrement().newIndexFiles();
             if (files.isEmpty()) {
                 continue;
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 01eca82c2e..4c5415d45d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
@@ -216,10 +215,13 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
                 bucket,
                 totalBuckets,
                 new DataIncrement(
-                        Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.singletonList(file),
+                        Collections.emptyList()),
                 new CompactIncrement(
-                        Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()),
-                new IndexIncrement(Collections.singletonList(file)));
+                        Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList()));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 927da8e97d..ccc8f6c57e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.IOUtils;
@@ -45,6 +44,100 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Compatibility Test for {@link ManifestCommittableSerializer}. */
 public class ManifestCommittableSerializerCompatibilityTest {
 
+    @Test
+    public void testCompatibilityToV4CommitV10() throws IOException {
+        String fileName = "manifest-committable-v10";
+
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                DataFileMeta.create(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"),
+                        "hdfs://localhost:9000/path/to/file",
+                        1L,
+                        Arrays.asList("asdf", "qwer", "zxcv"));
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+        IndexFileMeta hashIndexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, null, null);
+
+        LinkedHashMap<String, DeletionVectorMeta> dvRanges = new 
LinkedHashMap<>();
+        dvRanges.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+        dvRanges.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+        IndexFileMeta devIndexFile =
+                new IndexFileMeta(
+                        "my_index_type",
+                        "my_index_file",
+                        1024 * 100,
+                        1002,
+                        dvRanges,
+                        "external_path");
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        16,
+                        new DataIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                Collections.singletonList(hashIndexFile),
+                                Collections.singletonList(hashIndexFile)),
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                Collections.singletonList(devIndexFile),
+                                Collections.emptyList()));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+        manifestCommittable.addProperty("k1", "v1");
+        manifestCommittable.addProperty("k2", "v2");
+
+        ManifestCommittableSerializer serializer = new 
ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = 
serializer.deserialize(serializer.getVersion(), bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
+        byte[] oldBytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                .getResourceAsStream("compatibility/" + 
fileName),
+                        true);
+        deserialized = serializer.deserialize(4, oldBytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
     @Test
     public void testCompatibilityToV4CommitV9() throws IOException {
         SimpleStats keyStats =
@@ -100,8 +193,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         16,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -178,8 +275,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         16,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -256,8 +357,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         16,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -333,8 +438,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         16,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -408,8 +517,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -483,8 +596,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -557,8 +674,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -632,8 +753,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -707,8 +832,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         null,
                         new DataIncrement(dataFiles, dataFiles, dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
@@ -778,8 +907,12 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         11,
                         null,
                         new DataIncrement(dataFiles, Collections.emptyList(), 
dataFiles),
-                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
-                        new IndexIncrement(indexFiles));
+                        new CompactIncrement(
+                                dataFiles,
+                                dataFiles,
+                                dataFiles,
+                                indexFiles,
+                                Collections.emptyList()));
 
         ManifestCommittable manifestCommittable =
                 new ManifestCommittable(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 47a876f345..35843e0c2f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -914,8 +914,8 @@ public class FileStoreCommitTest {
                 store.writeDVIndexFiles(
                         BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", 
Arrays.asList(3)));
         List<IndexFileMeta> deleted =
-                new 
ArrayList<>(commitMessage1.indexIncrement().newIndexFiles());
-        deleted.addAll(commitMessage2.indexIncrement().newIndexFiles());
+                new 
ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
+        deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
         CommitMessage commitMessage4 = 
store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
         store.commit(commitMessage3, commitMessage4);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index 29e71e2b44..0767200bb1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -61,7 +61,7 @@ public class DynamicBucketTableTest extends TableTestBase {
         batchTableWrite.write(rowWithBucket.getKey(), 
rowWithBucket.getValue());
         assertThat(
                         ((CommitMessageImpl) 
batchTableWrite.prepareCommit().get(0))
-                                .indexIncrement()
+                                .newFilesIncrement()
                                 .newIndexFiles()
                                 .get(0)
                                 .rowCount())
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 351af6a3b7..c4f519e84e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 
 import org.junit.jupiter.api.Test;
 
@@ -41,22 +40,31 @@ public class CommitMessageSerializerTest {
         CommitMessageSerializer serializer = new CommitMessageSerializer();
 
         DataIncrement dataIncrement = randomNewFilesIncrement();
+        dataIncrement.newIndexFiles().addAll(Arrays.asList(randomIndexFile(), 
randomIndexFile()));
+        dataIncrement
+                .deletedIndexFiles()
+                .addAll(Arrays.asList(randomIndexFile(), randomIndexFile()));
+
         CompactIncrement compactIncrement = randomCompactIncrement();
-        IndexIncrement indexIncrement =
-                new IndexIncrement(
-                        Arrays.asList(randomIndexFile(), randomIndexFile()),
-                        Arrays.asList(randomIndexFile(), randomIndexFile()));
+        compactIncrement
+                .newIndexFiles()
+                .addAll(Arrays.asList(randomIndexFile(), randomIndexFile()));
+        compactIncrement
+                .deletedIndexFiles()
+                .addAll(Arrays.asList(randomIndexFile(), randomIndexFile()));
+
         CommitMessageImpl committable =
-                new CommitMessageImpl(
-                        row(0), 1, 2, dataIncrement, compactIncrement, 
indexIncrement);
+                new CommitMessageImpl(row(0), 1, 2, dataIncrement, 
compactIncrement);
 
         CommitMessageImpl newCommittable =
-                (CommitMessageImpl) serializer.deserialize(7, 
serializer.serialize(committable));
+                (CommitMessageImpl)
+                        serializer.deserialize(
+                                CommitMessageSerializer.CURRENT_VERSION,
+                                serializer.serialize(committable));
         
assertThat(newCommittable.partition()).isEqualTo(committable.partition());
         assertThat(newCommittable.bucket()).isEqualTo(committable.bucket());
         
assertThat(newCommittable.totalBuckets()).isEqualTo(committable.totalBuckets());
         
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
         
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
-        
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java
new file mode 100644
index 0000000000..67b8a3d511
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/**
+ * Test helper that writes raw bytes into the {@code src/test/resources/compatibility/} directory
+ * located under the JVM working directory ({@code user.dir}).
+ *
+ * <p>NOTE(review): presumably run by hand to (re)generate the serialized fixtures that the
+ * compatibility tests later load from the classpath - confirm against callers.
+ */
+public class CompatibilityUtils {
+
+    /** Directory that receives the files, expressed relative to the {@code user.dir} property. */
+    public static final String COMPATIBILITY_FILE_DIR = 
"/src/test/resources/compatibility/";
+
+    /**
+     * Writes {@code data} to {@code <user.dir>/src/test/resources/compatibility/<fileName>}.
+     *
+     * <p>The target is opened with a plain {@link FileOutputStream}, so an existing file with the
+     * same name is replaced. The parent directory is not created here and must already exist.
+     *
+     * @param fileName file name, appended verbatim to {@link #COMPATIBILITY_FILE_DIR}
+     * @param data bytes to store as the complete content of the file
+     * @throws IOException if the file cannot be opened or written
+     */
+    public static void writeCompatibilityFile(String fileName, byte[] data) 
throws IOException {
+        File file = new File(System.getProperty("user.dir") + 
COMPATIBILITY_FILE_DIR + fileName);
+        // try-with-resources closes the stream even when write() throws.
+        try (FileOutputStream fos = new FileOutputStream(file)) {
+            fos.write(data);
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v10 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v10
new file mode 100644
index 0000000000..0ae6e7bfe3
Binary files /dev/null and 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v10 differ
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
index 674dd59e7e..827d79ca5a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
@@ -128,9 +128,10 @@ public class AppendPreCommitCompactCoordinatorOperator
                         new DataIncrement(
                                 skippedFiles,
                                 message.newFilesIncrement().deletedFiles(),
-                                message.newFilesIncrement().changelogFiles()),
-                        message.compactIncrement(),
-                        message.indexIncrement());
+                                message.newFilesIncrement().changelogFiles(),
+                                message.newFilesIncrement().newIndexFiles(),
+                                
message.newFilesIncrement().deletedIndexFiles()),
+                        message.compactIncrement());
         if (!newMessage.isEmpty()) {
             Committable newCommittable =
                     new Committable(committable.checkpointId(), 
Committable.Kind.FILE, newMessage);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index 0190279a8b..d336717031 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -138,12 +138,15 @@ public class ChangelogCompactCoordinateOperator
                         new DataIncrement(
                                 message.newFilesIncrement().newFiles(),
                                 message.newFilesIncrement().deletedFiles(),
-                                skippedNewChangelogs),
+                                skippedNewChangelogs,
+                                message.newFilesIncrement().newIndexFiles(),
+                                
message.newFilesIncrement().deletedIndexFiles()),
                         new CompactIncrement(
                                 message.compactIncrement().compactBefore(),
                                 message.compactIncrement().compactAfter(),
-                                skippedCompactChangelogs),
-                        message.indexIncrement());
+                                skippedCompactChangelogs,
+                                message.compactIncrement().newIndexFiles(),
+                                
message.compactIncrement().deletedIndexFiles()));
         Committable newCommittable =
                 new Committable(committable.checkpointId(), 
Committable.Kind.FILE, newMessage);
         output.collect(new StreamRecord<>(Either.Left(newCommittable)));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
index 92e0110f27..e6183ccb8a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -95,12 +94,15 @@ public class ChangelogCompactSortOperator extends 
AbstractStreamOperator<Committ
                         new DataIncrement(
                                 message.newFilesIncrement().newFiles(),
                                 message.newFilesIncrement().deletedFiles(),
-                                Collections.emptyList()),
+                                Collections.emptyList(),
+                                message.newFilesIncrement().newIndexFiles(),
+                                
message.newFilesIncrement().deletedIndexFiles()),
                         new CompactIncrement(
                                 message.compactIncrement().compactBefore(),
                                 message.compactIncrement().compactAfter(),
-                                Collections.emptyList()),
-                        message.indexIncrement());
+                                Collections.emptyList(),
+                                message.compactIncrement().newIndexFiles(),
+                                
message.compactIncrement().deletedIndexFiles()));
         if (!newMessage.isEmpty()) {
             Committable newCommittable =
                     new Committable(committable.checkpointId(), 
Committable.Kind.FILE, newMessage);
@@ -138,8 +140,8 @@ public class ChangelogCompactSortOperator extends 
AbstractStreamOperator<Committ
                                 new CompactIncrement(
                                         Collections.emptyList(),
                                         Collections.emptyList(),
-                                        
sortedChangelogs(compactChangelogFiles, partition, bucket)),
-                                new IndexIncrement(Collections.emptyList()));
+                                        sortedChangelogs(
+                                                compactChangelogFiles, 
partition, bucket)));
                 Committable newCommittable =
                         new Committable(checkpointId, Committable.Kind.FILE, 
newMessage);
                 output.collect(new StreamRecord<>(newCommittable));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
index 2991fb4ecc..50eb5a0796 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
@@ -27,7 +27,6 @@ import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -137,8 +136,8 @@ public class PostponeBucketCommittableRewriter {
             
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
             changelogFiles.addAll(message.compactIncrement().changelogFiles());
 
-            newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
-            
deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
+            newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
+            
deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
 
             toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
         }
@@ -151,8 +150,12 @@ public class PostponeBucketCommittableRewriter {
                     bucket,
                     totalBuckets,
                     DataIncrement.emptyIncrement(),
-                    new CompactIncrement(compactBefore, realCompactAfter, 
changelogFiles),
-                    new IndexIncrement(newIndexFiles, deletedIndexFiles));
+                    new CompactIncrement(
+                            compactBefore,
+                            realCompactAfter,
+                            changelogFiles,
+                            newIndexFiles,
+                            deletedIndexFiles));
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
index 5dbd9f3a09..0283b197d2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
@@ -147,9 +147,7 @@ public class PartitionMarkDoneListener implements 
CommitListener {
         for (ManifestCommittable committable : committables) {
             for (CommitMessage commitMessage : committable.fileCommittables()) 
{
                 CommitMessageImpl message = (CommitMessageImpl) commitMessage;
-                if (waitCompaction
-                        || !message.indexIncrement().isEmpty()
-                        || !message.newFilesIncrement().isEmpty()) {
+                if (waitCompaction || !message.newFilesIncrement().isEmpty()) {
                     partitions.add(message.partition());
                 }
             }
@@ -199,9 +197,7 @@ public class PartitionMarkDoneListener implements 
CommitListener {
             if (watermark != null) {
                 for (CommitMessage commitMessage : 
committable.fileCommittables()) {
                     CommitMessageImpl message = (CommitMessageImpl) 
commitMessage;
-                    if (waitCompaction
-                            || !message.indexIncrement().isEmpty()
-                            || !message.newFilesIncrement().isEmpty()) {
+                    if (waitCompaction || 
!message.newFilesIncrement().isEmpty()) {
                         partitionWatermarks.compute(
                                 message.partition(),
                                 (partition, old) ->
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
index 333f315df6..ad9e9355e1 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
@@ -190,7 +190,8 @@ public class AppendPreCommitCompactCoordinatorOperatorTest {
         assertThat(message.newFilesIncrement().deletedFiles()).isEmpty();
         assertThat(message.newFilesIncrement().changelogFiles()).isEmpty();
         assertThat(message.compactIncrement().isEmpty()).isTrue();
-        assertThat(message.indexIncrement().isEmpty()).isTrue();
+        
assertThat(message.newFilesIncrement().newIndexFiles().isEmpty()).isTrue();
+        
assertThat(message.newFilesIncrement().deletedIndexFiles().isEmpty()).isTrue();
         
assertThat(message.newFilesIncrement().newFiles().stream().map(DataFileMeta::fileSize))
                 .hasSameElementsAs(
                         Arrays.stream(mbs)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 109a3f984e..276bf45193 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.utils.TestingMetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.Schema;
@@ -596,8 +595,7 @@ public class WriterOperatorTest {
                             message.bucket(),
                             message.totalBuckets(),
                             message.newFilesIncrement(),
-                            CompactIncrement.emptyIncrement(),
-                            new IndexIncrement(Collections.emptyList()));
+                            CompactIncrement.emptyIncrement());
             commitMessages.add(newMessage);
         }
         commit.commit(commitIdentifier, commitMessages);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
index 3439415c2f..c873234105 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
@@ -24,7 +24,6 @@ import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileTestUtils;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 
@@ -67,8 +66,7 @@ class ListenerTestUtils {
                             0,
                             1,
                             new DataIncrement(emptyList(), emptyList(), 
emptyList()),
-                            new CompactIncrement(singletonList(file), 
emptyList(), emptyList()),
-                            new IndexIncrement(emptyList()));
+                            new CompactIncrement(singletonList(file), 
emptyList(), emptyList()));
         } else {
             compactMessage =
                     new CommitMessageImpl(
@@ -76,8 +74,7 @@ class ListenerTestUtils {
                             0,
                             1,
                             new DataIncrement(singletonList(file), 
emptyList(), emptyList()),
-                            new CompactIncrement(emptyList(), emptyList(), 
emptyList()),
-                            new IndexIncrement(emptyList()));
+                            new CompactIncrement(emptyList(), emptyList(), 
emptyList()));
         }
         committable.addFileCommittable(compactMessage);
         if (partitionMarkDoneRecoverFromState) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index 7aea754040..300ca20ff8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -21,8 +21,7 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, 
BitmapDeletionVector, DeletionVector}
 import org.apache.paimon.fs.Path
-import org.apache.paimon.index.IndexFileMeta
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, 
IndexIncrement}
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
@@ -220,8 +219,7 @@ trait PaimonRowLevelCommand
             new CompactIncrement(
               Collections.emptyList[DataFileMeta],
               Collections.emptyList[DataFileMeta],
-              Collections.emptyList[DataFileMeta]),
-            new IndexIncrement(Collections.emptyList[IndexFileMeta])
+              Collections.emptyList[DataFileMeta])
           )
       }
       .toSeq
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 344cd0879b..70f4676cad 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.data.serializer.InternalSerializers
 import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
-import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
+import org.apache.paimon.io.{CompactIncrement, DataIncrement}
 import org.apache.paimon.manifest.FileKind
 import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
 import org.apache.paimon.spark.catalog.functions.BucketFunction
@@ -344,9 +344,13 @@ case class PaimonSparkWriter(table: FileStoreTable, 
writeRowTracking: Boolean =
             dvIndexFileMaintainer.getPartition,
             dvIndexFileMaintainer.getBucket,
             null,
-            DataIncrement.emptyIncrement(),
-            CompactIncrement.emptyIncrement(),
-            new IndexIncrement(added.map(_.indexFile).asJava, 
deleted.map(_.indexFile).asJava)
+            new DataIncrement(
+              java.util.Collections.emptyList(),
+              java.util.Collections.emptyList(),
+              java.util.Collections.emptyList(),
+              added.map(_.indexFile).asJava,
+              deleted.map(_.indexFile).asJava),
+            CompactIncrement.emptyIncrement()
           )
           val serializer = new CommitMessageSerializer
           serializer.serialize(commitMessage)


Reply via email to