This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 04b251053d [core] Separate indexIncrement into dataIncrement and
compactIncrement (#6285)
04b251053d is described below
commit 04b251053d2a669b693035b6410895da59fbaad4
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Sep 19 14:36:07 2025 +0800
[core] Separate indexIncrement into dataIncrement and compactIncrement
(#6285)
---
.../apache/paimon/append/AppendCompactTask.java | 22 +--
.../org/apache/paimon/io/CompactIncrement.java | 42 ++++-
.../java/org/apache/paimon/io/DataIncrement.java | 48 +++++-
.../java/org/apache/paimon/io/IndexIncrement.java | 88 -----------
.../paimon/operation/AbstractFileStoreWrite.java | 20 ++-
.../paimon/operation/FileStoreCommitImpl.java | 93 +++++------
.../paimon/table/sink/CommitMessageImpl.java | 40 +----
.../sink/CommitMessageLegacyV2Serializer.java | 21 ++-
.../paimon/table/sink/CommitMessageSerializer.java | 68 ++++++--
.../apache/paimon/table/sink/TableCommitImpl.java | 10 +-
.../org/apache/paimon/TestAppendFileStore.java | 21 ++-
.../test/java/org/apache/paimon/TestFileStore.java | 9 +-
.../paimon/append/AppendCompactTaskTest.java | 8 +-
.../deletionvectors/BucketedDvMaintainerTest.java | 37 +++--
.../append/AppendDeletionFileMaintainerTest.java | 8 +-
.../index/DynamicBucketIndexMaintainerTest.java | 2 +-
.../paimon/index/HashBucketAssignerTest.java | 10 +-
...festCommittableSerializerCompatibilityTest.java | 175 ++++++++++++++++++---
.../paimon/operation/FileStoreCommitTest.java | 4 +-
.../paimon/table/DynamicBucketTableTest.java | 2 +-
.../table/sink/CommitMessageSerializerTest.java | 26 +--
.../apache/paimon/utils/CompatibilityUtils.java | 37 +++++
.../compatibility/manifest-committable-v10 | Bin 0 -> 3978 bytes
.../AppendPreCommitCompactCoordinatorOperator.java | 7 +-
.../ChangelogCompactCoordinateOperator.java | 9 +-
.../changelog/ChangelogCompactSortOperator.java | 14 +-
.../PostponeBucketCommittableRewriter.java | 13 +-
.../sink/listener/PartitionMarkDoneListener.java | 8 +-
...endPreCommitCompactCoordinatorOperatorTest.java | 3 +-
.../paimon/flink/sink/WriterOperatorTest.java | 4 +-
.../flink/sink/listener/ListenerTestUtils.java | 7 +-
.../spark/commands/PaimonRowLevelCommand.scala | 6 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 12 +-
33 files changed, 541 insertions(+), 333 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
index b3b9a0cb17..42c4548842 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java
@@ -25,7 +25,6 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
@@ -73,7 +72,10 @@ public class AppendCompactTask {
Preconditions.checkArgument(
dvEnabled || compactBefore.size() > 1,
"AppendOnlyCompactionTask need more than one file input.");
- IndexIncrement indexIncrement;
+ // If compact task didn't compact all files, the remain deletion files will be written into
+ // new deletion files.
+ List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+ List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
if (dvEnabled) {
AppendDeleteFileMaintainer dvIndexFileMaintainer =
BaseAppendDeleteFileMaintainer.forUnawareAppend(
@@ -90,10 +92,6 @@ public class AppendCompactTask {
compactBefore.forEach(
f ->
dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));
List<IndexManifestEntry> indexEntries =
dvIndexFileMaintainer.persist();
- // If compact task didn't compact all files, the remain deletion files will be written
- // into new deletion files.
- List<IndexFileMeta> newIndexFiles = new ArrayList<>();
- List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
for (IndexManifestEntry entry : indexEntries) {
if (entry.kind() == FileKind.ADD) {
newIndexFiles.add(entry.indexFile());
@@ -101,15 +99,18 @@ public class AppendCompactTask {
deletedIndexFiles.add(entry.indexFile());
}
}
- indexIncrement = new IndexIncrement(newIndexFiles,
deletedIndexFiles);
} else {
compactAfter.addAll(
write.compactRewrite(partition, UNAWARE_BUCKET, null,
compactBefore));
- indexIncrement = new IndexIncrement(Collections.emptyList());
}
CompactIncrement compactIncrement =
- new CompactIncrement(compactBefore, compactAfter,
Collections.emptyList());
+ new CompactIncrement(
+ compactBefore,
+ compactAfter,
+ Collections.emptyList(),
+ newIndexFiles,
+ deletedIndexFiles);
return new CommitMessageImpl(
partition,
// bucket 0 is bucket for unaware-bucket table
@@ -117,8 +118,7 @@ public class AppendCompactTask {
0,
table.coreOptions().bucket(),
DataIncrement.emptyIncrement(),
- compactIncrement,
- indexIncrement);
+ compactIncrement);
}
public int hashCode() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
index ad51642a2a..6454c99ef8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
@@ -18,6 +18,9 @@
package org.apache.paimon.io;
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
@@ -29,14 +32,27 @@ public class CompactIncrement {
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
private final List<DataFileMeta> changelogFiles;
+ private final List<IndexFileMeta> newIndexFiles;
+ private final List<IndexFileMeta> deletedIndexFiles;
public CompactIncrement(
List<DataFileMeta> compactBefore,
List<DataFileMeta> compactAfter,
List<DataFileMeta> changelogFiles) {
+ this(compactBefore, compactAfter, changelogFiles, new ArrayList<>(),
new ArrayList<>());
+ }
+
+ public CompactIncrement(
+ List<DataFileMeta> compactBefore,
+ List<DataFileMeta> compactAfter,
+ List<DataFileMeta> changelogFiles,
+ List<IndexFileMeta> newIndexFiles,
+ List<IndexFileMeta> deletedIndexFiles) {
this.compactBefore = compactBefore;
this.compactAfter = compactAfter;
this.changelogFiles = changelogFiles;
+ this.newIndexFiles = newIndexFiles;
+ this.deletedIndexFiles = deletedIndexFiles;
}
public List<DataFileMeta> compactBefore() {
@@ -51,8 +67,20 @@ public class CompactIncrement {
return changelogFiles;
}
+ public List<IndexFileMeta> newIndexFiles() {
+ return newIndexFiles;
+ }
+
+ public List<IndexFileMeta> deletedIndexFiles() {
+ return deletedIndexFiles;
+ }
+
public boolean isEmpty() {
- return compactBefore.isEmpty() && compactAfter.isEmpty() &&
changelogFiles.isEmpty();
+ return compactBefore.isEmpty()
+ && compactAfter.isEmpty()
+ && changelogFiles.isEmpty()
+ && newIndexFiles.isEmpty()
+ && deletedIndexFiles.isEmpty();
}
@Override
@@ -67,7 +95,9 @@ public class CompactIncrement {
CompactIncrement that = (CompactIncrement) o;
return Objects.equals(compactBefore, that.compactBefore)
&& Objects.equals(compactAfter, that.compactAfter)
- && Objects.equals(changelogFiles, that.changelogFiles);
+ && Objects.equals(changelogFiles, that.changelogFiles)
+ && Objects.equals(newIndexFiles, that.newIndexFiles)
+ && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
}
@Override
@@ -78,10 +108,14 @@ public class CompactIncrement {
@Override
public String toString() {
return String.format(
- "CompactIncrement {compactBefore = %s, compactAfter = %s, changelogFiles = %s}",
+ "CompactIncrement {compactBefore = %s, compactAfter = %s, changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}",
compactBefore.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
compactAfter.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
-
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()));
+
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
+
newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
+ deletedIndexFiles.stream()
+ .map(IndexFileMeta::fileName)
+ .collect(Collectors.toList()));
}
public static CompactIncrement emptyIncrement() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
index b7860fb15e..6049c4dbbb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
@@ -18,25 +18,41 @@
package org.apache.paimon.io;
+import org.apache.paimon.index.IndexFileMeta;
+
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-/** Newly created data files and changelog files. */
+/** Increment of data files, changelog files and index files. */
public class DataIncrement {
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> changelogFiles;
+ private final List<IndexFileMeta> newIndexFiles;
+ private final List<IndexFileMeta> deletedIndexFiles;
public DataIncrement(
List<DataFileMeta> newFiles,
List<DataFileMeta> deletedFiles,
List<DataFileMeta> changelogFiles) {
+ this(newFiles, deletedFiles, changelogFiles, new ArrayList<>(), new
ArrayList<>());
+ }
+
+ public DataIncrement(
+ List<DataFileMeta> newFiles,
+ List<DataFileMeta> deletedFiles,
+ List<DataFileMeta> changelogFiles,
+ List<IndexFileMeta> newIndexFiles,
+ List<IndexFileMeta> deletedIndexFiles) {
this.newFiles = newFiles;
this.deletedFiles = deletedFiles;
this.changelogFiles = changelogFiles;
+ this.newIndexFiles = newIndexFiles;
+ this.deletedIndexFiles = deletedIndexFiles;
}
public static DataIncrement emptyIncrement() {
@@ -56,8 +72,20 @@ public class DataIncrement {
return changelogFiles;
}
+ public List<IndexFileMeta> newIndexFiles() {
+ return newIndexFiles;
+ }
+
+ public List<IndexFileMeta> deletedIndexFiles() {
+ return deletedIndexFiles;
+ }
+
public boolean isEmpty() {
- return newFiles.isEmpty() && changelogFiles.isEmpty();
+ return newFiles.isEmpty()
+ && deletedFiles.isEmpty()
+ && changelogFiles.isEmpty()
+ && newIndexFiles.isEmpty()
+ && deletedIndexFiles.isEmpty();
}
@Override
@@ -71,20 +99,28 @@ public class DataIncrement {
DataIncrement that = (DataIncrement) o;
return Objects.equals(newFiles, that.newFiles)
- && Objects.equals(changelogFiles, that.changelogFiles);
+ && Objects.equals(deletedFiles, that.deletedFiles)
+ && Objects.equals(changelogFiles, that.changelogFiles)
+ && Objects.equals(newIndexFiles, that.newIndexFiles)
+ && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
}
@Override
public int hashCode() {
- return Objects.hash(newFiles, changelogFiles);
+ return Objects.hash(
+ newFiles, deletedFiles, changelogFiles, newIndexFiles,
deletedIndexFiles);
}
@Override
public String toString() {
return String.format(
- "DataIncrement {newFiles = %s, deletedFiles = %s, changelogFiles = %s}",
+ "DataIncrement {newFiles = %s, deletedFiles = %s, changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}",
newFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
deletedFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
-
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()));
+
changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()),
+
newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
+ deletedIndexFiles.stream()
+ .map(IndexFileMeta::fileName)
+ .collect(Collectors.toList()));
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
deleted file mode 100644
index 9f985f54ed..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.io;
-
-import org.apache.paimon.index.IndexFileMeta;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/** Incremental index files. */
-public class IndexIncrement {
-
- private final List<IndexFileMeta> newIndexFiles;
-
- private final List<IndexFileMeta> deletedIndexFiles;
-
- public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
- this.newIndexFiles = newIndexFiles;
- this.deletedIndexFiles = Collections.emptyList();
- }
-
- public IndexIncrement(
- List<IndexFileMeta> newIndexFiles, List<IndexFileMeta>
deletedIndexFiles) {
- this.newIndexFiles = newIndexFiles;
- this.deletedIndexFiles = deletedIndexFiles;
- }
-
- public List<IndexFileMeta> newIndexFiles() {
- return newIndexFiles;
- }
-
- public List<IndexFileMeta> deletedIndexFiles() {
- return deletedIndexFiles;
- }
-
- public boolean isEmpty() {
- return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- IndexIncrement that = (IndexIncrement) o;
- return Objects.equals(newIndexFiles, that.newIndexFiles)
- && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
- }
-
- @Override
- public int hashCode() {
- List<IndexFileMeta> all = new ArrayList<>(newIndexFiles);
- all.addAll(deletedIndexFiles);
- return Objects.hash(all);
- }
-
- @Override
- public String toString() {
- return String.format(
- "IndexIncrement {newIndexFiles = %s, deletedIndexFiles = %s}",
-
newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()),
- deletedIndexFiles.stream()
- .map(IndexFileMeta::fileName)
- .collect(Collectors.toList()));
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 7b77fb179f..ddf2addf53 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -28,9 +28,9 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.index.IndexFileHandler;
-import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
@@ -213,22 +213,26 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
WriterContainer<T> writerContainer = entry.getValue();
CommitIncrement increment =
writerContainer.writer.prepareCommit(waitCompaction);
- List<IndexFileMeta> newIndexFiles = new ArrayList<>();
+ DataIncrement newFilesIncrement =
increment.newFilesIncrement();
+ CompactIncrement compactIncrement =
increment.compactIncrement();
if (writerContainer.dynamicBucketMaintainer != null) {
-
newIndexFiles.addAll(writerContainer.dynamicBucketMaintainer.prepareCommit());
+ newFilesIncrement
+ .newIndexFiles()
+
.addAll(writerContainer.dynamicBucketMaintainer.prepareCommit());
}
CompactDeletionFile compactDeletionFile =
increment.compactDeletionFile();
if (compactDeletionFile != null) {
-
compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
+ compactDeletionFile
+ .getOrCompute()
+ .ifPresent(compactIncrement.newIndexFiles()::add);
}
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
writerContainer.totalBuckets,
- increment.newFilesIncrement(),
- increment.compactIncrement(),
- new IndexIncrement(newIndexFiles));
+ newFilesIncrement,
+ compactIncrement);
result.add(committable);
if (committable.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 1c859fb693..a11339fe7d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,6 +25,7 @@ import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
@@ -701,51 +702,53 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
.compactIncrement()
.changelogFiles()
.forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD,
commitMessage, m)));
- commitMessage
- .indexIncrement()
- .newIndexFiles()
- .forEach(
- f -> {
- switch (f.indexType()) {
- case HASH_INDEX:
- appendHashIndexFiles.add(
- new IndexManifestEntry(
- FileKind.ADD,
-
commitMessage.partition(),
- commitMessage.bucket(),
- f));
- break;
- case DELETION_VECTORS_INDEX:
- compactDvIndexFiles.add(
- new IndexManifestEntry(
- FileKind.ADD,
-
commitMessage.partition(),
- commitMessage.bucket(),
- f));
- break;
- default:
- throw new RuntimeException(
- "Unknown index type: " + f.indexType());
- }
- });
- commitMessage
- .indexIncrement()
- .deletedIndexFiles()
- .forEach(
- f -> {
- if
(f.indexType().equals(DELETION_VECTORS_INDEX)) {
- compactDvIndexFiles.add(
- new IndexManifestEntry(
- FileKind.DELETE,
- commitMessage.partition(),
- commitMessage.bucket(),
- f));
- } else {
- throw new RuntimeException(
- "This index type is not supported to delete: "
- + f.indexType());
- }
- });
+
+ // todo: split them
+ List<IndexFileMeta> newIndexFiles =
+ new
ArrayList<>(commitMessage.newFilesIncrement().newIndexFiles());
+
newIndexFiles.addAll(commitMessage.compactIncrement().newIndexFiles());
+ newIndexFiles.forEach(
+ f -> {
+ switch (f.indexType()) {
+ case HASH_INDEX:
+ appendHashIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ f));
+ break;
+ case DELETION_VECTORS_INDEX:
+ compactDvIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ f));
+ break;
+ default:
+ throw new RuntimeException("Unknown index type: " + f.indexType());
+ }
+ });
+
+ // todo: split them
+ List<IndexFileMeta> deletedIndexFiles =
+ new
ArrayList<>(commitMessage.newFilesIncrement().deletedIndexFiles());
+
deletedIndexFiles.addAll(commitMessage.compactIncrement().deletedIndexFiles());
+ deletedIndexFiles.forEach(
+ f -> {
+ if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
+ compactDvIndexFiles.add(
+ new IndexManifestEntry(
+ FileKind.DELETE,
+ commitMessage.partition(),
+ commitMessage.bucket(),
+ f));
+ } else {
+ throw new RuntimeException(
+ "This index type is not supported to delete: " + f.indexType());
+ }
+ });
}
if (!commitMessages.isEmpty()) {
List<String> msg = new ArrayList<>();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
index 5831066816..8e715462f5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java
@@ -18,20 +18,17 @@
package org.apache.paimon.table.sink;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputViewStreamWrapper;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
-import org.apache.paimon.io.IndexIncrement;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.Collections;
import java.util.Objects;
import static org.apache.paimon.utils.SerializationUtils.deserializedBytes;
@@ -50,37 +47,18 @@ public class CommitMessageImpl implements CommitMessage {
private transient @Nullable Integer totalBuckets;
private transient DataIncrement dataIncrement;
private transient CompactIncrement compactIncrement;
- private transient IndexIncrement indexIncrement;
- @VisibleForTesting
public CommitMessageImpl(
BinaryRow partition,
int bucket,
@Nullable Integer totalBuckets,
DataIncrement dataIncrement,
CompactIncrement compactIncrement) {
- this(
- partition,
- bucket,
- totalBuckets,
- dataIncrement,
- compactIncrement,
- new IndexIncrement(Collections.emptyList()));
- }
-
- public CommitMessageImpl(
- BinaryRow partition,
- int bucket,
- @Nullable Integer totalBuckets,
- DataIncrement dataIncrement,
- CompactIncrement compactIncrement,
- IndexIncrement indexIncrement) {
this.partition = partition;
this.bucket = bucket;
this.totalBuckets = totalBuckets;
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
- this.indexIncrement = indexIncrement;
}
@Override
@@ -106,12 +84,8 @@ public class CommitMessageImpl implements CommitMessage {
return compactIncrement;
}
- public IndexIncrement indexIncrement() {
- return indexIncrement;
- }
-
public boolean isEmpty() {
- return dataIncrement.isEmpty() && compactIncrement.isEmpty() &&
indexIncrement.isEmpty();
+ return dataIncrement.isEmpty() && compactIncrement.isEmpty();
}
private void writeObject(ObjectOutputStream out) throws IOException {
@@ -131,7 +105,6 @@ public class CommitMessageImpl implements CommitMessage {
this.totalBuckets = message.totalBuckets;
this.dataIncrement = message.dataIncrement;
this.compactIncrement = message.compactIncrement;
- this.indexIncrement = message.indexIncrement;
}
@Override
@@ -148,14 +121,12 @@ public class CommitMessageImpl implements CommitMessage {
&& Objects.equals(partition, that.partition)
&& Objects.equals(totalBuckets, that.totalBuckets)
&& Objects.equals(dataIncrement, that.dataIncrement)
- && Objects.equals(compactIncrement, that.compactIncrement)
- && Objects.equals(indexIncrement, that.indexIncrement);
+ && Objects.equals(compactIncrement, that.compactIncrement);
}
@Override
public int hashCode() {
- return Objects.hash(
- partition, bucket, totalBuckets, dataIncrement,
compactIncrement, indexIncrement);
+ return Objects.hash(partition, bucket, totalBuckets, dataIncrement,
compactIncrement);
}
@Override
@@ -166,8 +137,7 @@ public class CommitMessageImpl implements CommitMessage {
+ "bucket = %d, "
+ "totalBuckets = %s, "
+ "newFilesIncrement = %s, "
- + "compactIncrement = %s, "
- + "indexIncrement = %s}",
- partition, bucket, totalBuckets, dataIncrement,
compactIncrement, indexIncrement);
+ + "compactIncrement = %s}",
+ partition, bucket, totalBuckets, dataIncrement,
compactIncrement);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
index 88e9e3513d..5c3976af81 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -26,7 +27,6 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputView;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
@@ -64,23 +64,28 @@ public class CommitMessageLegacyV2Serializer {
}
public CommitMessage deserialize(DataInputView view) throws IOException {
+ BinaryRow partition = deserializeBinaryRow(view);
+ int bucket = view.readInt();
if (dataFileSerializer == null) {
dataFileSerializer = new DataFileMetaLegacyV2Serializer();
indexEntrySerializer = new IndexFileMetaLegacyV2Serializer();
}
- return new CommitMessageImpl(
- deserializeBinaryRow(view),
- view.readInt(),
- null,
+ DataIncrement dataIncrement =
new DataIncrement(
dataFileSerializer.deserializeList(view),
Collections.emptyList(),
- dataFileSerializer.deserializeList(view)),
+ dataFileSerializer.deserializeList(view));
+ CompactIncrement compactIncrement =
new CompactIncrement(
dataFileSerializer.deserializeList(view),
dataFileSerializer.deserializeList(view),
- dataFileSerializer.deserializeList(view)),
- new
IndexIncrement(indexEntrySerializer.deserializeList(view)));
+ dataFileSerializer.deserializeList(view));
+ if (compactIncrement.isEmpty()) {
+
dataIncrement.newIndexFiles().addAll(indexEntrySerializer.deserializeList(view));
+ } else {
+
compactIncrement.newIndexFiles().addAll(indexEntrySerializer.deserializeList(view));
+ }
+ return new CommitMessageImpl(partition, bucket, null, dataIncrement,
compactIncrement);
}
private static RowType legacyDataFileSchema() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 529b0cb369..0a34434028 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexFileMetaSerializer;
@@ -36,13 +37,11 @@ import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.io.DataOutputViewStreamWrapper;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.utils.IOExceptionSupplier;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
@@ -51,7 +50,7 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 9;
+ public static final int CURRENT_VERSION = 10;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
@@ -103,14 +102,19 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
view.writeBoolean(false);
}
+ // data increment
dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view);
dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(),
view);
dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(),
view);
+
indexEntrySerializer.serializeList(message.newFilesIncrement().newIndexFiles(),
view);
+
indexEntrySerializer.serializeList(message.newFilesIncrement().deletedIndexFiles(),
view);
+
+ // compact increment
dataFileSerializer.serializeList(message.compactIncrement().compactBefore(),
view);
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(),
view);
dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(),
view);
-
indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(),
view);
-
indexEntrySerializer.serializeList(message.indexIncrement().deletedIndexFiles(),
view);
+
indexEntrySerializer.serializeList(message.compactIncrement().newIndexFiles(),
view);
+
indexEntrySerializer.serializeList(message.compactIncrement().deletedIndexFiles(),
view);
}
@Override
@@ -132,18 +136,48 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
IOExceptionSupplier<List<DataFileMeta>> fileDeserializer =
fileDeserializer(version, view);
IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer =
indexEntryDeserializer(version, view);
-
- return new CommitMessageImpl(
- deserializeBinaryRow(view),
- view.readInt(),
- version >= 7 && view.readBoolean() ? view.readInt() : null,
- new DataIncrement(
- fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
- new CompactIncrement(
- fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get()),
- new IndexIncrement(
- indexEntryDeserializer.get(),
- version <= 2 ? Collections.emptyList() :
indexEntryDeserializer.get()));
+ if (version >= 10) {
+ return new CommitMessageImpl(
+ deserializeBinaryRow(view),
+ view.readInt(),
+ view.readBoolean() ? view.readInt() : null,
+ new DataIncrement(
+ fileDeserializer.get(),
+ fileDeserializer.get(),
+ fileDeserializer.get(),
+ indexEntryDeserializer.get(),
+ indexEntryDeserializer.get()),
+ new CompactIncrement(
+ fileDeserializer.get(),
+ fileDeserializer.get(),
+ fileDeserializer.get(),
+ indexEntryDeserializer.get(),
+ indexEntryDeserializer.get()));
+ } else {
+ BinaryRow partition = deserializeBinaryRow(view);
+ int bucket = view.readInt();
+ Integer totalBuckets = version >= 7 && view.readBoolean() ?
view.readInt() : null;
+ DataIncrement dataIncrement =
+ new DataIncrement(
+ fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get());
+ CompactIncrement compactIncrement =
+ new CompactIncrement(
+ fileDeserializer.get(), fileDeserializer.get(),
fileDeserializer.get());
+ if (compactIncrement.isEmpty()) {
+
dataIncrement.newIndexFiles().addAll(indexEntryDeserializer.get());
+ } else {
+
compactIncrement.newIndexFiles().addAll(indexEntryDeserializer.get());
+ }
+ if (version > 2) {
+ if (compactIncrement.isEmpty()) {
+
dataIncrement.deletedIndexFiles().addAll(indexEntryDeserializer.get());
+ } else {
+
compactIncrement.deletedIndexFiles().addAll(indexEntryDeserializer.get());
+ }
+ }
+ return new CommitMessageImpl(
+ partition, bucket, totalBuckets, dataIncrement,
compactIncrement);
+ }
}
private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 95b2df34f5..e549bd2e2c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -290,12 +290,18 @@ public class TableCommitImpl implements InnerTableCommit {
Consumer<DataFileMeta> collector = f ->
files.addAll(f.collectFiles(pathFactory));
msg.newFilesIncrement().newFiles().forEach(collector);
msg.newFilesIncrement().changelogFiles().forEach(collector);
+ msg.newFilesIncrement().newIndexFiles().stream()
+ .map(indexFileFactory::toPath)
+ .forEach(files::add);
+ msg.newFilesIncrement().deletedIndexFiles().stream()
+ .map(indexFileFactory::toPath)
+ .forEach(files::add);
msg.compactIncrement().compactBefore().forEach(collector);
msg.compactIncrement().compactAfter().forEach(collector);
- msg.indexIncrement().newIndexFiles().stream()
+ msg.compactIncrement().newIndexFiles().stream()
.map(indexFileFactory::toPath)
.forEach(files::add);
- msg.indexIncrement().deletedIndexFiles().stream()
+ msg.compactIncrement().deletedIndexFiles().stream()
.map(indexFileFactory::toPath)
.forEach(files::add);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 60a00ab567..3ca33a8222 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -30,7 +30,6 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.FileStoreCommitImpl;
import org.apache.paimon.schema.Schema;
@@ -115,9 +114,13 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
partition,
bucket,
options().bucket(),
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(Collections.emptyList(), indexFileMetas));
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ indexFileMetas),
+ CompactIncrement.emptyIncrement());
}
public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int
bucket) {
@@ -153,9 +156,13 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
partition,
bucket,
options().bucket(),
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(indexFiles));
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ indexFiles,
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
}
public static TestAppendFileStore createAppendStore(
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 8e4cafc7de..dc1702c264 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -26,7 +26,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
@@ -346,14 +346,15 @@ public class TestFileStore extends KeyValueFileStore {
entryWithPartition.getValue().entrySet()) {
CommitIncrement increment =
entryWithBucket.getValue().prepareCommit(ignorePreviousFiles);
+ DataIncrement dataIncrement = increment.newFilesIncrement();
+ dataIncrement.newIndexFiles().addAll(indexFiles);
committable.addFileCommittable(
new CommitMessageImpl(
entryWithPartition.getKey(),
entryWithBucket.getKey(),
options().bucket(),
- increment.newFilesIncrement(),
- increment.compactIncrement(),
- new IndexIncrement(indexFiles)));
+ dataIncrement,
+ increment.compactIncrement()));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
index e03dd2cee2..0e95135873 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.RawFileSplitRead;
@@ -105,12 +104,11 @@ public class AppendCompactTaskTest {
CommitMessageImpl compactMessage = (CommitMessageImpl)
compactTask.doCompact(table, write);
- IndexIncrement indexIncrement = compactMessage.indexIncrement();
- assertThat(indexIncrement.deletedIndexFiles()).isNotEmpty();
+
assertThat(compactMessage.compactIncrement().deletedIndexFiles()).isNotEmpty();
if (compactBeforeAllFiles) {
- assertThat(indexIncrement.newIndexFiles()).isEmpty();
+
assertThat(compactMessage.compactIncrement().newIndexFiles()).isEmpty();
} else {
- assertThat(indexIncrement.newIndexFiles()).isNotEmpty();
+
assertThat(compactMessage.compactIncrement().newIndexFiles()).isNotEmpty();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index c195517731..10c6f45200 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
@@ -104,9 +103,13 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
EMPTY_ROW,
0,
1,
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(Collections.singletonList(file)));
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(file),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
commit.commit(Collections.singletonList(commitMessage));
@@ -127,9 +130,13 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
EMPTY_ROW,
0,
1,
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(Collections.singletonList(file)));
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(file),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
commit = table.newBatchWriteBuilder().newCommit();
commit.commit(Collections.singletonList(commitMessage));
@@ -202,8 +209,12 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
0,
1,
DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(Collections.singletonList(file)));
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(file),
+ Collections.emptyList()));
BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit();
commit1.commit(Collections.singletonList(commitMessage1));
@@ -235,8 +246,12 @@ public class BucketedDvMaintainerTest extends
PrimaryKeyTableTestBase {
0,
1,
DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(Collections.singletonList(file)));
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(file),
+ Collections.emptyList()));
BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit();
commit2.commit(Collections.singletonList(commitMessage2));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 3cd9308d79..3a03985c88 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -71,10 +71,10 @@ class AppendDeletionFileMaintainerTest {
Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
dataFileToDeletionFiles.putAll(
createDeletionFileMapFromIndexFileMetas(
- indexPathFactory,
commitMessage1.indexIncrement().newIndexFiles()));
+ indexPathFactory,
commitMessage1.newFilesIncrement().newIndexFiles()));
dataFileToDeletionFiles.putAll(
createDeletionFileMapFromIndexFileMetas(
- indexPathFactory,
commitMessage2.indexIncrement().newIndexFiles()));
+ indexPathFactory,
commitMessage2.newFilesIncrement().newIndexFiles()));
AppendDeleteFileMaintainer dvIFMaintainer =
store.createDVIFMaintainer(BinaryRow.EMPTY_ROW,
dataFileToDeletionFiles);
@@ -108,7 +108,7 @@ class AppendDeletionFileMaintainerTest {
.findAny()
.get();
assertThat(entry.indexFile())
-
.isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0));
+
.isEqualTo(commitMessage1.newFilesIncrement().newIndexFiles().get(0));
entry =
res.stream()
.filter(file -> file.kind() == FileKind.DELETE)
@@ -116,7 +116,7 @@ class AppendDeletionFileMaintainerTest {
.findAny()
.get();
assertThat(entry.indexFile())
-
.isEqualTo(commitMessage2.indexIncrement().newIndexFiles().get(0));
+
.isEqualTo(commitMessage2.newFilesIncrement().newIndexFiles().get(0));
}
private Map<String, DeletionFile> createDeletionFileMapFromIndexFileMetas(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
index 6f5e0d2ecd..b4f5363a06 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
@@ -75,7 +75,7 @@ public class DynamicBucketIndexMaintainerTest extends
PrimaryKeyTableTestBase {
Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>();
for (CommitMessage commitMessage : messages) {
CommitMessageImpl message = (CommitMessageImpl) commitMessage;
- List<IndexFileMeta> files =
message.indexIncrement().newIndexFiles();
+ List<IndexFileMeta> files =
message.newFilesIncrement().newIndexFiles();
if (files.isEmpty()) {
continue;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 01eca82c2e..4c5415d45d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
@@ -216,10 +215,13 @@ public class HashBucketAssignerTest extends
PrimaryKeyTableTestBase {
bucket,
totalBuckets,
new DataIncrement(
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.singletonList(file),
+ Collections.emptyList()),
new CompactIncrement(
- Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()),
- new IndexIncrement(Collections.singletonList(file)));
+ Collections.emptyList(), Collections.emptyList(),
Collections.emptyList()));
}
@Test
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index 927da8e97d..ccc8f6c57e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.IOUtils;
@@ -45,6 +44,100 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Compatibility Test for {@link ManifestCommittableSerializer}. */
public class ManifestCommittableSerializerCompatibilityTest {
+ @Test
+ public void testCompatibilityToV4CommitV10() throws IOException {
+ String fileName = "manifest-committable-v10";
+
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ DataFileMeta.create(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs://localhost:9000/path/to/file",
+ 1L,
+ Arrays.asList("asdf", "qwer", "zxcv"));
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+ IndexFileMeta hashIndexFile =
+ new IndexFileMeta("my_index_type", "my_index_file", 1024 *
100, 1002, null, null);
+
+ LinkedHashMap<String, DeletionVectorMeta> dvRanges = new
LinkedHashMap<>();
+ dvRanges.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+ dvRanges.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+ IndexFileMeta devIndexFile =
+ new IndexFileMeta(
+ "my_index_type",
+ "my_index_file",
+ 1024 * 100,
+ 1002,
+ dvRanges,
+ "external_path");
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ 16,
+ new DataIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ Collections.singletonList(hashIndexFile),
+ Collections.singletonList(hashIndexFile)),
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ Collections.singletonList(devIndexFile),
+ Collections.emptyList()));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+ manifestCommittable.addProperty("k1", "v1");
+ manifestCommittable.addProperty("k2", "v2");
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+ ManifestCommittable deserialized =
serializer.deserialize(serializer.getVersion(), bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
+ byte[] oldBytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+ .getResourceAsStream("compatibility/" +
fileName),
+ true);
+ deserialized = serializer.deserialize(4, oldBytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
@Test
public void testCompatibilityToV4CommitV9() throws IOException {
SimpleStats keyStats =
@@ -100,8 +193,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
16,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -178,8 +275,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
16,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -256,8 +357,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
16,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -333,8 +438,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
16,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -408,8 +517,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -483,8 +596,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -557,8 +674,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -632,8 +753,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -707,8 +832,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
null,
new DataIncrement(dataFiles, dataFiles, dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
@@ -778,8 +907,12 @@ public class
ManifestCommittableSerializerCompatibilityTest {
11,
null,
new DataIncrement(dataFiles, Collections.emptyList(),
dataFiles),
- new CompactIncrement(dataFiles, dataFiles, dataFiles),
- new IndexIncrement(indexFiles));
+ new CompactIncrement(
+ dataFiles,
+ dataFiles,
+ dataFiles,
+ indexFiles,
+ Collections.emptyList()));
ManifestCommittable manifestCommittable =
new ManifestCommittable(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 47a876f345..35843e0c2f 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -914,8 +914,8 @@ public class FileStoreCommitTest {
store.writeDVIndexFiles(
BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2",
Arrays.asList(3)));
List<IndexFileMeta> deleted =
- new
ArrayList<>(commitMessage1.indexIncrement().newIndexFiles());
- deleted.addAll(commitMessage2.indexIncrement().newIndexFiles());
+ new
ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
+ deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
CommitMessage commitMessage4 =
store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
store.commit(commitMessage3, commitMessage4);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
index 29e71e2b44..0767200bb1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java
@@ -61,7 +61,7 @@ public class DynamicBucketTableTest extends TableTestBase {
batchTableWrite.write(rowWithBucket.getKey(),
rowWithBucket.getValue());
assertThat(
((CommitMessageImpl)
batchTableWrite.prepareCommit().get(0))
- .indexIncrement()
+ .newFilesIncrement()
.newIndexFiles()
.get(0)
.rowCount())
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 351af6a3b7..c4f519e84e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.junit.jupiter.api.Test;
@@ -41,22 +40,31 @@ public class CommitMessageSerializerTest {
CommitMessageSerializer serializer = new CommitMessageSerializer();
DataIncrement dataIncrement = randomNewFilesIncrement();
+ dataIncrement.newIndexFiles().addAll(Arrays.asList(randomIndexFile(),
randomIndexFile()));
+ dataIncrement
+ .deletedIndexFiles()
+ .addAll(Arrays.asList(randomIndexFile(), randomIndexFile()));
+
CompactIncrement compactIncrement = randomCompactIncrement();
- IndexIncrement indexIncrement =
- new IndexIncrement(
- Arrays.asList(randomIndexFile(), randomIndexFile()),
- Arrays.asList(randomIndexFile(), randomIndexFile()));
+ compactIncrement
+ .newIndexFiles()
+ .addAll(Arrays.asList(randomIndexFile(), randomIndexFile()));
+ compactIncrement
+ .deletedIndexFiles()
+ .addAll(Arrays.asList(randomIndexFile(), randomIndexFile()));
+
CommitMessageImpl committable =
- new CommitMessageImpl(
- row(0), 1, 2, dataIncrement, compactIncrement,
indexIncrement);
+ new CommitMessageImpl(row(0), 1, 2, dataIncrement,
compactIncrement);
CommitMessageImpl newCommittable =
- (CommitMessageImpl) serializer.deserialize(7,
serializer.serialize(committable));
+ (CommitMessageImpl)
+ serializer.deserialize(
+ CommitMessageSerializer.CURRENT_VERSION,
+ serializer.serialize(committable));
assertThat(newCommittable.partition()).isEqualTo(committable.partition());
assertThat(newCommittable.bucket()).isEqualTo(committable.bucket());
assertThat(newCommittable.totalBuckets()).isEqualTo(committable.totalBuckets());
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
-
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java
b/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java
new file mode 100644
index 0000000000..67b8a3d511
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/** Utils for compatibility. */
+public class CompatibilityUtils {
+
+ public static final String COMPATIBILITY_FILE_DIR =
"/src/test/resources/compatibility/";
+
+ /** Write compatibility file. */
+ public static void writeCompatibilityFile(String fileName, byte[] data)
throws IOException {
+ File file = new File(System.getProperty("user.dir") +
COMPATIBILITY_FILE_DIR + fileName);
+ try (FileOutputStream fos = new FileOutputStream(file)) {
+ fos.write(data);
+ }
+ }
+}
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v10
b/paimon-core/src/test/resources/compatibility/manifest-committable-v10
new file mode 100644
index 0000000000..0ae6e7bfe3
Binary files /dev/null and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v10 differ
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
index 674dd59e7e..827d79ca5a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java
@@ -128,9 +128,10 @@ public class AppendPreCommitCompactCoordinatorOperator
new DataIncrement(
skippedFiles,
message.newFilesIncrement().deletedFiles(),
- message.newFilesIncrement().changelogFiles()),
- message.compactIncrement(),
- message.indexIncrement());
+ message.newFilesIncrement().changelogFiles(),
+ message.newFilesIncrement().newIndexFiles(),
+
message.newFilesIncrement().deletedIndexFiles()),
+ message.compactIncrement());
if (!newMessage.isEmpty()) {
Committable newCommittable =
new Committable(committable.checkpointId(),
Committable.Kind.FILE, newMessage);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index 0190279a8b..d336717031 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -138,12 +138,15 @@ public class ChangelogCompactCoordinateOperator
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
- skippedNewChangelogs),
+ skippedNewChangelogs,
+ message.newFilesIncrement().newIndexFiles(),
+
message.newFilesIncrement().deletedIndexFiles()),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
- skippedCompactChangelogs),
- message.indexIncrement());
+ skippedCompactChangelogs,
+ message.compactIncrement().newIndexFiles(),
+
message.compactIncrement().deletedIndexFiles()));
Committable newCommittable =
new Committable(committable.checkpointId(),
Committable.Kind.FILE, newMessage);
output.collect(new StreamRecord<>(Either.Left(newCommittable)));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
index 92e0110f27..e6183ccb8a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -95,12 +94,15 @@ public class ChangelogCompactSortOperator extends
AbstractStreamOperator<Committ
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
- Collections.emptyList()),
+ Collections.emptyList(),
+ message.newFilesIncrement().newIndexFiles(),
+
message.newFilesIncrement().deletedIndexFiles()),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
- Collections.emptyList()),
- message.indexIncrement());
+ Collections.emptyList(),
+ message.compactIncrement().newIndexFiles(),
+
message.compactIncrement().deletedIndexFiles()));
if (!newMessage.isEmpty()) {
Committable newCommittable =
new Committable(committable.checkpointId(),
Committable.Kind.FILE, newMessage);
@@ -138,8 +140,8 @@ public class ChangelogCompactSortOperator extends
AbstractStreamOperator<Committ
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
-
sortedChangelogs(compactChangelogFiles, partition, bucket)),
- new IndexIncrement(Collections.emptyList()));
+ sortedChangelogs(
+ compactChangelogFiles,
partition, bucket)));
Committable newCommittable =
new Committable(checkpointId, Committable.Kind.FILE,
newMessage);
output.collect(new StreamRecord<>(newCommittable));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
index 2991fb4ecc..50eb5a0796 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java
@@ -27,7 +27,6 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -137,8 +136,8 @@ public class PostponeBucketCommittableRewriter {
changelogFiles.addAll(message.newFilesIncrement().changelogFiles());
changelogFiles.addAll(message.compactIncrement().changelogFiles());
- newIndexFiles.addAll(message.indexIncrement().newIndexFiles());
- deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles());
+ newIndexFiles.addAll(message.compactIncrement().newIndexFiles());
+ deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles());
toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path));
}
@@ -151,8 +150,12 @@ public class PostponeBucketCommittableRewriter {
bucket,
totalBuckets,
DataIncrement.emptyIncrement(),
- new CompactIncrement(compactBefore, realCompactAfter, changelogFiles),
- new IndexIncrement(newIndexFiles, deletedIndexFiles));
+ new CompactIncrement(
+ compactBefore,
+ realCompactAfter,
+ changelogFiles,
+ newIndexFiles,
+ deletedIndexFiles));
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
index 5dbd9f3a09..0283b197d2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java
@@ -147,9 +147,7 @@ public class PartitionMarkDoneListener implements
CommitListener {
for (ManifestCommittable committable : committables) {
for (CommitMessage commitMessage : committable.fileCommittables())
{
CommitMessageImpl message = (CommitMessageImpl) commitMessage;
- if (waitCompaction
- || !message.indexIncrement().isEmpty()
- || !message.newFilesIncrement().isEmpty()) {
+ if (waitCompaction || !message.newFilesIncrement().isEmpty()) {
partitions.add(message.partition());
}
}
@@ -199,9 +197,7 @@ public class PartitionMarkDoneListener implements
CommitListener {
if (watermark != null) {
for (CommitMessage commitMessage :
committable.fileCommittables()) {
CommitMessageImpl message = (CommitMessageImpl)
commitMessage;
- if (waitCompaction
- || !message.indexIncrement().isEmpty()
- || !message.newFilesIncrement().isEmpty()) {
+ if (waitCompaction || !message.newFilesIncrement().isEmpty()) {
partitionWatermarks.compute(
message.partition(),
(partition, old) ->
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
index 333f315df6..ad9e9355e1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java
@@ -190,7 +190,8 @@ public class AppendPreCommitCompactCoordinatorOperatorTest {
assertThat(message.newFilesIncrement().deletedFiles()).isEmpty();
assertThat(message.newFilesIncrement().changelogFiles()).isEmpty();
assertThat(message.compactIncrement().isEmpty()).isTrue();
- assertThat(message.indexIncrement().isEmpty()).isTrue();
+ assertThat(message.newFilesIncrement().newIndexFiles().isEmpty()).isTrue();
+ assertThat(message.newFilesIncrement().deletedIndexFiles().isEmpty()).isTrue();
assertThat(message.newFilesIncrement().newFiles().stream().map(DataFileMeta::fileSize))
.hasSameElementsAs(
Arrays.stream(mbs)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
index 109a3f984e..276bf45193 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -27,7 +27,6 @@ import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
@@ -596,8 +595,7 @@ public class WriterOperatorTest {
message.bucket(),
message.totalBuckets(),
message.newFilesIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(Collections.emptyList()));
+ CompactIncrement.emptyIncrement());
commitMessages.add(newMessage);
}
commit.commit(commitIdentifier, commitMessages);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
index 3439415c2f..c873234105 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java
@@ -24,7 +24,6 @@ import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -67,8 +66,7 @@ class ListenerTestUtils {
0,
1,
new DataIncrement(emptyList(), emptyList(),
emptyList()),
- new CompactIncrement(singletonList(file), emptyList(), emptyList()),
- new IndexIncrement(emptyList()));
+ new CompactIncrement(singletonList(file), emptyList(), emptyList()));
} else {
compactMessage =
new CommitMessageImpl(
@@ -76,8 +74,7 @@ class ListenerTestUtils {
0,
1,
new DataIncrement(singletonList(file),
emptyList(), emptyList()),
- new CompactIncrement(emptyList(), emptyList(), emptyList()),
- new IndexIncrement(emptyList()));
+ new CompactIncrement(emptyList(), emptyList(), emptyList()));
}
committable.addFileCommittable(compactMessage);
if (partitionMarkDoneRecoverFromState) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index 7aea754040..300ca20ff8 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -21,8 +21,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector,
BitmapDeletionVector, DeletionVector}
import org.apache.paimon.fs.Path
-import org.apache.paimon.index.IndexFileMeta
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
@@ -220,8 +219,7 @@ trait PaimonRowLevelCommand
new CompactIncrement(
Collections.emptyList[DataFileMeta],
Collections.emptyList[DataFileMeta],
- Collections.emptyList[DataFileMeta]),
- new IndexIncrement(Collections.emptyList[IndexFileMeta])
+ Collections.emptyList[DataFileMeta])
)
}
.toSeq
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 344cd0879b..70f4676cad 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -26,7 +26,7 @@ import org.apache.paimon.data.serializer.InternalSerializers
import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
-import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
+import org.apache.paimon.io.{CompactIncrement, DataIncrement}
import org.apache.paimon.manifest.FileKind
import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
import org.apache.paimon.spark.catalog.functions.BucketFunction
@@ -344,9 +344,13 @@ case class PaimonSparkWriter(table: FileStoreTable,
writeRowTracking: Boolean =
dvIndexFileMaintainer.getPartition,
dvIndexFileMaintainer.getBucket,
null,
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(added.map(_.indexFile).asJava, deleted.map(_.indexFile).asJava)
+ new DataIncrement(
+ java.util.Collections.emptyList(),
+ java.util.Collections.emptyList(),
+ java.util.Collections.emptyList(),
+ added.map(_.indexFile).asJava,
+ deleted.map(_.indexFile).asJava),
+ CompactIncrement.emptyIncrement()
)
val serializer = new CommitMessageSerializer
serializer.serialize(commitMessage)