This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c91df1613 [core][test] Test that snapshot deletion stops when throwing exception for skipping set (#5438)
5c91df1613 is described below

commit 5c91df1613f5f6a53686c04c636d2109561bbcfb
Author: yuzelin <[email protected]>
AuthorDate: Fri Apr 11 14:35:15 2025 +0800

    [core][test] Test that snapshot deletion stops when throwing exception for skipping set (#5438)
---
 .../apache/paimon/table/ExpireSnapshotsImpl.java   |  29 ++--
 .../test/java/org/apache/paimon/TestFileStore.java |   4 +
 .../paimon/operation/ExpireSnapshotsTest.java      |  28 ++++
 .../apache/paimon/operation/FileDeletionTest.java  | 176 +++++++++++++++++++++
 .../paimon/operation/FileStoreTestUtils.java       |   4 +
 5 files changed, 231 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 7c63ab7e54..4e6a1f87da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -230,21 +230,31 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
             return 0;
         }
 
-        Set<String> skippingSet = new HashSet<>();
+        Set<String> skippingSet = null;
         try {
-            
skippingSet.addAll(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
+            skippingSet = new 
HashSet<>(snapshotDeletion.manifestSkippingSet(skippingSnapshots));
         } catch (Exception e) {
-            // maybe snapshot been deleted by other jobs.
-            if (e.getCause() == null || !(e.getCause() instanceof 
FileNotFoundException)) {
-                throw e;
-            }
+            LOG.info("Skip cleaning manifest files due to failed to build 
skipping set.", e);
         }
+        if (skippingSet != null) {
+            for (long id = beginInclusiveId; id < endExclusiveId; id++) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ready to delete manifests in snapshot #" + id);
+                }
 
-        for (long id = beginInclusiveId; id < endExclusiveId; id++) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Ready to delete manifests in snapshot #" + id);
+                Snapshot snapshot;
+                try {
+                    snapshot = snapshotManager.tryGetSnapshot(id);
+                } catch (FileNotFoundException e) {
+                    beginInclusiveId = id + 1;
+                    continue;
+                }
+                snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
             }
+        }
 
+        // delete snapshot file finally
+        for (long id = beginInclusiveId; id < endExclusiveId; id++) {
             Snapshot snapshot;
             try {
                 snapshot = snapshotManager.tryGetSnapshot(id);
@@ -252,7 +262,6 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
                 beginInclusiveId = id + 1;
                 continue;
             }
-            snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet);
             if (expireConfig.isChangelogDecoupled()) {
                 commitChangelog(new Changelog(snapshot));
             }
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 3e09b5a887..090bc58f35 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -152,6 +152,10 @@ public class TestFileStore extends KeyValueFileStore {
         return new SchemaManager(FileIOFinder.find(new Path(root)), 
options.path());
     }
 
+    public FileIO fileIO() {
+        return fileIO;
+    }
+
     public AbstractFileStoreWrite<KeyValue> newWrite() {
         return super.newWrite(commitUser);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index cdc71dcf8f..3164ac96d0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -629,6 +629,34 @@ public class ExpireSnapshotsTest {
         store.assertCleaned();
     }
 
+    @Test
+    public void testManifestFileSkippingSetFileNotFoundException() throws 
Exception {
+        List<KeyValue> allData = new ArrayList<>();
+        List<Integer> snapshotPositions = new ArrayList<>();
+        commit(10, allData, snapshotPositions);
+
+        Snapshot snapshot2 = snapshotManager.snapshot(2);
+        TagManager tagManager = store.newTagManager();
+        tagManager.createTag(snapshot2, "tag2", null, Collections.emptyList(), 
false);
+
+        // delete manifest list file for tag2 to cause FileNotFoundException
+        Path toDelete =
+                
store.pathFactory().manifestListFactory().toPath(snapshot2.baseManifestList());
+        fileIO.deleteQuietly(toDelete);
+
+        ExpireConfig config =
+                ExpireConfig.builder()
+                        .snapshotRetainMin(1)
+                        .snapshotRetainMax(1)
+                        .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+                        .build();
+
+        store.newExpire(config).expire();
+
+        int latestSnapshotId = snapshotManager.latestSnapshotId().intValue();
+        assertSnapshot(latestSnapshotId, allData, snapshotPositions);
+    }
+
     private TestFileStore createStore() {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 2bb472b128..537a0ef774 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -27,6 +27,8 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -38,6 +40,7 @@ import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.table.ExpireSnapshotsImpl;
 import org.apache.paimon.types.RowType;
@@ -53,20 +56,24 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
 import static 
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
+import static 
org.apache.paimon.operation.FileStoreTestUtils.assertNFilesExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
 import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
@@ -702,6 +709,130 @@ public class FileDeletionTest {
         assertPathExists(fileIO, pathFactory.bucketPath(partition, 1));
     }
 
+    @Test
+    public void testDataFileSkippingSetException() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 2);
+        tagManager = new TagManager(fileIO, store.options().path());
+        SnapshotManager snapshotManager = store.snapshotManager();
+        ChangelogManager changelogManager = store.changelogManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // step 1: commit A to bucket 0
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        List<KeyValue> kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 0, writers);
+        commitData(store, commitIdentifier++, writers);
+
+        // step 2: commit B to bucket 1 and create tag2
+        writers.clear();
+        kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 1, writers);
+        commitData(store, commitIdentifier++, writers);
+        createTag(snapshotManager.snapshot(2), "tag2", 
store.options().tagDefaultTimeRetained());
+
+        // step 3: commit -B
+        cleanBucket(store, partition, 1);
+
+        // step 4: commit -A
+        cleanBucket(store, partition, 0);
+
+        // action: expire snapshot 1-3 and throw artificial exception when 
constructing skipping set
+        // from tag2
+        // result: exist A & B (because of tag2)
+        TestSnapshotDeletion snapshotDeletion =
+                new TestSnapshotDeletion(
+                        store.fileIO(),
+                        store.pathFactory(),
+                        store.manifestFileFactory().create(),
+                        store.manifestListFactory().create(),
+                        store.newIndexFileHandler(),
+                        store.newStatsFileHandler(),
+                        store.options().changelogProducer() != 
CoreOptions.ChangelogProducer.NONE,
+                        store.options().cleanEmptyDirectories(),
+                        store.options().deleteFileThreadNum());
+
+        ExpireSnapshots expireSnapshots =
+                new ExpireSnapshotsImpl(
+                        snapshotManager, changelogManager, snapshotDeletion, 
tagManager);
+        snapshotDeletion.readMergedDataFilesThrowException = true;
+        expireSnapshots
+                .config(
+                        ExpireConfig.builder()
+                                .snapshotRetainMax(1)
+                                .snapshotRetainMin(1)
+                                
.snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+                                .build())
+                .expire();
+
+        FileStorePathFactory pathFactory = store.pathFactory();
+        assertNFilesExists(fileIO, pathFactory.bucketPath(partition, 0), 1);
+        assertNFilesExists(fileIO, pathFactory.bucketPath(partition, 1), 1);
+    }
+
+    @Test
+    public void testManifestFileSkippingSetException() throws Exception {
+        TestFileStore store = 
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED, 2);
+        tagManager = new TagManager(fileIO, store.options().path());
+        SnapshotManager snapshotManager = store.snapshotManager();
+        ChangelogManager changelogManager = store.changelogManager();
+        TestKeyValueGenerator gen =
+                new 
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+        BinaryRow partition = gen.getPartition(gen.next());
+
+        // step 1: commit A to bucket 0
+        Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
+        List<KeyValue> kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 0, writers);
+        commitData(store, commitIdentifier++, writers);
+
+        // step 2: commit B to bucket 1 and create tag2
+        writers.clear();
+        kvs = partitionedData(5, gen);
+        writeData(store, kvs, partition, 1, writers);
+        commitData(store, commitIdentifier++, writers);
+        createTag(snapshotManager.snapshot(2), "tag2", 
store.options().tagDefaultTimeRetained());
+
+        // step 3: commit -B
+        cleanBucket(store, partition, 1);
+
+        // step 4: commit -A
+        cleanBucket(store, partition, 0);
+
+        Path manifestPath = store.pathFactory().manifestPath();
+        int manifestFileNum = fileIO.listStatus(manifestPath).length;
+
+        // action: expire snapshot 1-3 and throw artificial exception when 
constructing manifest
+        // skipping set
+        // result: no manifests were deleted (manifestFileNum)
+        TestSnapshotDeletion snapshotDeletion =
+                new TestSnapshotDeletion(
+                        store.fileIO(),
+                        store.pathFactory(),
+                        store.manifestFileFactory().create(),
+                        store.manifestListFactory().create(),
+                        store.newIndexFileHandler(),
+                        store.newStatsFileHandler(),
+                        store.options().changelogProducer() != 
CoreOptions.ChangelogProducer.NONE,
+                        store.options().cleanEmptyDirectories(),
+                        store.options().deleteFileThreadNum());
+        ExpireSnapshots expireSnapshots =
+                new ExpireSnapshotsImpl(
+                        snapshotManager, changelogManager, snapshotDeletion, 
tagManager);
+        snapshotDeletion.manifestSkippingSetThrowException = true;
+        expireSnapshots
+                .config(
+                        ExpireConfig.builder()
+                                .snapshotRetainMax(1)
+                                .snapshotRetainMin(1)
+                                
.snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+                                .build())
+                .expire();
+
+        assertNFilesExists(fileIO, manifestPath, manifestFileNum);
+    }
+
     private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode 
mode) throws Exception {
         return createStore(mode, 2);
     }
@@ -808,4 +939,49 @@ public class FileDeletionTest {
     private void createTag(Snapshot snapshot, String tagName, Duration 
timeRetained) {
         tagManager.createTag(snapshot, tagName, timeRetained, 
Collections.emptyList(), false);
     }
+
+    private static class TestSnapshotDeletion extends SnapshotDeletion {
+
+        private boolean readMergedDataFilesThrowException = false;
+        private boolean manifestSkippingSetThrowException = false;
+
+        public TestSnapshotDeletion(
+                FileIO fileIO,
+                FileStorePathFactory pathFactory,
+                ManifestFile manifestFile,
+                ManifestList manifestList,
+                IndexFileHandler indexFileHandler,
+                StatsFileHandler statsFileHandler,
+                boolean produceChangelog,
+                boolean cleanEmptyDirectories,
+                int deleteFileThreadNum) {
+            super(
+                    fileIO,
+                    pathFactory,
+                    manifestFile,
+                    manifestList,
+                    indexFileHandler,
+                    statsFileHandler,
+                    produceChangelog,
+                    cleanEmptyDirectories,
+                    deleteFileThreadNum);
+        }
+
+        @Override
+        protected Collection<ExpireFileEntry> 
readMergedDataFiles(List<ManifestFileMeta> manifests)
+                throws IOException {
+            if (readMergedDataFilesThrowException) {
+                throw new IOException("Throwing exception for test purpose.");
+            }
+            return super.readMergedDataFiles(manifests);
+        }
+
+        @Override
+        public Set<String> manifestSkippingSet(List<Snapshot> 
skippingSnapshots) {
+            if (manifestSkippingSetThrowException) {
+                throw new RuntimeException("Throwing exception for test 
purpose.");
+            }
+            return super.manifestSkippingSet(skippingSnapshots);
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
index deab3a9703..768ccfe290 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreTestUtils.java
@@ -62,6 +62,10 @@ public class FileStoreTestUtils {
         assertThat(fileIO.exists(path)).isFalse();
     }
 
+    public static void assertNFilesExists(FileIO fileIO, Path path, int num) 
throws IOException {
+        assertThat(fileIO.listStatus(path)).hasSize(num);
+    }
+
     // 
--------------------------------------------------------------------------------
     // writeData & commitData are copied from TestFileStore#commitDataImpl and 
modified
     // 
--------------------------------------------------------------------------------

Reply via email to