This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5c91df1613 [core][test] Test that snapshot deletion stops when throwing exception for skipping set (#5438)
5c91df1613 is described below
commit 5c91df1613f5f6a53686c04c636d2109561bbcfb
Author: yuzelin <[email protected]>
AuthorDate: Fri Apr 11 14:35:15 2025 +0800
[core][test] Test that snapshot deletion stops when throwing exception for skipping set (#5438)
---
.../apache/paimon/table/ExpireSnapshotsImpl.java | 29 ++--
.../test/java/org/apache/paimon/TestFileStore.java | 4 +
.../paimon/operation/ExpireSnapshotsTest.java | 28 ++++
.../apache/paimon/operation/FileDeletionTest.java | 176 +++++++++++++++++++++
.../paimon/operation/FileStoreTestUtils.java | 4 +
5 files changed, 231 insertions(+), 10 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 7c63ab7e54..4e6a1f87da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -230,21 +230,31 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
return 0;
}
- Set<String> skippingSet = new HashSet<>();
+ Set<String> skippingSet = null;
try {
-
skippingSet.addAll(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
+ skippingSet = new
HashSet<>(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
} catch (Exception e) {
- // maybe snapshot been deleted by other jobs.
- if (e.getCause() == null || !(e.getCause() instanceof
FileNotFoundException)) {
- throw e;
- }
+ LOG.info("Skip cleaning manifest files due to failed to build
skipping set.", e);
}
+ if (skippingSet != null) {
+ for (long id = beginInclusiveId; id < endExclusiveId; id++) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ready to delete manifests in snapshot #" + id);
+ }
- for (long id = beginInclusiveId; id < endExclusiveId; id++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ready to delete manifests in snapshot #" + id);
+ Snapshot snapshot;
+ try {
+ snapshot = snapshotManager.tryGetSnapshot(id);
+ } catch (FileNotFoundException e) {
+ beginInclusiveId = id + 1;
+ continue;
+ }
+ snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
}
+ }
+ // delete snapshot file finally
+ for (long id = beginInclusiveId; id < endExclusiveId; id++) {
Snapshot snapshot;
try {
snapshot = snapshotManager.tryGetSnapshot(id);
@@ -252,7 +262,6 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
beginInclusiveId = id + 1;
continue;
}
- snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
if (expireConfig.isChangelogDecoupled()) {
commitChangelog(new Changelog(snapshot));
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 3e09b5a887..090bc58f35 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -152,6 +152,10 @@ public class TestFileStore extends KeyValueFileStore {
return new SchemaManager(FileIOFinder.find(new Path(root)),
options.path());
}
+ public FileIO fileIO() {
+ return fileIO;
+ }
+
public AbstractFileStoreWrite<KeyValue> newWrite() {
return super.newWrite(commitUser);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index cdc71dcf8f..3164ac96d0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -629,6 +629,34 @@ public class ExpireSnapshotsTest {
store.assertCleaned();
}
+ @Test
+ public void testManifestFileSkippingSetFileNotFoundException() throws
Exception {
+ List<KeyValue> allData = new ArrayList<>();
+ List<Integer> snapshotPositions = new ArrayList<>();
+ commit(10, allData, snapshotPositions);
+
+ Snapshot snapshot2 = snapshotManager.snapshot(2);
+ TagManager tagManager = store.newTagManager();
+ tagManager.createTag(snapshot2, "tag2", null, Collections.emptyList(),
false);
+
+ // delete manifest list file for tag2 to cause FileNotFoundException
+ Path toDelete =
+
store.pathFactory().manifestListFactory().toPath(snapshot2.baseManifestList());
+ fileIO.deleteQuietly(toDelete);
+
+ ExpireConfig config =
+ ExpireConfig.builder()
+ .snapshotRetainMin(1)
+ .snapshotRetainMax(1)
+ .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .build();
+
+ store.newExpire(config).expire();
+
+ int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
+ assertSnapshot(latestSnapshotId, allData, snapshotPositions);
+ }
+
private TestFileStore createStore() {
ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 2bb472b128..537a0ef774 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -27,6 +27,8 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.ExpireFileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
@@ -38,6 +40,7 @@ import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.ExpireSnapshotsImpl;
import org.apache.paimon.types.RowType;
@@ -53,20 +56,24 @@ import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
import static
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
+import static
org.apache.paimon.operation.FileStoreTestUtils.assertNFilesExists;
import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
import static
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
@@ -702,6 +709,130 @@ public class FileDeletionTest {
assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
}
+ @Test
+ public void testDataFileSkippingSetException() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 2);
+ tagManager = new TagManager(fileIO, store.options().path());
+ SnapshotManager snapshotManager = store.snapshotManager();
+ ChangelogManager changelogManager = store.changelogManager();
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // step 1: commit A to bucket 0
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+ List<KeyValue> kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 0, writers);
+ commitData(store, commitIdentifier++, writers);
+
+ // step 2: commit B to bucket 1 and create tag2
+ writers.clear();
+ kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 1, writers);
+ commitData(store, commitIdentifier++, writers);
+ createTag(snapshotManager.snapshot(2), "tag2",
store.options().tagDefaultTimeRetained());
+
+ // step 3: commit -B
+ cleanBucket(store, partition, 1);
+
+ // step 4: commit -A
+ cleanBucket(store, partition, 0);
+
+ // action: expire snapshot 1-3 and throw artificial exception when
constructing skipping set
+ // from tag2
+ // result: exist A & B (because of tag2)
+ TestSnapshotDeletion snapshotDeletion =
+ new TestSnapshotDeletion(
+ store.fileIO(),
+ store.pathFactory(),
+ store.manifestFileFactory().create(),
+ store.manifestListFactory().create(),
+ store.newIndexFileHandler(),
+ store.newStatsFileHandler(),
+ store.options().changelogProducer() !=
CoreOptions.ChangelogProducer.NONE,
+ store.options().cleanEmptyDirectories(),
+ store.options().deleteFileThreadNum());
+
+ ExpireSnapshots expireSnapshots =
+ new ExpireSnapshotsImpl(
+ snapshotManager, changelogManager, snapshotDeletion,
tagManager);
+ snapshotDeletion.readMergedDataFilesThrowException = true;
+ expireSnapshots
+ .config(
+ ExpireConfig.builder()
+ .snapshotRetainMax(1)
+ .snapshotRetainMin(1)
+
.snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .build())
+ .expire();
+
+ FileStorePathFactory pathFactory = store.pathFactory();
+ assertNFilesExists(fileIO, pathFactory.bucketPath(partition, 0), 1);
+ assertNFilesExists(fileIO, pathFactory.bucketPath(partition, 1), 1);
+ }
+
+ @Test
+ public void testManifestFileSkippingSetException() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 2);
+ tagManager = new TagManager(fileIO, store.options().path());
+ SnapshotManager snapshotManager = store.snapshotManager();
+ ChangelogManager changelogManager = store.changelogManager();
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // step 1: commit A to bucket 0
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+ List<KeyValue> kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 0, writers);
+ commitData(store, commitIdentifier++, writers);
+
+ // step 2: commit B to bucket 1 and create tag2
+ writers.clear();
+ kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 1, writers);
+ commitData(store, commitIdentifier++, writers);
+ createTag(snapshotManager.snapshot(2), "tag2",
store.options().tagDefaultTimeRetained());
+
+ // step 3: commit -B
+ cleanBucket(store, partition, 1);
+
+ // step 4: commit -A
+ cleanBucket(store, partition, 0);
+
+ Path manifestPath = store.pathFactory().manifestPath();
+ int manifestFileNum = fileIO.listStatus(manifestPath).length;
+
+ // action: expire snapshot 1-3 and throw artificial exception when
constructing manifest
+ // skipping set
+ // result: no manifests were deleted (manifestFileNum)
+ TestSnapshotDeletion snapshotDeletion =
+ new TestSnapshotDeletion(
+ store.fileIO(),
+ store.pathFactory(),
+ store.manifestFileFactory().create(),
+ store.manifestListFactory().create(),
+ store.newIndexFileHandler(),
+ store.newStatsFileHandler(),
+ store.options().changelogProducer() !=
CoreOptions.ChangelogProducer.NONE,
+ store.options().cleanEmptyDirectories(),
+ store.options().deleteFileThreadNum());
+ ExpireSnapshots expireSnapshots =
+ new ExpireSnapshotsImpl(
+ snapshotManager, changelogManager, snapshotDeletion,
tagManager);
+ snapshotDeletion.manifestSkippingSetThrowException = true;
+ expireSnapshots
+ .config(
+ ExpireConfig.builder()
+ .snapshotRetainMax(1)
+ .snapshotRetainMin(1)
+
.snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .build())
+ .expire();
+
+ assertNFilesExists(fileIO, manifestPath, manifestFileNum);
+ }
+
private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode
mode) throws Exception {
return createStore(mode, 2);
}
@@ -808,4 +939,49 @@ public class FileDeletionTest {
private void createTag(Snapshot snapshot, String tagName, Duration
timeRetained) {
tagManager.createTag(snapshot, tagName, timeRetained,
Collections.emptyList(), false);
}
+
+ private static class TestSnapshotDeletion extends SnapshotDeletion {
+
+ private boolean readMergedDataFilesThrowException = false;
+ private boolean manifestSkippingSetThrowException = false;
+
+ public TestSnapshotDeletion(
+ FileIO fileIO,
+ FileStorePathFactory pathFactory,
+ ManifestFile manifestFile,
+ ManifestList manifestList,
+ IndexFileHandler indexFileHandler,
+ StatsFileHandler statsFileHandler,
+ boolean produceChangelog,
+ boolean cleanEmptyDirectories,
+ int deleteFileThreadNum) {
+ super(
+ fileIO,
+ pathFactory,
+ manifestFile,
+ manifestList,
+ indexFileHandler,
+ statsFileHandler,
+ produceChangelog,
+ cleanEmptyDirectories,
+ deleteFileThreadNum);
+ }
+
+ @Override
+ protected Collection<ExpireFileEntry>
readMergedDataFiles(List<ManifestFileMeta> manifests)
+ throws IOException {
+ if (readMergedDataFilesThrowException) {
+ throw new IOException("Throwing exception for test purpose.");
+ }
+ return super.readMergedDataFiles(manifests);
+ }
+
+ @Override
+ public Set<String> manifestSkippingSet(List<Snapshot>
skippingSnapshots) {
+ if (manifestSkippingSetThrowException) {
+ throw new RuntimeException("Throwing exception for test
purpose.");
+ }
+ return super.manifestSkippingSet(skippingSnapshots);
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index deab3a9703..768ccfe290 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -62,6 +62,10 @@ public class FileStoreTestUtils {
assertThat(fileIO.exists(path)).isFalse();
}
+ public static void assertNFilesExists(FileIO fileIO, Path path, int num)
throws IOException {
+ assertThat(fileIO.listStatus(path)).hasSize(num);
+ }
+
//
--------------------------------------------------------------------------------
// writeData & commitData are copied from TestFileStore#commitDataImpl and
modified
//
--------------------------------------------------------------------------------