This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 429827ea8631b25a224910760b99575b5dd0f0ba Author: Zouxxyy <[email protected]> AuthorDate: Fri Sep 19 14:36:07 2025 +0800 [core] Separate indexIncrement into dataIncrement and compactIncrement (#6285) --- .../apache/paimon/append/AppendCompactTask.java | 22 +-- .../org/apache/paimon/io/CompactIncrement.java | 42 ++++- .../java/org/apache/paimon/io/DataIncrement.java | 48 +++++- .../java/org/apache/paimon/io/IndexIncrement.java | 88 ----------- .../paimon/operation/AbstractFileStoreWrite.java | 20 ++- .../paimon/operation/FileStoreCommitImpl.java | 93 +++++------ .../paimon/table/sink/CommitMessageImpl.java | 40 +---- .../sink/CommitMessageLegacyV2Serializer.java | 21 ++- .../paimon/table/sink/CommitMessageSerializer.java | 68 ++++++-- .../apache/paimon/table/sink/TableCommitImpl.java | 10 +- .../org/apache/paimon/TestAppendFileStore.java | 21 ++- .../test/java/org/apache/paimon/TestFileStore.java | 9 +- .../paimon/append/AppendCompactTaskTest.java | 8 +- .../deletionvectors/BucketedDvMaintainerTest.java | 37 +++-- .../append/AppendDeletionFileMaintainerTest.java | 8 +- .../index/DynamicBucketIndexMaintainerTest.java | 2 +- .../paimon/index/HashBucketAssignerTest.java | 10 +- ...festCommittableSerializerCompatibilityTest.java | 175 ++++++++++++++++++--- .../paimon/operation/FileStoreCommitTest.java | 4 +- .../paimon/table/DynamicBucketTableTest.java | 2 +- .../table/sink/CommitMessageSerializerTest.java | 26 +-- .../apache/paimon/utils/CompatibilityUtils.java | 37 +++++ .../compatibility/manifest-committable-v10 | Bin 0 -> 3978 bytes .../AppendPreCommitCompactCoordinatorOperator.java | 7 +- .../ChangelogCompactCoordinateOperator.java | 9 +- .../changelog/ChangelogCompactSortOperator.java | 14 +- .../PostponeBucketCommittableRewriter.java | 13 +- .../sink/listener/PartitionMarkDoneListener.java | 8 +- ...endPreCommitCompactCoordinatorOperatorTest.java | 3 +- .../paimon/flink/sink/WriterOperatorTest.java | 4 +- .../flink/sink/listener/ListenerTestUtils.java | 7 +- .../spark/commands/PaimonRowLevelCommand.scala | 6 +- .../paimon/spark/commands/PaimonSparkWriter.scala | 12 +- 33 files changed, 541 insertions(+), 333 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java index b3b9a0cb17..42c4548842 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java @@ -25,7 +25,6 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.operation.BaseAppendFileStoreWrite; @@ -73,7 +72,10 @@ public class AppendCompactTask { Preconditions.checkArgument( dvEnabled || compactBefore.size() > 1, "AppendOnlyCompactionTask need more than one file input."); - IndexIncrement indexIncrement; + // If compact task didn't compact all files, the remain deletion files will be written into + // new deletion files. + List<IndexFileMeta> newIndexFiles = new ArrayList<>(); + List<IndexFileMeta> deletedIndexFiles = new ArrayList<>(); if (dvEnabled) { AppendDeleteFileMaintainer dvIndexFileMaintainer = BaseAppendDeleteFileMaintainer.forUnawareAppend( @@ -90,10 +92,6 @@ public class AppendCompactTask { compactBefore.forEach( f -> dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName())); List<IndexManifestEntry> indexEntries = dvIndexFileMaintainer.persist(); - // If compact task didn't compact all files, the remain deletion files will be written - // into new deletion files. - List<IndexFileMeta> newIndexFiles = new ArrayList<>(); - List<IndexFileMeta> deletedIndexFiles = new ArrayList<>(); for (IndexManifestEntry entry : indexEntries) { if (entry.kind() == FileKind.ADD) { newIndexFiles.add(entry.indexFile()); @@ -101,15 +99,18 @@ public class AppendCompactTask { deletedIndexFiles.add(entry.indexFile()); } } - indexIncrement = new IndexIncrement(newIndexFiles, deletedIndexFiles); } else { compactAfter.addAll( write.compactRewrite(partition, UNAWARE_BUCKET, null, compactBefore)); - indexIncrement = new IndexIncrement(Collections.emptyList()); } CompactIncrement compactIncrement = - new CompactIncrement(compactBefore, compactAfter, Collections.emptyList()); + new CompactIncrement( + compactBefore, + compactAfter, + Collections.emptyList(), + newIndexFiles, + deletedIndexFiles); return new CommitMessageImpl( partition, // bucket 0 is bucket for unaware-bucket table @@ -117,8 +118,7 @@ public class AppendCompactTask { 0, table.coreOptions().bucket(), DataIncrement.emptyIncrement(), - compactIncrement, - indexIncrement); + compactIncrement); } public int hashCode() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java index ad51642a2a..6454c99ef8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java @@ -18,6 +18,9 @@ package org.apache.paimon.io; +import org.apache.paimon.index.IndexFileMeta; + +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -29,14 +32,27 @@ public class CompactIncrement { private final List<DataFileMeta> compactBefore; private final List<DataFileMeta> compactAfter; private final List<DataFileMeta> changelogFiles; + private final List<IndexFileMeta> newIndexFiles; + private final List<IndexFileMeta> deletedIndexFiles; public CompactIncrement( List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter, List<DataFileMeta> changelogFiles) { + this(compactBefore, compactAfter, changelogFiles, new ArrayList<>(), new ArrayList<>()); + } + + public CompactIncrement( + List<DataFileMeta> compactBefore, + List<DataFileMeta> compactAfter, + List<DataFileMeta> changelogFiles, + List<IndexFileMeta> newIndexFiles, + List<IndexFileMeta> deletedIndexFiles) { this.compactBefore = compactBefore; this.compactAfter = compactAfter; this.changelogFiles = changelogFiles; + this.newIndexFiles = newIndexFiles; + this.deletedIndexFiles = deletedIndexFiles; } public List<DataFileMeta> compactBefore() { @@ -51,8 +67,20 @@ public class CompactIncrement { return changelogFiles; } + public List<IndexFileMeta> newIndexFiles() { + return newIndexFiles; + } + + public List<IndexFileMeta> deletedIndexFiles() { + return deletedIndexFiles; + } + public boolean isEmpty() { - return compactBefore.isEmpty() && compactAfter.isEmpty() && changelogFiles.isEmpty(); + return compactBefore.isEmpty() + && compactAfter.isEmpty() + && changelogFiles.isEmpty() + && newIndexFiles.isEmpty() + && deletedIndexFiles.isEmpty(); } @Override @@ -67,7 +95,9 @@ public class CompactIncrement { CompactIncrement that = (CompactIncrement) o; return Objects.equals(compactBefore, that.compactBefore) && Objects.equals(compactAfter, that.compactAfter) - && Objects.equals(changelogFiles, that.changelogFiles); + && Objects.equals(changelogFiles, that.changelogFiles) + && Objects.equals(newIndexFiles, that.newIndexFiles) + && Objects.equals(deletedIndexFiles, that.deletedIndexFiles); } @Override @@ -78,10 +108,14 @@ public class CompactIncrement { @Override public String toString() { return String.format( - "CompactIncrement {compactBefore = %s, compactAfter = %s, changelogFiles = %s}", + "CompactIncrement {compactBefore = %s, compactAfter = %s, changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}", compactBefore.stream().map(DataFileMeta::fileName).collect(Collectors.toList()), compactAfter.stream().map(DataFileMeta::fileName).collect(Collectors.toList()), - changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList())); + changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()), + newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()), + deletedIndexFiles.stream() + .map(IndexFileMeta::fileName) + .collect(Collectors.toList())); } public static CompactIncrement emptyIncrement() { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java index b7860fb15e..6049c4dbbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java @@ -18,25 +18,41 @@ package org.apache.paimon.io; +import org.apache.paimon.index.IndexFileMeta; + +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -/** Newly created data files and changelog files. */ +/** Increment of data files, changelog files and index files. */ public class DataIncrement { private final List<DataFileMeta> newFiles; private final List<DataFileMeta> deletedFiles; private final List<DataFileMeta> changelogFiles; + private final List<IndexFileMeta> newIndexFiles; + private final List<IndexFileMeta> deletedIndexFiles; public DataIncrement( List<DataFileMeta> newFiles, List<DataFileMeta> deletedFiles, List<DataFileMeta> changelogFiles) { + this(newFiles, deletedFiles, changelogFiles, new ArrayList<>(), new ArrayList<>()); + } + + public DataIncrement( + List<DataFileMeta> newFiles, + List<DataFileMeta> deletedFiles, + List<DataFileMeta> changelogFiles, + List<IndexFileMeta> newIndexFiles, + List<IndexFileMeta> deletedIndexFiles) { this.newFiles = newFiles; this.deletedFiles = deletedFiles; this.changelogFiles = changelogFiles; + this.newIndexFiles = newIndexFiles; + this.deletedIndexFiles = deletedIndexFiles; } public static DataIncrement emptyIncrement() { @@ -56,8 +72,20 @@ public class DataIncrement { return changelogFiles; } + public List<IndexFileMeta> newIndexFiles() { + return newIndexFiles; + } + + public List<IndexFileMeta> deletedIndexFiles() { + return deletedIndexFiles; + } + public boolean isEmpty() { - return newFiles.isEmpty() && changelogFiles.isEmpty(); + return newFiles.isEmpty() + && deletedFiles.isEmpty() + && changelogFiles.isEmpty() + && newIndexFiles.isEmpty() + && deletedIndexFiles.isEmpty(); } @Override @@ -71,20 +99,28 @@ public class DataIncrement { DataIncrement that = (DataIncrement) o; return Objects.equals(newFiles, that.newFiles) - && Objects.equals(changelogFiles, that.changelogFiles); + && Objects.equals(deletedFiles, that.deletedFiles) + && Objects.equals(changelogFiles, that.changelogFiles) + && Objects.equals(newIndexFiles, that.newIndexFiles) + && Objects.equals(deletedIndexFiles, that.deletedIndexFiles); } @Override public int hashCode() { - return Objects.hash(newFiles, changelogFiles); + return Objects.hash( + newFiles, deletedFiles, changelogFiles, newIndexFiles, deletedIndexFiles); } @Override public String toString() { return String.format( - "DataIncrement {newFiles = %s, deletedFiles = %s, changelogFiles = %s}", + "DataIncrement {newFiles = %s, deletedFiles = %s, changelogFiles = %s, newIndexFiles = %s, deletedIndexFiles = %s}", newFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()), deletedFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()), - changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList())); + changelogFiles.stream().map(DataFileMeta::fileName).collect(Collectors.toList()), + newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()), + deletedIndexFiles.stream() + .map(IndexFileMeta::fileName) + .collect(Collectors.toList())); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java deleted file mode 100644 index 9f985f54ed..0000000000 --- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.io; - -import org.apache.paimon.index.IndexFileMeta; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -/** Incremental index files. */ -public class IndexIncrement { - - private final List<IndexFileMeta> newIndexFiles; - - private final List<IndexFileMeta> deletedIndexFiles; - - public IndexIncrement(List<IndexFileMeta> newIndexFiles) { - this.newIndexFiles = newIndexFiles; - this.deletedIndexFiles = Collections.emptyList(); - } - - public IndexIncrement( - List<IndexFileMeta> newIndexFiles, List<IndexFileMeta> deletedIndexFiles) { - this.newIndexFiles = newIndexFiles; - this.deletedIndexFiles = deletedIndexFiles; - } - - public List<IndexFileMeta> newIndexFiles() { - return newIndexFiles; - } - - public List<IndexFileMeta> deletedIndexFiles() { - return deletedIndexFiles; - } - - public boolean isEmpty() { - return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - IndexIncrement that = (IndexIncrement) o; - return Objects.equals(newIndexFiles, that.newIndexFiles) - && Objects.equals(deletedIndexFiles, that.deletedIndexFiles); - } - - @Override - public int hashCode() { - List<IndexFileMeta> all = new ArrayList<>(newIndexFiles); - all.addAll(deletedIndexFiles); - return Objects.hash(all); - } - - @Override - public String toString() { - return String.format( - "IndexIncrement {newIndexFiles = %s, deletedIndexFiles = %s}", - newIndexFiles.stream().map(IndexFileMeta::fileName).collect(Collectors.toList()), - deletedIndexFiles.stream() - .map(IndexFileMeta::fileName) - .collect(Collectors.toList())); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 7b77fb179f..ddf2addf53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -28,9 +28,9 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer; import org.apache.paimon.disk.IOManager; import org.apache.paimon.index.DynamicBucketIndexMaintainer; import org.apache.paimon.index.IndexFileHandler; -import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.io.DataIncrement; import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.CompactionMetrics; @@ -213,22 +213,26 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { WriterContainer<T> writerContainer = entry.getValue(); CommitIncrement increment = writerContainer.writer.prepareCommit(waitCompaction); - List<IndexFileMeta> newIndexFiles = new ArrayList<>(); + DataIncrement newFilesIncrement = increment.newFilesIncrement(); + CompactIncrement compactIncrement = increment.compactIncrement(); if (writerContainer.dynamicBucketMaintainer != null) { - newIndexFiles.addAll(writerContainer.dynamicBucketMaintainer.prepareCommit()); + newFilesIncrement + .newIndexFiles() + .addAll(writerContainer.dynamicBucketMaintainer.prepareCommit()); } CompactDeletionFile compactDeletionFile = increment.compactDeletionFile(); if (compactDeletionFile != null) { - compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add); + compactDeletionFile + .getOrCompute() + .ifPresent(compactIncrement.newIndexFiles()::add); } CommitMessageImpl committable = new CommitMessageImpl( partition, bucket, writerContainer.totalBuckets, - increment.newFilesIncrement(), - increment.compactIncrement(), - new IndexIncrement(newIndexFiles)); + newFilesIncrement, + compactIncrement); result.add(committable); if (committable.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 1c859fb693..a11339fe7d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -25,6 +25,7 @@ import org.apache.paimon.catalog.SnapshotCommit; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.FileEntry; @@ -701,51 +702,53 @@ public class FileStoreCommitImpl implements FileStoreCommit { .compactIncrement() .changelogFiles() .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m))); - commitMessage - .indexIncrement() - .newIndexFiles() - .forEach( - f -> { - switch (f.indexType()) { - case HASH_INDEX: - appendHashIndexFiles.add( - new IndexManifestEntry( - FileKind.ADD, - commitMessage.partition(), - commitMessage.bucket(), - f)); - break; - case DELETION_VECTORS_INDEX: - compactDvIndexFiles.add( - new IndexManifestEntry( - FileKind.ADD, - commitMessage.partition(), - commitMessage.bucket(), - f)); - break; - default: - throw new RuntimeException( - "Unknown index type: " + f.indexType()); - } - }); - commitMessage - .indexIncrement() - .deletedIndexFiles() - .forEach( - f -> { - if (f.indexType().equals(DELETION_VECTORS_INDEX)) { - compactDvIndexFiles.add( - new IndexManifestEntry( - FileKind.DELETE, - commitMessage.partition(), - commitMessage.bucket(), - f)); - } else { - throw new RuntimeException( - "This index type is not supported to delete: " - + f.indexType()); - } - }); + + // todo: split them + List<IndexFileMeta> newIndexFiles = + new ArrayList<>(commitMessage.newFilesIncrement().newIndexFiles()); + newIndexFiles.addAll(commitMessage.compactIncrement().newIndexFiles()); + newIndexFiles.forEach( + f -> { + switch (f.indexType()) { + case HASH_INDEX: + appendHashIndexFiles.add( + new IndexManifestEntry( + FileKind.ADD, + commitMessage.partition(), + commitMessage.bucket(), + f)); + break; + case DELETION_VECTORS_INDEX: + compactDvIndexFiles.add( + new IndexManifestEntry( + FileKind.ADD, + commitMessage.partition(), + commitMessage.bucket(), + f)); + break; + default: + throw new RuntimeException("Unknown index type: " + f.indexType()); + } + }); + + // todo: split them + List<IndexFileMeta> deletedIndexFiles = + new ArrayList<>(commitMessage.newFilesIncrement().deletedIndexFiles()); + deletedIndexFiles.addAll(commitMessage.compactIncrement().deletedIndexFiles()); + deletedIndexFiles.forEach( + f -> { + if (f.indexType().equals(DELETION_VECTORS_INDEX)) { + compactDvIndexFiles.add( + new IndexManifestEntry( + FileKind.DELETE, + commitMessage.partition(), + commitMessage.bucket(), + f)); + } else { + throw new RuntimeException( + "This index type is not supported to delete: " + f.indexType()); + } + }); } if (!commitMessages.isEmpty()) { List<String> msg = new ArrayList<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java index 5831066816..8e715462f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java @@ -18,20 +18,17 @@ package org.apache.paimon.table.sink; -import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputViewStreamWrapper; import org.apache.paimon.io.DataOutputViewStreamWrapper; -import org.apache.paimon.io.IndexIncrement; import javax.annotation.Nullable; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.util.Collections; import java.util.Objects; import static org.apache.paimon.utils.SerializationUtils.deserializedBytes; @@ -50,37 +47,18 @@ public class CommitMessageImpl implements CommitMessage { private transient @Nullable Integer totalBuckets; private transient DataIncrement dataIncrement; private transient CompactIncrement compactIncrement; - private transient IndexIncrement indexIncrement; - @VisibleForTesting public CommitMessageImpl( BinaryRow partition, int bucket, @Nullable Integer totalBuckets, DataIncrement dataIncrement, CompactIncrement compactIncrement) { - this( - partition, - bucket, - totalBuckets, - dataIncrement, - compactIncrement, - new IndexIncrement(Collections.emptyList())); - } - - public CommitMessageImpl( - BinaryRow partition, - int bucket, - @Nullable Integer totalBuckets, - DataIncrement dataIncrement, - CompactIncrement compactIncrement, - IndexIncrement indexIncrement) { this.partition = partition; this.bucket = bucket; this.totalBuckets = totalBuckets; this.dataIncrement = dataIncrement; this.compactIncrement = compactIncrement; - this.indexIncrement = indexIncrement; } @Override @@ -106,12 +84,8 @@ public class CommitMessageImpl implements CommitMessage { return compactIncrement; } - public IndexIncrement indexIncrement() { - return indexIncrement; - } - public boolean isEmpty() { - return dataIncrement.isEmpty() && compactIncrement.isEmpty() && indexIncrement.isEmpty(); + return dataIncrement.isEmpty() && compactIncrement.isEmpty(); } private void writeObject(ObjectOutputStream out) throws IOException { @@ -131,7 +105,6 @@ public class CommitMessageImpl implements CommitMessage { this.totalBuckets = message.totalBuckets; this.dataIncrement = message.dataIncrement; this.compactIncrement = message.compactIncrement; - this.indexIncrement = message.indexIncrement; } @Override @@ -148,14 +121,12 @@ public class CommitMessageImpl implements CommitMessage { && Objects.equals(partition, that.partition) && Objects.equals(totalBuckets, that.totalBuckets) && Objects.equals(dataIncrement, that.dataIncrement) - && Objects.equals(compactIncrement, that.compactIncrement) - && Objects.equals(indexIncrement, that.indexIncrement); + && Objects.equals(compactIncrement, that.compactIncrement); } @Override public int hashCode() { - return Objects.hash( - partition, bucket, totalBuckets, dataIncrement, compactIncrement, indexIncrement); + return Objects.hash(partition, bucket, totalBuckets, dataIncrement, compactIncrement); } @Override @@ -166,8 +137,7 @@ public class CommitMessageImpl implements CommitMessage { + "bucket = %d, " + "totalBuckets = %s, " + "newFilesIncrement = %s, " - + "compactIncrement = %s, " - + "indexIncrement = %s}", - partition, bucket, totalBuckets, dataIncrement, compactIncrement, indexIncrement); + + "compactIncrement = %s}", + partition, bucket, totalBuckets, dataIncrement, compactIncrement); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java index 88e9e3513d..5c3976af81 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageLegacyV2Serializer.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -26,7 +27,6 @@ import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputView; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.BigIntType; @@ -64,23 +64,28 @@ public class CommitMessageLegacyV2Serializer { } public CommitMessage deserialize(DataInputView view) throws IOException { + BinaryRow partition = deserializeBinaryRow(view); + int bucket = view.readInt(); if (dataFileSerializer == null) { dataFileSerializer = new DataFileMetaLegacyV2Serializer(); indexEntrySerializer = new IndexFileMetaLegacyV2Serializer(); } - return new CommitMessageImpl( - deserializeBinaryRow(view), - view.readInt(), - null, + DataIncrement dataIncrement = new DataIncrement( dataFileSerializer.deserializeList(view), Collections.emptyList(), - dataFileSerializer.deserializeList(view)), + dataFileSerializer.deserializeList(view)); + CompactIncrement compactIncrement = new CompactIncrement( dataFileSerializer.deserializeList(view), dataFileSerializer.deserializeList(view), - dataFileSerializer.deserializeList(view)), - new IndexIncrement(indexEntrySerializer.deserializeList(view))); + dataFileSerializer.deserializeList(view)); + if (compactIncrement.isEmpty()) { + dataIncrement.newIndexFiles().addAll(indexEntrySerializer.deserializeList(view)); + } else { + compactIncrement.newIndexFiles().addAll(indexEntrySerializer.deserializeList(view)); + } + return new CommitMessageImpl(partition, bucket, null, dataIncrement, compactIncrement); } private static RowType legacyDataFileSchema() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index 529b0cb369..0a34434028 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.serializer.VersionedSerializer; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexFileMetaSerializer; @@ -36,13 +37,11 @@ import org.apache.paimon.io.DataInputDeserializer; import org.apache.paimon.io.DataInputView; import org.apache.paimon.io.DataOutputView; import org.apache.paimon.io.DataOutputViewStreamWrapper; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.utils.IOExceptionSupplier; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; @@ -51,7 +50,7 @@ import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; /** {@link VersionedSerializer} for {@link CommitMessage}. */ public class CommitMessageSerializer implements VersionedSerializer<CommitMessage> { - private static final int CURRENT_VERSION = 9; + public static final int CURRENT_VERSION = 10; private final DataFileMetaSerializer dataFileSerializer; private final IndexFileMetaSerializer indexEntrySerializer; @@ -103,14 +102,19 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag view.writeBoolean(false); } + // data increment dataFileSerializer.serializeList(message.newFilesIncrement().newFiles(), view); dataFileSerializer.serializeList(message.newFilesIncrement().deletedFiles(), view); dataFileSerializer.serializeList(message.newFilesIncrement().changelogFiles(), view); + indexEntrySerializer.serializeList(message.newFilesIncrement().newIndexFiles(), view); + indexEntrySerializer.serializeList(message.newFilesIncrement().deletedIndexFiles(), view); + + // compact increment dataFileSerializer.serializeList(message.compactIncrement().compactBefore(), view); dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), view); dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(), view); - indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(), view); - indexEntrySerializer.serializeList(message.indexIncrement().deletedIndexFiles(), view); + indexEntrySerializer.serializeList(message.compactIncrement().newIndexFiles(), view); + indexEntrySerializer.serializeList(message.compactIncrement().deletedIndexFiles(), view); } @Override @@ -132,18 +136,48 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag IOExceptionSupplier<List<DataFileMeta>> fileDeserializer = fileDeserializer(version, view); IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer = indexEntryDeserializer(version, view); - - return new CommitMessageImpl( - deserializeBinaryRow(view), - view.readInt(), - version >= 7 && view.readBoolean() ? view.readInt() : null, - new DataIncrement( - fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), - new CompactIncrement( - fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()), - new IndexIncrement( - indexEntryDeserializer.get(), - version <= 2 ? Collections.emptyList() : indexEntryDeserializer.get())); + if (version >= 10) { + return new CommitMessageImpl( + deserializeBinaryRow(view), + view.readInt(), + view.readBoolean() ? view.readInt() : null, + new DataIncrement( + fileDeserializer.get(), + fileDeserializer.get(), + fileDeserializer.get(), + indexEntryDeserializer.get(), + indexEntryDeserializer.get()), + new CompactIncrement( + fileDeserializer.get(), + fileDeserializer.get(), + fileDeserializer.get(), + indexEntryDeserializer.get(), + indexEntryDeserializer.get())); + } else { + BinaryRow partition = deserializeBinaryRow(view); + int bucket = view.readInt(); + Integer totalBuckets = version >= 7 && view.readBoolean() ? view.readInt() : null; + DataIncrement dataIncrement = + new DataIncrement( + fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()); + CompactIncrement compactIncrement = + new CompactIncrement( + fileDeserializer.get(), fileDeserializer.get(), fileDeserializer.get()); + if (compactIncrement.isEmpty()) { + dataIncrement.newIndexFiles().addAll(indexEntryDeserializer.get()); + } else { + compactIncrement.newIndexFiles().addAll(indexEntryDeserializer.get()); + } + if (version > 2) { + if (compactIncrement.isEmpty()) { + dataIncrement.deletedIndexFiles().addAll(indexEntryDeserializer.get()); + } else { + compactIncrement.deletedIndexFiles().addAll(indexEntryDeserializer.get()); + } + } + return new CommitMessageImpl( + partition, bucket, totalBuckets, dataIncrement, compactIncrement); + } } private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 95b2df34f5..e549bd2e2c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -290,12 +290,18 @@ public class TableCommitImpl implements InnerTableCommit { Consumer<DataFileMeta> collector = f -> files.addAll(f.collectFiles(pathFactory)); msg.newFilesIncrement().newFiles().forEach(collector); msg.newFilesIncrement().changelogFiles().forEach(collector); + msg.newFilesIncrement().newIndexFiles().stream() + .map(indexFileFactory::toPath) + .forEach(files::add); + msg.newFilesIncrement().deletedIndexFiles().stream() + .map(indexFileFactory::toPath) + .forEach(files::add); msg.compactIncrement().compactBefore().forEach(collector); msg.compactIncrement().compactAfter().forEach(collector); - msg.indexIncrement().newIndexFiles().stream() + msg.compactIncrement().newIndexFiles().stream() .map(indexFileFactory::toPath) .forEach(files::add); - msg.indexIncrement().deletedIndexFiles().stream() + msg.compactIncrement().deletedIndexFiles().stream() .map(indexFileFactory::toPath) .forEach(files::add); } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java index 60a00ab567..3ca33a8222 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java @@ -30,7 +30,6 @@ import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.schema.Schema; @@ -115,9 +114,13 @@ public class TestAppendFileStore extends AppendOnlyFileStore { partition, bucket, options().bucket(), - DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(Collections.emptyList(), indexFileMetas)); + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + indexFileMetas), + CompactIncrement.emptyIncrement()); } public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int bucket) { @@ -153,9 +156,13 @@ public class TestAppendFileStore extends AppendOnlyFileStore { partition, bucket, options().bucket(), - DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(indexFiles)); + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + indexFiles, + Collections.emptyList()), + CompactIncrement.emptyIncrement()); } public static TestAppendFileStore createAppendStore( diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 8e4cafc7de..dc1702c264 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -26,7 +26,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; -import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.FileSource; @@ -346,14 +346,15 @@ public class TestFileStore extends KeyValueFileStore { entryWithPartition.getValue().entrySet()) { CommitIncrement increment = entryWithBucket.getValue().prepareCommit(ignorePreviousFiles); + DataIncrement dataIncrement = increment.newFilesIncrement(); + dataIncrement.newIndexFiles().addAll(indexFiles); committable.addFileCommittable( new CommitMessageImpl( entryWithPartition.getKey(), entryWithBucket.getKey(), options().bucket(), - increment.newFilesIncrement(), - increment.compactIncrement(), - new IndexIncrement(indexFiles))); + dataIncrement, + increment.compactIncrement())); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java index e03dd2cee2..0e95135873 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendCompactTaskTest.java @@ -31,7 +31,6 @@ import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.operation.BaseAppendFileStoreWrite; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.RawFileSplitRead; @@ -105,12 +104,11 @@ public class AppendCompactTaskTest { CommitMessageImpl compactMessage = (CommitMessageImpl) compactTask.doCompact(table, write); - IndexIncrement indexIncrement = compactMessage.indexIncrement(); - assertThat(indexIncrement.deletedIndexFiles()).isNotEmpty(); + assertThat(compactMessage.compactIncrement().deletedIndexFiles()).isNotEmpty(); if (compactBeforeAllFiles) { - assertThat(indexIncrement.newIndexFiles()).isEmpty(); + assertThat(compactMessage.compactIncrement().newIndexFiles()).isEmpty(); } else { - assertThat(indexIncrement.newIndexFiles()).isNotEmpty(); + assertThat(compactMessage.compactIncrement().newIndexFiles()).isNotEmpty(); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java index c195517731..10c6f45200 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java @@ -27,7 +27,6 @@ import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.CommitMessage; @@ -104,9 +103,13 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { EMPTY_ROW, 0, 1, - DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(Collections.singletonList(file))); + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(file), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); BatchTableCommit commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); @@ -127,9 +130,13 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { EMPTY_ROW, 0, 1, - DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(Collections.singletonList(file))); + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(file), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); commit = table.newBatchWriteBuilder().newCommit(); commit.commit(Collections.singletonList(commitMessage)); @@ -202,8 +209,12 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { 0, 1, DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(Collections.singletonList(file))); + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(file), + Collections.emptyList())); BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit(); commit1.commit(Collections.singletonList(commitMessage1)); @@ -235,8 +246,12 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase { 0, 1, DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(Collections.singletonList(file))); + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(file), + Collections.emptyList())); BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit(); commit2.commit(Collections.singletonList(commitMessage2)); diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java index 3cd9308d79..3a03985c88 100644 --- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java @@ -71,10 +71,10 @@ class AppendDeletionFileMaintainerTest { Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>(); dataFileToDeletionFiles.putAll( createDeletionFileMapFromIndexFileMetas( - indexPathFactory, commitMessage1.indexIncrement().newIndexFiles())); + indexPathFactory, commitMessage1.newFilesIncrement().newIndexFiles())); dataFileToDeletionFiles.putAll( createDeletionFileMapFromIndexFileMetas( - indexPathFactory, commitMessage2.indexIncrement().newIndexFiles())); + indexPathFactory, commitMessage2.newFilesIncrement().newIndexFiles())); AppendDeleteFileMaintainer dvIFMaintainer = store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, dataFileToDeletionFiles); @@ -108,7 +108,7 @@ class AppendDeletionFileMaintainerTest { .findAny() .get(); assertThat(entry.indexFile()) - .isEqualTo(commitMessage1.indexIncrement().newIndexFiles().get(0)); + .isEqualTo(commitMessage1.newFilesIncrement().newIndexFiles().get(0)); entry = res.stream() .filter(file -> file.kind() == FileKind.DELETE) @@ -116,7 +116,7 @@ class AppendDeletionFileMaintainerTest { .findAny() .get(); assertThat(entry.indexFile()) - .isEqualTo(commitMessage2.indexIncrement().newIndexFiles().get(0)); + .isEqualTo(commitMessage2.newFilesIncrement().newIndexFiles().get(0)); } private Map<String, DeletionFile> createDeletionFileMapFromIndexFileMetas( diff --git a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java index 6f5e0d2ecd..b4f5363a06 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java @@ -75,7 +75,7 @@ public class DynamicBucketIndexMaintainerTest extends PrimaryKeyTableTestBase { Map<BinaryRow, Map<Integer, int[]>> index = new HashMap<>(); for (CommitMessage commitMessage : messages) { CommitMessageImpl message = (CommitMessageImpl) commitMessage; - List<IndexFileMeta> files = message.indexIncrement().newIndexFiles(); + List<IndexFileMeta> files = message.newFilesIncrement().newIndexFiles(); if (files.isEmpty()) { continue; } diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java index 01eca82c2e..4c5415d45d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java @@ -22,7 +22,6 @@ import org.apache.paimon.catalog.PrimaryKeyTableTestBase; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.StreamTableCommit; @@ -216,10 +215,13 @@ public class HashBucketAssignerTest extends PrimaryKeyTableTestBase { bucket, totalBuckets, new DataIncrement( - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(file), + Collections.emptyList()), new CompactIncrement( - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), - new IndexIncrement(Collections.singletonList(file))); + Collections.emptyList(), Collections.emptyList(), Collections.emptyList())); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java index 927da8e97d..ccc8f6c57e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -24,7 +24,6 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.IOUtils; @@ -45,6 +44,100 @@ import static org.assertj.core.api.Assertions.assertThat; /** Compatibility Test for {@link ManifestCommittableSerializer}. */ public class ManifestCommittableSerializerCompatibilityTest { + @Test + public void testCompatibilityToV4CommitV10() throws IOException { + String fileName = "manifest-committable-v10"; + + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + DataFileMeta.create( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT, + Arrays.asList("field1", "field2", "field3"), + "hdfs://localhost:9000/path/to/file", + 1L, + Arrays.asList("asdf", "qwer", "zxcv")); + List<DataFileMeta> dataFiles = Collections.singletonList(dataFile); + IndexFileMeta hashIndexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, null, null); + + LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>(); + dvRanges.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L)); + dvRanges.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L)); + IndexFileMeta devIndexFile = + new IndexFileMeta( + "my_index_type", + "my_index_file", + 1024 * 100, + 1002, + dvRanges, + "external_path"); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + 16, + new DataIncrement( + dataFiles, + dataFiles, + dataFiles, + Collections.singletonList(hashIndexFile), + Collections.singletonList(hashIndexFile)), + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + Collections.singletonList(devIndexFile), + Collections.emptyList())); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + manifestCommittable.addProperty("k1", "v1"); + manifestCommittable.addProperty("k2", "v2"); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(serializer.getVersion(), bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + + byte[] oldBytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/" + fileName), + true); + deserialized = serializer.deserialize(4, oldBytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } + @Test public void testCompatibilityToV4CommitV9() throws IOException { SimpleStats keyStats = @@ -100,8 +193,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, 16, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -178,8 +275,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, 16, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -256,8 +357,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, 16, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -333,8 +438,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, 16, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -408,8 +517,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, null, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -483,8 +596,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, null, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -557,8 +674,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, null, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -632,8 +753,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, null, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -707,8 +832,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, null, new DataIncrement(dataFiles, dataFiles, dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( @@ -778,8 +907,12 @@ public class ManifestCommittableSerializerCompatibilityTest { 11, null, new DataIncrement(dataFiles, Collections.emptyList(), dataFiles), - new CompactIncrement(dataFiles, dataFiles, dataFiles), - new IndexIncrement(indexFiles)); + new CompactIncrement( + dataFiles, + dataFiles, + dataFiles, + indexFiles, + Collections.emptyList())); ManifestCommittable manifestCommittable = new ManifestCommittable( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index 47a876f345..35843e0c2f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -914,8 +914,8 @@ public class FileStoreCommitTest { store.writeDVIndexFiles( BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3))); List<IndexFileMeta> deleted = - new ArrayList<>(commitMessage1.indexIncrement().newIndexFiles()); - deleted.addAll(commitMessage2.indexIncrement().newIndexFiles()); + new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles()); + deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles()); CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted); store.commit(commitMessage3, commitMessage4); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java index 29e71e2b44..0767200bb1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DynamicBucketTableTest.java @@ -61,7 +61,7 @@ public class DynamicBucketTableTest extends TableTestBase { batchTableWrite.write(rowWithBucket.getKey(), rowWithBucket.getValue()); assertThat( ((CommitMessageImpl) batchTableWrite.prepareCommit().get(0)) - .indexIncrement() + .newFilesIncrement() .newIndexFiles() .get(0) .rowCount()) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java index 351af6a3b7..c4f519e84e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java @@ -20,7 +20,6 @@ package org.apache.paimon.table.sink; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.junit.jupiter.api.Test; @@ -41,22 +40,31 @@ public class CommitMessageSerializerTest { CommitMessageSerializer serializer = new CommitMessageSerializer(); DataIncrement dataIncrement = randomNewFilesIncrement(); + dataIncrement.newIndexFiles().addAll(Arrays.asList(randomIndexFile(), randomIndexFile())); + dataIncrement + .deletedIndexFiles() + .addAll(Arrays.asList(randomIndexFile(), randomIndexFile())); + CompactIncrement compactIncrement = randomCompactIncrement(); - IndexIncrement indexIncrement = - new IndexIncrement( - Arrays.asList(randomIndexFile(), randomIndexFile()), - Arrays.asList(randomIndexFile(), randomIndexFile())); + compactIncrement + .newIndexFiles() + .addAll(Arrays.asList(randomIndexFile(), randomIndexFile())); + compactIncrement + .deletedIndexFiles() + .addAll(Arrays.asList(randomIndexFile(), randomIndexFile())); + CommitMessageImpl committable = - new CommitMessageImpl( - row(0), 1, 2, dataIncrement, compactIncrement, indexIncrement); + new CommitMessageImpl(row(0), 1, 2, dataIncrement, compactIncrement); CommitMessageImpl newCommittable = - (CommitMessageImpl) serializer.deserialize(7, serializer.serialize(committable)); + (CommitMessageImpl) + serializer.deserialize( + CommitMessageSerializer.CURRENT_VERSION, + serializer.serialize(committable)); assertThat(newCommittable.partition()).isEqualTo(committable.partition()); assertThat(newCommittable.bucket()).isEqualTo(committable.bucket()); assertThat(newCommittable.totalBuckets()).isEqualTo(committable.totalBuckets()); assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement()); assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement()); - assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement()); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java b/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java new file mode 100644 index 0000000000..67b8a3d511 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompatibilityUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +/** Utils for compatibility. */ +public class CompatibilityUtils { + + public static final String COMPATIBILITY_FILE_DIR = "/src/test/resources/compatibility/"; + + /** Write compatibility file. */ + public static void writeCompatibilityFile(String fileName, byte[] data) throws IOException { + File file = new File(System.getProperty("user.dir") + COMPATIBILITY_FILE_DIR + fileName); + try (FileOutputStream fos = new FileOutputStream(file)) { + fos.write(data); + } + } +} diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v10 b/paimon-core/src/test/resources/compatibility/manifest-committable-v10 new file mode 100644 index 0000000000..0ae6e7bfe3 Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v10 differ diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java index 674dd59e7e..827d79ca5a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperator.java @@ -128,9 +128,10 @@ public class AppendPreCommitCompactCoordinatorOperator new DataIncrement( skippedFiles, message.newFilesIncrement().deletedFiles(), - message.newFilesIncrement().changelogFiles()), - message.compactIncrement(), - message.indexIncrement()); + message.newFilesIncrement().changelogFiles(), + message.newFilesIncrement().newIndexFiles(), + message.newFilesIncrement().deletedIndexFiles()), + message.compactIncrement()); if (!newMessage.isEmpty()) { Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java index 0190279a8b..d336717031 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java @@ -138,12 +138,15 @@ public class ChangelogCompactCoordinateOperator new DataIncrement( message.newFilesIncrement().newFiles(), message.newFilesIncrement().deletedFiles(), - skippedNewChangelogs), + skippedNewChangelogs, + message.newFilesIncrement().newIndexFiles(), + message.newFilesIncrement().deletedIndexFiles()), new CompactIncrement( message.compactIncrement().compactBefore(), message.compactIncrement().compactAfter(), - skippedCompactChangelogs), - message.indexIncrement()); + skippedCompactChangelogs, + message.compactIncrement().newIndexFiles(), + message.compactIncrement().deletedIndexFiles())); Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); output.collect(new StreamRecord<>(Either.Left(newCommittable))); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java index 92e0110f27..e6183ccb8a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java @@ -23,7 +23,6 @@ import org.apache.paimon.flink.sink.Committable; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -95,12 +94,15 @@ public class ChangelogCompactSortOperator extends AbstractStreamOperator<Committ new DataIncrement( message.newFilesIncrement().newFiles(), message.newFilesIncrement().deletedFiles(), - Collections.emptyList()), + Collections.emptyList(), + message.newFilesIncrement().newIndexFiles(), + message.newFilesIncrement().deletedIndexFiles()), new CompactIncrement( message.compactIncrement().compactBefore(), message.compactIncrement().compactAfter(), - Collections.emptyList()), - message.indexIncrement()); + Collections.emptyList(), + message.compactIncrement().newIndexFiles(), + message.compactIncrement().deletedIndexFiles())); if (!newMessage.isEmpty()) { Committable newCommittable = new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage); @@ -138,8 +140,8 @@ public class ChangelogCompactSortOperator extends AbstractStreamOperator<Committ new CompactIncrement( Collections.emptyList(), Collections.emptyList(), - sortedChangelogs(compactChangelogFiles, partition, bucket)), - new IndexIncrement(Collections.emptyList())); + sortedChangelogs( + compactChangelogFiles, partition, bucket))); Committable newCommittable = new Committable(checkpointId, Committable.Kind.FILE, newMessage); output.collect(new StreamRecord<>(newCommittable)); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java index 2991fb4ecc..50eb5a0796 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCommittableRewriter.java @@ -27,7 +27,6 @@ import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.utils.FileStorePathFactory; @@ -137,8 +136,8 @@ public class PostponeBucketCommittableRewriter { changelogFiles.addAll(message.newFilesIncrement().changelogFiles()); changelogFiles.addAll(message.compactIncrement().changelogFiles()); - newIndexFiles.addAll(message.indexIncrement().newIndexFiles()); - deletedIndexFiles.addAll(message.indexIncrement().deletedIndexFiles()); + newIndexFiles.addAll(message.compactIncrement().newIndexFiles()); + deletedIndexFiles.addAll(message.compactIncrement().deletedIndexFiles()); toDelete.forEach((fileName, path) -> fileIO.deleteQuietly(path)); } @@ -151,8 +150,12 @@ public class PostponeBucketCommittableRewriter { bucket, totalBuckets, DataIncrement.emptyIncrement(), - new CompactIncrement(compactBefore, realCompactAfter, changelogFiles), - new IndexIncrement(newIndexFiles, deletedIndexFiles)); + new CompactIncrement( + compactBefore, + realCompactAfter, + changelogFiles, + newIndexFiles, + deletedIndexFiles)); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java index 5dbd9f3a09..0283b197d2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/listener/PartitionMarkDoneListener.java @@ -147,9 +147,7 @@ public class PartitionMarkDoneListener implements CommitListener { for (ManifestCommittable committable : committables) { for (CommitMessage commitMessage : committable.fileCommittables()) { CommitMessageImpl message = (CommitMessageImpl) commitMessage; - if (waitCompaction - || !message.indexIncrement().isEmpty() - || !message.newFilesIncrement().isEmpty()) { + if (waitCompaction || !message.newFilesIncrement().isEmpty()) { partitions.add(message.partition()); } } @@ -199,9 +197,7 @@ public class PartitionMarkDoneListener implements CommitListener { if (watermark != null) { for (CommitMessage commitMessage : committable.fileCommittables()) { CommitMessageImpl message = (CommitMessageImpl) commitMessage; - if (waitCompaction - || !message.indexIncrement().isEmpty() - || !message.newFilesIncrement().isEmpty()) { + if (waitCompaction || !message.newFilesIncrement().isEmpty()) { partitionWatermarks.compute( message.partition(), (partition, old) -> diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java index 333f315df6..ad9e9355e1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/AppendPreCommitCompactCoordinatorOperatorTest.java @@ -190,7 +190,8 @@ public class AppendPreCommitCompactCoordinatorOperatorTest { assertThat(message.newFilesIncrement().deletedFiles()).isEmpty(); assertThat(message.newFilesIncrement().changelogFiles()).isEmpty(); assertThat(message.compactIncrement().isEmpty()).isTrue(); - assertThat(message.indexIncrement().isEmpty()).isTrue(); + assertThat(message.newFilesIncrement().newIndexFiles().isEmpty()).isTrue(); + assertThat(message.newFilesIncrement().deletedIndexFiles().isEmpty()).isTrue(); assertThat(message.newFilesIncrement().newFiles().stream().map(DataFileMeta::fileSize)) .hasSameElementsAs( Arrays.stream(mbs) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java index 109a3f984e..276bf45193 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java @@ -27,7 +27,6 @@ import org.apache.paimon.flink.utils.TestingMetricUtils; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.CompactIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; @@ -596,8 +595,7 @@ public class WriterOperatorTest { message.bucket(), message.totalBuckets(), message.newFilesIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(Collections.emptyList())); + CompactIncrement.emptyIncrement()); commitMessages.add(newMessage); } commit.commit(commitIdentifier, commitMessages); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java index 3439415c2f..c873234105 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/listener/ListenerTestUtils.java @@ -24,7 +24,6 @@ import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFileTestUtils; import org.apache.paimon.io.DataIncrement; -import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.sink.CommitMessageImpl; @@ -67,8 +66,7 @@ class ListenerTestUtils { 0, 1, new DataIncrement(emptyList(), emptyList(), emptyList()), - new CompactIncrement(singletonList(file), emptyList(), emptyList()), - new IndexIncrement(emptyList())); + new CompactIncrement(singletonList(file), emptyList(), emptyList())); } else { compactMessage = new CommitMessageImpl( @@ -76,8 +74,7 @@ class ListenerTestUtils { 0, 1, new DataIncrement(singletonList(file), emptyList(), emptyList()), - new CompactIncrement(emptyList(), emptyList(), emptyList()), - new IndexIncrement(emptyList())); + new CompactIncrement(emptyList(), emptyList(), emptyList())); } committable.addFileCommittable(compactMessage); if (partitionMarkDoneRecoverFromState) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala index 7aea754040..300ca20ff8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala @@ -21,8 +21,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletionVector, DeletionVector} import org.apache.paimon.fs.Path -import org.apache.paimon.index.IndexFileMeta -import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement} +import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand @@ -220,8 +219,7 @@ trait PaimonRowLevelCommand new CompactIncrement( Collections.emptyList[DataFileMeta], Collections.emptyList[DataFileMeta], - Collections.emptyList[DataFileMeta]), - new IndexIncrement(Collections.emptyList[IndexFileMeta]) + Collections.emptyList[DataFileMeta]) ) } .toSeq diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index 344cd0879b..70f4676cad 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -26,7 +26,7 @@ import org.apache.paimon.data.serializer.InternalSerializers import org.apache.paimon.deletionvectors.DeletionVector import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner} -import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement} +import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.FileKind import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils} import org.apache.paimon.spark.catalog.functions.BucketFunction @@ -344,9 +344,13 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean = dvIndexFileMaintainer.getPartition, dvIndexFileMaintainer.getBucket, null, - DataIncrement.emptyIncrement(), - CompactIncrement.emptyIncrement(), - new IndexIncrement(added.map(_.indexFile).asJava, deleted.map(_.indexFile).asJava) + new DataIncrement( + java.util.Collections.emptyList(), + java.util.Collections.emptyList(), + java.util.Collections.emptyList(), + added.map(_.indexFile).asJava, + deleted.map(_.indexFile).asJava), + CompactIncrement.emptyIncrement() ) val serializer = new CommitMessageSerializer serializer.serialize(commitMessage)
