This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new da4d64b4b [core] Remove FileStoreScan.withManifestList and fix unstable test (#4552)
da4d64b4b is described below

commit da4d64b4b6afd8685447aed6227e2ca67cfba8b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 20 13:00:35 2024 +0800

    [core] Remove FileStoreScan.withManifestList and fix unstable test (#4552)
---
 .../paimon/operation/AbstractFileStoreScan.java    |  15 ---
 .../org/apache/paimon/operation/FileStoreScan.java |   2 -
 .../test/java/org/apache/paimon/TestFileStore.java | 104 ++++++++++++---------
 .../paimon/operation/ExpireSnapshotsTest.java      |   3 +-
 .../operation/KeyValueFileStoreScanTest.java       |  25 -----
 .../paimon/table/FileStoreTableTestBase.java       |   4 +-
 6 files changed, 62 insertions(+), 91 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 98e064451..c73a92062 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -63,7 +63,6 @@ import static 
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
 import static 
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
 import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
 
 /** Default implementation of {@link FileStoreScan}. */
@@ -81,7 +80,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private Snapshot specifiedSnapshot = null;
     private Filter<Integer> bucketFilter = null;
     private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
-    private List<ManifestFileMeta> specifiedManifests = null;
     protected ScanMode scanMode = ScanMode.ALL;
     private Filter<Integer> levelFilter = null;
     private Filter<ManifestEntry> manifestEntryFilter = null;
@@ -161,25 +159,16 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public FileStoreScan withSnapshot(long snapshotId) {
-        checkState(specifiedManifests == null, "Cannot set both snapshot and 
manifests.");
         this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
         return this;
     }
 
     @Override
     public FileStoreScan withSnapshot(Snapshot snapshot) {
-        checkState(specifiedManifests == null, "Cannot set both snapshot and 
manifests.");
         this.specifiedSnapshot = snapshot;
         return this;
     }
 
-    @Override
-    public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
-        checkState(specifiedSnapshot == null, "Cannot set both snapshot and 
manifests.");
-        this.specifiedManifests = manifests;
-        return this;
-    }
-
     @Override
     public FileStoreScan withKind(ScanMode scanMode) {
         this.scanMode = scanMode;
@@ -401,10 +390,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     }
 
     private ManifestsReader.Result readManifests() {
-        if (specifiedManifests != null) {
-            return new ManifestsReader.Result(null, specifiedManifests, 
specifiedManifests);
-        }
-
         return manifestsReader.read(specifiedSnapshot, scanMode);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 7663f4822..179d16de6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -67,8 +67,6 @@ public interface FileStoreScan {
 
     FileStoreScan withSnapshot(Snapshot snapshot);
 
-    FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
-
     FileStoreScan withKind(ScanMode scanMode);
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 5218a515a..0d8ea5f4a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -26,10 +26,12 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -38,7 +40,6 @@ import 
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.options.MemorySize;
@@ -561,29 +562,41 @@ public class TestFileStore extends KeyValueFileStore {
         return getFilesInUse(
                 snapshotId,
                 snapshotManager(),
-                newScan(),
                 fileIO,
                 pathFactory(),
-                manifestListFactory().create());
+                manifestListFactory().create(),
+                manifestFileFactory().create());
     }
 
     public static Set<Path> getFilesInUse(
             long snapshotId,
             SnapshotManager snapshotManager,
-            FileStoreScan scan,
             FileIO fileIO,
             FileStorePathFactory pathFactory,
-            ManifestList manifestList) {
+            ManifestList manifestList,
+            ManifestFile manifestFile) {
         Set<Path> result = new HashSet<>();
 
         if (snapshotManager.snapshotExists(snapshotId)) {
-            result.addAll(
+            Set<Path> files =
                     getSnapshotFileInUse(
-                            snapshotId, snapshotManager, scan, fileIO, 
pathFactory, manifestList));
+                            snapshotId,
+                            snapshotManager,
+                            fileIO,
+                            pathFactory,
+                            manifestList,
+                            manifestFile);
+            result.addAll(files);
         } else if (snapshotManager.longLivedChangelogExists(snapshotId)) {
-            result.addAll(
+            Set<Path> files =
                     getChangelogFileInUse(
-                            snapshotId, snapshotManager, scan, fileIO, 
pathFactory, manifestList));
+                            snapshotId,
+                            snapshotManager,
+                            fileIO,
+                            pathFactory,
+                            manifestList,
+                            manifestFile);
+            result.addAll(files);
         } else {
             throw new RuntimeException(
                     String.format("The snapshot %s does not exist.", 
snapshotId));
@@ -595,10 +608,10 @@ public class TestFileStore extends KeyValueFileStore {
     private static Set<Path> getSnapshotFileInUse(
             long snapshotId,
             SnapshotManager snapshotManager,
-            FileStoreScan scan,
             FileIO fileIO,
             FileStorePathFactory pathFactory,
-            ManifestList manifestList) {
+            ManifestList manifestList,
+            ManifestFile manifestFile) {
         Set<Path> result = new HashSet<>();
         SchemaManager schemaManager = new SchemaManager(fileIO, 
snapshotManager.tablePath());
         CoreOptions options = new 
CoreOptions(schemaManager.latest().get().options());
@@ -625,7 +638,11 @@ public class TestFileStore extends KeyValueFileStore {
         manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
 
         // data file
-        List<ManifestEntry> entries = 
scan.withManifestList(manifests).plan().files();
+        List<ManifestEntry> entries =
+                manifests.stream()
+                        .flatMap(m -> manifestFile.read(m.fileName()).stream())
+                        .collect(Collectors.toList());
+        entries = new ArrayList<>(FileEntry.mergeEntries(entries));
         for (ManifestEntry entry : entries) {
             result.add(
                     new Path(
@@ -641,7 +658,9 @@ public class TestFileStore extends KeyValueFileStore {
         // use list.
         if (changelogDecoupled && !produceChangelog) {
             entries =
-                    
scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files();
+                    manifestList.readDeltaManifests(snapshot).stream()
+                            .flatMap(m -> 
manifestFile.read(m.fileName()).stream())
+                            .collect(Collectors.toList());
             for (ManifestEntry entry : entries) {
                 // append delete file are delayed to delete
                 if (entry.kind() == FileKind.DELETE
@@ -661,15 +680,13 @@ public class TestFileStore extends KeyValueFileStore {
     private static Set<Path> getChangelogFileInUse(
             long changelogId,
             SnapshotManager snapshotManager,
-            FileStoreScan scan,
             FileIO fileIO,
             FileStorePathFactory pathFactory,
-            ManifestList manifestList) {
+            ManifestList manifestList,
+            ManifestFile manifestFile) {
         Set<Path> result = new HashSet<>();
         SchemaManager schemaManager = new SchemaManager(fileIO, 
snapshotManager.tablePath());
         CoreOptions options = new 
CoreOptions(schemaManager.latest().get().options());
-        boolean produceChangelog =
-                options.changelogProducer() != 
CoreOptions.ChangelogProducer.NONE;
 
         Path changelogPath = 
snapshotManager.longLivedChangelogPath(changelogId);
         Changelog changelog = Changelog.fromPath(fileIO, changelogPath);
@@ -677,35 +694,27 @@ public class TestFileStore extends KeyValueFileStore {
         // changelog file
         result.add(changelogPath);
 
-        // manifest lists
-        if (!produceChangelog) {
-            
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
-            
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
-        }
-        if (changelog.changelogManifestList() != null) {
-            
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
-        }
-
-        // manifests
-        List<ManifestFileMeta> manifests =
-                new 
ArrayList<>(manifestList.readChangelogManifests(changelog));
-        if (!produceChangelog) {
-            manifests.addAll(manifestList.readDataManifests(changelog));
-        }
-
-        manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
-
         // data file
         // not all manifests contains useful data file
         // (1) produceChangelog = 'true': data file in changelog manifests
         // (2) produceChangelog = 'false': 'APPEND' data file in delta 
manifests
 
         // delta file
-        if (!produceChangelog) {
-            for (ManifestEntry entry :
-                    
scan.withManifestList(manifestList.readDeltaManifests(changelog))
-                            .plan()
-                            .files()) {
+        if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE) 
{
+            // TODO why we need to keep base manifests?
+            
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
+            manifestList
+                    .readDataManifests(changelog)
+                    .forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
+
+            
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
+            List<ManifestFileMeta> manifests = 
manifestList.readDeltaManifests(changelog);
+            manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
+            List<ManifestEntry> files =
+                    manifests.stream()
+                            .flatMap(m -> 
manifestFile.read(m.fileName()).stream())
+                            .collect(Collectors.toList());
+            for (ManifestEntry entry : files) {
                 if (entry.file().fileSource().orElse(FileSource.APPEND) == 
FileSource.APPEND) {
                     result.add(
                             new Path(
@@ -713,12 +722,15 @@ public class TestFileStore extends KeyValueFileStore {
                                     entry.file().fileName()));
                 }
             }
-        } else {
-            // changelog
-            for (ManifestEntry entry :
-                    
scan.withManifestList(manifestList.readChangelogManifests(changelog))
-                            .plan()
-                            .files()) {
+        } else if (changelog.changelogManifestList() != null) {
+            
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
+            List<ManifestFileMeta> manifests = 
manifestList.readChangelogManifests(changelog);
+            manifests.forEach(m -> 
result.add(pathFactory.toManifestFilePath(m.fileName())));
+            List<ManifestEntry> files =
+                    manifests.stream()
+                            .flatMap(m -> 
manifestFile.read(m.fileName()).stream())
+                            .collect(Collectors.toList());
+            for (ManifestEntry entry : files) {
                 result.add(
                         new Path(
                                 pathFactory.bucketPath(entry.partition(), 
entry.bucket()),
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 739d4b6bd..96dce3d78 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -44,6 +44,7 @@ import org.apache.paimon.utils.TagManager;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -451,7 +452,7 @@ public class ExpireSnapshotsTest {
         store.assertCleaned();
     }
 
-    @Test
+    @RepeatedTest(5)
     public void testChangelogOutLivedSnapshot() throws Exception {
         List<KeyValue> allData = new ArrayList<>();
         List<Integer> snapshotPositions = new ArrayList<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 2fd8c10cd..4f3d5c1c2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -26,8 +26,6 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
@@ -252,29 +250,6 @@ public class KeyValueFileStoreScanTest {
         runTestExactMatch(scan, wantedSnapshot, expected);
     }
 
-    @Test
-    public void testWithManifestList() throws Exception {
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        int numCommits = random.nextInt(10) + 1;
-        for (int i = 0; i < numCommits; i++) {
-            List<KeyValue> data = generateData(random.nextInt(100) + 1);
-            writeData(data);
-        }
-
-        ManifestList manifestList = store.manifestListFactory().create();
-        long wantedSnapshotId = 
random.nextLong(snapshotManager.latestSnapshotId()) + 1;
-        Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
-        List<ManifestFileMeta> wantedManifests = 
manifestList.readDataManifests(wantedSnapshot);
-
-        FileStoreScan scan = store.newScan();
-        scan.withManifestList(wantedManifests);
-
-        List<KeyValue> expectedKvs = 
store.readKvsFromSnapshot(wantedSnapshotId);
-        gen.sort(expectedKvs);
-        Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);
-        runTestExactMatch(scan, null, expected);
-    }
-
     @Test
     public void testDropStatsInPlan() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f6343bfe4..4d8408955 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1473,10 +1473,10 @@ public abstract class FileStoreTableTestBase {
                 TestFileStore.getFilesInUse(
                         latestSnapshotId,
                         snapshotManager,
-                        store.newScan(),
                         table.fileIO(),
                         store.pathFactory(),
-                        store.manifestListFactory().create());
+                        store.manifestListFactory().create(),
+                        store.manifestFileFactory().create());
 
         List<Path> unusedFileList =
                 Files.walk(Paths.get(tempDir.toString()))

Reply via email to