This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9eb6305b79 [core] Strict-mode for OVERWRITE check APPEND snapshot with
fixed bucket files (#6670)
9eb6305b79 is described below
commit 9eb6305b796c2750b3487d5f8789814bfbaa4480
Author: yuzelin <[email protected]>
AuthorDate: Wed Nov 26 10:23:19 2025 +0800
[core] Strict-mode for OVERWRITE check APPEND snapshot with fixed bucket
files (#6670)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 5 +-
.../java/org/apache/paimon/AbstractFileStore.java | 3 +-
.../paimon/operation/FileStoreCommitImpl.java | 20 +-----
.../paimon/operation/commit/ConflictDetection.java | 63 ++++++++++++++++-
.../apache/paimon/table/sink/TableCommitTest.java | 80 ++++++++++++++++++++--
6 files changed, 145 insertions(+), 28 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index c6101e0a85..b50b9f2c9a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -240,7 +240,7 @@ under the License.
             <td><h5>commit.strict-mode.last-safe-snapshot</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Long</td>
-            <td>If set, committer will check if there are other commit user's COMPACT / OVERWRITE snapshot, starting from the snapshot after this one. If found, commit will be aborted. If the value of this option is -1, committer will not check for its first commit.</td>
+            <td>If set, committer will check snapshots committed by other commit users, starting from the snapshot after this one. The commit is aborted if a COMPACT / OVERWRITE snapshot is found, or if this commit is an OVERWRITE and an APPEND snapshot that wrote files to fixed buckets is found. If the value of this option is -1, committer will not check for its first commit.</td>
         </tr>
         <tr>
             <td><h5>commit.timeout</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index fa5b133a3c..e9077ff589 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1949,8 +1949,9 @@ public class CoreOptions implements Serializable {
                     .longType()
                     .noDefaultValue()
                     .withDescription(
-                            "If set, committer will check if there are other commit user's COMPACT / OVERWRITE snapshot, "
-                                    + "starting from the snapshot after this one. If found, commit will be aborted. "
+                            "If set, committer will check snapshots committed by other commit users, starting from the "
+                                    + "snapshot after this one. The commit is aborted if a COMPACT / OVERWRITE snapshot is found, or if "
+                                    + "this commit is an OVERWRITE and an APPEND snapshot that wrote files to fixed buckets is found. "
                                     + "If the value of this option is -1, committer will not check for its first commit.");
 
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
public static final ConfigOption<String> CLUSTERING_COLUMNS =
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5fa81383ce..a7e15b31d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -277,7 +277,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
newKeyComparator(),
bucketMode(),
options.deletionVectorsEnabled(),
- newIndexFileHandler());
+ newIndexFileHandler(),
+ newScan());
return new FileStoreCommitImpl(
snapshotCommit,
fileIO,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ceb68ed6ba..d0f18760ec 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -992,24 +992,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot
>= 0) {
- for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId;
id++) {
- Snapshot snapshot = snapshotManager.snapshot(id);
- if ((snapshot.commitKind() == CommitKind.COMPACT
- || snapshot.commitKind() ==
CommitKind.OVERWRITE)
- && !snapshot.commitUser().equals(commitUser)) {
- throw new RuntimeException(
- String.format(
- "When trying to commit snapshot %d, "
- + "commit user %s has found a %s
snapshot (id: %d) by another user %s. "
- + "Giving up committing as %s is
set.",
- newSnapshotId,
- commitUser,
- snapshot.commitKind().name(),
- id,
- snapshot.commitUser(),
-
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
- }
- }
+ conflictDetection.commitStrictModeCheck(
+ strictModeLastSafeSnapshot, newSnapshotId, commitKind,
snapshotManager);
strictModeLastSafeSnapshot = newSnapshotId - 1;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 9739beae88..0261a3d7de 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -18,6 +18,7 @@
package org.apache.paimon.operation.commit;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.data.BinaryRow;
@@ -28,13 +29,17 @@ import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +52,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -71,6 +77,7 @@ public class ConflictDetection {
private final BucketMode bucketMode;
private final boolean deletionVectorsEnabled;
private final IndexFileHandler indexFileHandler;
+ private final FileStoreScan scan;
private @Nullable PartitionExpire partitionExpire;
@@ -82,7 +89,8 @@ public class ConflictDetection {
@Nullable Comparator<InternalRow> keyComparator,
BucketMode bucketMode,
boolean deletionVectorsEnabled,
- IndexFileHandler indexFileHandler) {
+ IndexFileHandler indexFileHandler,
+ FileStoreScan scan) {
this.tableName = tableName;
this.commitUser = commitUser;
this.partitionType = partitionType;
@@ -91,12 +99,90 @@ public class ConflictDetection {
         this.bucketMode = bucketMode;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.indexFileHandler = indexFileHandler;
+        this.scan = scan;
     }
 
     public void withPartitionExpire(PartitionExpire partitionExpire) {
         this.partitionExpire = partitionExpire;
     }
 
+    /**
+     * Fails the commit that is about to create snapshot {@code newSnapshotId} if, in strict mode,
+     * another commit user has committed a conflicting snapshot since the last safe snapshot.
+     *
+     * <p>Snapshots {@code strictModeLastSafeSnapshot + 1} (inclusive) to {@code newSnapshotId}
+     * (exclusive) are inspected; snapshots committed by this commit user are skipped. A COMPACT or
+     * OVERWRITE snapshot by another user always fails the check. An APPEND snapshot by another user
+     * fails the check only if the new commit is an OVERWRITE and the APPEND snapshot's delta
+     * contains files returned by a scan restricted with {@link FileStoreScan#onlyReadRealBuckets()}.
+     *
+     * @param strictModeLastSafeSnapshot id of the last snapshot known to be safe; the check is a
+     *     no-op when it is {@code null} or negative
+     * @param newSnapshotId id of the snapshot about to be committed
+     * @param newCommitKind commit kind of the snapshot about to be committed
+     * @param snapshotManager used to load the snapshots committed in between
+     * @throws RuntimeException if a conflicting snapshot by another commit user is found
+     */
+    public void commitStrictModeCheck(
+            @Nullable Long strictModeLastSafeSnapshot,
+            long newSnapshotId,
+            CommitKind newCommitKind,
+            SnapshotManager snapshotManager) {
+        if (strictModeLastSafeSnapshot == null || strictModeLastSafeSnapshot < 0) {
+            return;
+        }
+
+        boolean overwriting = newCommitKind == CommitKind.OVERWRITE;
+        for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
+            Snapshot snapshot = snapshotManager.snapshot(id);
+            if (snapshot.commitUser().equals(commitUser)) {
+                // Snapshots committed by this commit user are not checked.
+                continue;
+            }
+            if (snapshot.commitKind() == CommitKind.COMPACT
+                    || snapshot.commitKind() == CommitKind.OVERWRITE) {
+                throw new RuntimeException(
+                        String.format(
+                                "When trying to commit snapshot %d, "
+                                        + "commit user %s has found a %s snapshot (id: %d) by another user %s. "
+                                        + "Giving up committing as %s is set.",
+                                newSnapshotId,
+                                commitUser,
+                                snapshot.commitKind().name(),
+                                id,
+                                snapshot.commitUser(),
+                                CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+            }
+            // An APPEND by another user is only checked for OVERWRITE commits. The cheap kind
+            // comparisons come first because the file check has to read the snapshot's delta.
+            if (overwriting
+                    && snapshot.commitKind() == CommitKind.APPEND
+                    && hasFilesInRealBuckets(snapshot)) {
+                throw new RuntimeException(
+                        String.format(
+                                "When trying to commit snapshot %d, "
+                                        + "commit user %s has found an APPEND snapshot (id: %d) by another user %s "
+                                        + "which committed files to fixed bucket. Giving up committing as %s is set.",
+                                newSnapshotId,
+                                commitUser,
+                                id,
+                                snapshot.commitUser(),
+                                CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+            }
+        }
+    }
+
+    /** Returns whether the DELTA scan of {@code snapshot}, restricted to real buckets, yields a file. */
+    private boolean hasFilesInRealBuckets(Snapshot snapshot) {
+        Iterator<ManifestEntry> entries =
+                scan.withSnapshot(snapshot)
+                        .withKind(ScanMode.DELTA)
+                        .onlyReadRealBuckets()
+                        .dropStats()
+                        .readFileIterator();
+        return entries.hasNext();
+    }
+
     public void checkNoConflictsOrFail(
             Snapshot snapshot,
             List<SimpleFileEntry> baseEntries,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 0a39ca7ef8..10373c0377 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -323,7 +323,7 @@ public class TableCommitTest {
}
@Test
- public void testStrictMode() throws Exception {
+ public void testStrictModeForCompact() throws Exception {
String path = tempDir.toString();
RowType rowType =
RowType.of(
@@ -369,18 +369,88 @@ public class TableCommitTest {
write2.write(GenericRow.of(1, 1L));
commit2.commit(1, write2.prepareCommit(false, 1));
- // APPEND commit is ignored
+ // COMPACT commit should be checked
+
+ write1.write(GenericRow.of(4, 4L));
+ write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit1.commit(3, write1.prepareCommit(true, 3));
+
+ write2.write(GenericRow.of(5, 5L));
+ assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false,
3)))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Giving up committing as
commit.strict-mode.last-safe-snapshot is set.");
+ }
+
+ @Test
+ public void testStrictModeForAppend() throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"k", "v"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+ String user1 = UUID.randomUUID().toString();
+ FileStoreTable fixedBucketWriteTable = table;
+ TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1);
+ TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1);
+ Map<String, String> newOptions = new HashMap<>();
+
newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1");
+ table = table.copy(newOptions);
+ String user2 = UUID.randomUUID().toString();
+ TableWriteImpl<?> write2 = table.newWrite(user2);
+ TableCommitImpl commit2 =
table.newCommit(user2).withOverwrite(Collections.emptyMap());
+
+ // by default, first commit is not checked
+
+ write1.write(GenericRow.of(0, 0L));
+ write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+ commit1.commit(1, write1.prepareCommit(true, 1));
+
+ write2.write(GenericRow.of(1, 1L));
+ commit2.commit(1, write2.prepareCommit(false, 1));
+
+ // APPEND with postpone bucket files should be ignored
+ write1.close();
+ commit1.close();
+ Map<String, String> postponeWriteOptions = new HashMap<>();
+ postponeWriteOptions.put(CoreOptions.BUCKET.key(), "-2");
+
postponeWriteOptions.put(CoreOptions.POSTPONE_BATCH_WRITE_FIXED_BUCKET.key(),
"false");
+ FileStoreTable postponeWriteTable =
fixedBucketWriteTable.copy(postponeWriteOptions);
+ write1 = postponeWriteTable.newWrite(user1);
+ commit1 = postponeWriteTable.newCommit(user1);
write1.write(GenericRow.of(2, 2L));
commit1.commit(2, write1.prepareCommit(false, 2));
write2.write(GenericRow.of(3, 3L));
commit2.commit(2, write2.prepareCommit(false, 2));
- // COMPACT commit should be checked
-
+ // APPEND with fixed bucket files should be checked
+ write1.close();
+ commit1.close();
+ write1 = fixedBucketWriteTable.newWrite(user1);
+ commit1 = fixedBucketWriteTable.newCommit(user1);
write1.write(GenericRow.of(4, 4L));
- write1.compact(BinaryRow.EMPTY_ROW, 0, true);
commit1.commit(3, write1.prepareCommit(true, 3));
write2.write(GenericRow.of(5, 5L));