This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
commit ded6aebad92457dbf094076920ac19f5da253d74 Author: JingsongLi <[email protected]> AuthorDate: Tue Dec 30 16:58:47 2025 +0800 [core] Extract StrictModeChecker from ConflictDetection in Commit --- .../java/org/apache/paimon/AbstractFileStore.java | 14 ++- .../paimon/operation/FileStoreCommitImpl.java | 20 ++-- .../paimon/operation/commit/ConflictDetection.java | 63 +----------- .../paimon/operation/commit/StrictModeChecker.java | 114 +++++++++++++++++++++ .../apache/paimon/table/sink/TableCommitTest.java | 54 +++++----- 5 files changed, 159 insertions(+), 106 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 4eea07a7e9..6944c0283f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -44,6 +44,7 @@ import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.operation.commit.ConflictDetection; +import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.partition.PartitionExpireStrategy; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -279,8 +280,13 @@ abstract class AbstractFileStore<T> implements FileStore<T> { newKeyComparator(), bucketMode(), options.deletionVectorsEnabled(), - newIndexFileHandler(), - newScan()); + newIndexFileHandler()); + StrictModeChecker strictModeChecker = + StrictModeChecker.create( + snapshotManager, + commitUser, + this::newScan, + options.commitStrictModeLastSafeSnapshot().orElse(null)); return new FileStoreCommitImpl( snapshotCommit, fileIO, @@ -310,10 +316,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> { options.commitTimeout(), options.commitMinRetryWait(), options.commitMaxRetryWait(), - options.commitStrictModeLastSafeSnapshot().orElse(null), options.rowTrackingEnabled(), options.commitDiscardDuplicateFiles(), - conflictDetection); + conflictDetection, + strictModeChecker); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index aa47e69d5e..9933e40250 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -50,6 +50,7 @@ import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck; import org.apache.paimon.operation.commit.ManifestEntryChanges; import org.apache.paimon.operation.commit.RetryCommitResult; import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned; +import org.apache.paimon.operation.commit.StrictModeChecker; import org.apache.paimon.operation.commit.SuccessCommitResult; import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.operation.metrics.CommitStats; @@ -158,10 +159,10 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final long commitMinRetryWait; private final long commitMaxRetryWait; private final int commitMaxRetries; - @Nullable private Long strictModeLastSafeSnapshot; private final InternalRowPartitionComputer partitionComputer; private final boolean rowTrackingEnabled; private final boolean discardDuplicateFiles; + @Nullable private final StrictModeChecker strictModeChecker; private final ConflictDetection conflictDetection; private final CommitCleaner commitCleaner; @@ -198,10 +199,10 @@ public class FileStoreCommitImpl implements FileStoreCommit { long commitTimeout, long commitMinRetryWait, long commitMaxRetryWait, - @Nullable Long strictModeLastSafeSnapshot, boolean rowTrackingEnabled, boolean discardDuplicateFiles, - ConflictDetection conflictDetection) { + ConflictDetection conflictDetection, + @Nullable StrictModeChecker strictModeChecker) { this.snapshotCommit = snapshotCommit; this.fileIO = fileIO; this.schemaManager = schemaManager; @@ -228,7 +229,6 @@ public class FileStoreCommitImpl implements FileStoreCommit { this.commitTimeout = commitTimeout; this.commitMinRetryWait = commitMinRetryWait; this.commitMaxRetryWait = commitMaxRetryWait; - this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot; this.partitionComputer = new InternalRowPartitionComputer( options.partitionDefaultName(), @@ -241,6 +241,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { this.bucketMode = bucketMode; this.rowTrackingEnabled = rowTrackingEnabled; this.discardDuplicateFiles = discardDuplicateFiles; + this.strictModeChecker = strictModeChecker; this.conflictDetection = conflictDetection; this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile); } @@ -873,10 +874,9 @@ public class FileStoreCommitImpl implements FileStoreCommit { } } - if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) { - conflictDetection.commitStrictModeCheck( - strictModeLastSafeSnapshot, newSnapshotId, commitKind, snapshotManager); - strictModeLastSafeSnapshot = newSnapshotId - 1; + if (strictModeChecker != null) { + strictModeChecker.check(newSnapshotId, commitKind); + strictModeChecker.update(newSnapshotId - 1); } if (LOG.isDebugEnabled()) { @@ -1084,8 +1084,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { commitUser, identifier, commitKind.name()); - if (strictModeLastSafeSnapshot != null) { - strictModeLastSafeSnapshot = newSnapshot.id(); + if (strictModeChecker != null) { + strictModeChecker.update(newSnapshotId); } final List<SimpleFileEntry> finalBaseFiles = baseDataFiles; final List<ManifestEntry> finalDeltaFiles = deltaFiles; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java index ae58cba819..801bba4817 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java @@ -18,7 +18,6 @@ package org.apache.paimon.operation.commit; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.Snapshot.CommitKind; import org.apache.paimon.data.BinaryRow; @@ -29,17 +28,13 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; -import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.SimpleFileEntry; import org.apache.paimon.manifest.SimpleFileEntryWithDV; -import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.table.BucketMode; -import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.SnapshotManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +47,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -77,7 +71,6 @@ public class ConflictDetection { private final BucketMode bucketMode; private final boolean deletionVectorsEnabled; private final IndexFileHandler indexFileHandler; - private final FileStoreScan scan; private @Nullable PartitionExpire partitionExpire; @@ -89,8 +82,7 @@ public class ConflictDetection { @Nullable Comparator<InternalRow> keyComparator, BucketMode bucketMode, boolean deletionVectorsEnabled, - IndexFileHandler indexFileHandler, - FileStoreScan scan) { + IndexFileHandler indexFileHandler) { this.tableName = tableName; this.commitUser = commitUser; this.partitionType = partitionType; @@ -99,7 +91,6 @@ public class ConflictDetection { this.bucketMode = bucketMode; this.deletionVectorsEnabled = deletionVectorsEnabled; this.indexFileHandler = indexFileHandler; - this.scan = scan; } @Nullable @@ -111,58 +102,6 @@ public class ConflictDetection { this.partitionExpire = partitionExpire; } - public void commitStrictModeCheck( - @Nullable Long strictModeLastSafeSnapshot, - long newSnapshotId, - CommitKind newCommitKind, - SnapshotManager snapshotManager) { - if (strictModeLastSafeSnapshot == null || strictModeLastSafeSnapshot < 0) { - return; - } - - for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) { - Snapshot snapshot = snapshotManager.snapshot(id); - if (snapshot.commitUser().equals(commitUser)) { - continue; - } - if (snapshot.commitKind() == CommitKind.COMPACT - || snapshot.commitKind() == CommitKind.OVERWRITE) { - throw new RuntimeException( - String.format( - "When trying to commit snapshot %d, " - + "commit user %s has found a %s snapshot (id: %d) by another user %s. " - + "Giving up committing as %s is set.", - newSnapshotId, - commitUser, - snapshot.commitKind().name(), - id, - snapshot.commitUser(), - CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key())); - } - if (snapshot.commitKind() == CommitKind.APPEND - && newCommitKind == CommitKind.OVERWRITE) { - Iterator<ManifestEntry> entries = - scan.withSnapshot(snapshot) - .withKind(ScanMode.DELTA) - .onlyReadRealBuckets() - .dropStats() - .readFileIterator(); - if (entries.hasNext()) { - throw new RuntimeException( - String.format( - "When trying to commit snapshot %d, " - + "commit user %s has found a APPEND snapshot (id: %d) by another user %s " - + "which committed files to fixed bucket. Giving up committing as %s is set.", - newSnapshotId, - commitUser, - id, - snapshot.commitUser(), - CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key())); - } - } - } - } - public void checkNoConflictsOrFail( Snapshot snapshot, List<SimpleFileEntry> baseEntries, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java new file mode 100644 index 0000000000..e2d76c2327 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation.commit; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.Snapshot.CommitKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.FileStoreScan; +import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.utils.SnapshotManager; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.function.Supplier; + +/** A checker to check strict mode based on last safe snapshot. */ +public class StrictModeChecker { + + private final SnapshotManager snapshotManager; + private final String commitUser; + private final FileStoreScan scan; + + private long strictModeLastSafeSnapshot; + + public StrictModeChecker( + SnapshotManager snapshotManager, + String commitUser, + FileStoreScan scan, + long strictModeLastSafeSnapshot) { + this.snapshotManager = snapshotManager; + this.commitUser = commitUser; + this.scan = scan; + this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot; + } + + @Nullable + public static StrictModeChecker create( + SnapshotManager snapshotManager, + String commitUser, + Supplier<FileStoreScan> scanSupplier, + @Nullable Long strictModeLastSafeSnapshot) { + if (strictModeLastSafeSnapshot == null) { + return null; + } + return new StrictModeChecker( + snapshotManager, commitUser, scanSupplier.get(), strictModeLastSafeSnapshot); + } + + public void check(long newSnapshotId, CommitKind newCommitKind) { + for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) { + Snapshot snapshot = snapshotManager.snapshot(id); + if (snapshot.commitUser().equals(commitUser)) { + continue; + } + if (snapshot.commitKind() == CommitKind.COMPACT + || snapshot.commitKind() == CommitKind.OVERWRITE) { + throw new RuntimeException( + String.format( + "When trying to commit snapshot %d, " + + "commit user %s has found a %s snapshot (id: %d) by another user %s. " + + "Giving up committing as %s is set.", + newSnapshotId, + commitUser, + snapshot.commitKind().name(), + id, + snapshot.commitUser(), + CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key())); + } + if (snapshot.commitKind() == CommitKind.APPEND + && newCommitKind == CommitKind.OVERWRITE) { + Iterator<ManifestEntry> entries = + scan.withSnapshot(snapshot) + .withKind(ScanMode.DELTA) + .onlyReadRealBuckets() + .dropStats() + .readFileIterator(); + if (entries.hasNext()) { + throw new RuntimeException( + String.format( + "When trying to commit snapshot %d, " + + "commit user %s has found a APPEND snapshot (id: %d) by another user %s " + + "which committed files to fixed bucket. Giving up committing as %s is set.", + newSnapshotId, + commitUser, + id, + snapshot.commitUser(), + CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key())); + } + } + } + } + + public void update(long newSafeSnapshot) { + strictModeLastSafeSnapshot = newSafeSnapshot; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index 2b3625a3c3..7fc17ae792 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -67,6 +67,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; import static java.util.Collections.singletonMap; +import static org.apache.paimon.CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT; import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -421,19 +422,17 @@ public class TableCommitTest { TableWriteImpl<?> write1 = table.newWrite(user1); TableCommitImpl commit1 = table.newCommit(user1); - Map<String, String> newOptions = new HashMap<>(); - newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1"); - table = table.copy(newOptions); - String user2 = UUID.randomUUID().toString(); - TableWriteImpl<?> write2 = table.newWrite(user2); - TableCommitImpl commit2 = table.newCommit(user2); - - // by default, first commit is not checked - write1.write(GenericRow.of(0, 0L)); write1.compact(BinaryRow.EMPTY_ROW, 0, true); commit1.commit(1, write1.prepareCommit(true, 1)); + // test skip this commit check + + String user2 = UUID.randomUUID().toString(); + table = table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2")); + TableWriteImpl<?> write2 = table.newWrite(user2); + TableCommitImpl commit2 = table.newCommit(user2); + write2.write(GenericRow.of(1, 1L)); commit2.commit(1, write2.prepareCommit(false, 1)); @@ -482,19 +481,17 @@ public class TableCommitTest { TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1); TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1); - Map<String, String> newOptions = new HashMap<>(); - newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1"); - table = table.copy(newOptions); - String user2 = UUID.randomUUID().toString(); - TableWriteImpl<?> write2 = table.newWrite(user2); - TableCommitImpl commit2 = table.newCommit(user2).withOverwrite(Collections.emptyMap()); - - // by default, first commit is not checked - write1.write(GenericRow.of(0, 0L)); write1.compact(BinaryRow.EMPTY_ROW, 0, true); commit1.commit(1, write1.prepareCommit(true, 1)); + // test skip this commit check + + String user2 = UUID.randomUUID().toString(); + table = table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2")); + TableWriteImpl<?> write2 = table.newWrite(user2); + TableCommitImpl commit2 = table.newCommit(user2).withOverwrite(Collections.emptyMap()); + write2.write(GenericRow.of(1, 1L)); commit2.commit(1, write2.prepareCommit(false, 1)); @@ -560,19 +557,17 @@ public class TableCommitTest { TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1); TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1); - Map<String, String> newOptions = new HashMap<>(); - newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1"); - table = table.copy(newOptions); - String user2 = UUID.randomUUID().toString(); - TableWriteImpl<?> write2 = table.newWrite(user2); - TableCommitImpl commit2 = table.newCommit(user2); - - // by default, first commit is not checked - write1.write(GenericRow.of(0, 0L)); write1.compact(BinaryRow.EMPTY_ROW, 0, true); commit1.commit(1, write1.prepareCommit(true, 1)); + // test skip this commit check + + String user2 = UUID.randomUUID().toString(); + table = table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2")); + TableWriteImpl<?> write2 = table.newWrite(user2); + TableCommitImpl commit2 = table.newCommit(user2); + write2.write(GenericRow.of(1, 1L)); commit2.commit(1, write2.prepareCommit(false, 1)); @@ -896,15 +891,14 @@ public class TableCommitTest { tableSchema, CatalogEnvironment.empty()); String user1 = UUID.randomUUID().toString(); - FileStoreTable overwriteUpgrade = - table.copy(Collections.singletonMap("write-only", "true")); + FileStoreTable overwriteUpgrade = table.copy(singletonMap("write-only", "true")); TableWriteImpl<?> write1 = overwriteUpgrade.newWrite(user1); TableCommitImpl commit1 = overwriteUpgrade.newCommit(user1).withOverwrite(Collections.emptyMap()); String user2 = UUID.randomUUID().toString(); FileStoreTable compactTable = - table.copy(Collections.singletonMap("compaction.force-up-level-0", "true")); + table.copy(singletonMap("compaction.force-up-level-0", "true")); TableWriteImpl<?> write2 = compactTable.newWrite(user2); TableCommitImpl commit2 = compactTable.newCommit(user2);
