This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new da4d64b4b [core] Remove FileStoreScan.withManifestList and fix unstable test (#4552)
da4d64b4b is described below
commit da4d64b4b6afd8685447aed6227e2ca67cfba8b1
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 20 13:00:35 2024 +0800
[core] Remove FileStoreScan.withManifestList and fix unstable test (#4552)
---
.../paimon/operation/AbstractFileStoreScan.java | 15 ---
.../org/apache/paimon/operation/FileStoreScan.java | 2 -
.../test/java/org/apache/paimon/TestFileStore.java | 104 ++++++++++++---------
.../paimon/operation/ExpireSnapshotsTest.java | 3 +-
.../operation/KeyValueFileStoreScanTest.java | 25 -----
.../paimon/table/FileStoreTableTestBase.java | 4 +-
6 files changed, 62 insertions(+), 91 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 98e064451..c73a92062 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -63,7 +63,6 @@ import static
org.apache.paimon.utils.ManifestReadThreadPool.getExecutorService;
import static
org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static
org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute;
/** Default implementation of {@link FileStoreScan}. */
@@ -81,7 +80,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
private Snapshot specifiedSnapshot = null;
private Filter<Integer> bucketFilter = null;
private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
- private List<ManifestFileMeta> specifiedManifests = null;
protected ScanMode scanMode = ScanMode.ALL;
private Filter<Integer> levelFilter = null;
private Filter<ManifestEntry> manifestEntryFilter = null;
@@ -161,25 +159,16 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
@Override
public FileStoreScan withSnapshot(long snapshotId) {
- checkState(specifiedManifests == null, "Cannot set both snapshot and
manifests.");
this.specifiedSnapshot = snapshotManager.snapshot(snapshotId);
return this;
}
@Override
public FileStoreScan withSnapshot(Snapshot snapshot) {
- checkState(specifiedManifests == null, "Cannot set both snapshot and
manifests.");
this.specifiedSnapshot = snapshot;
return this;
}
- @Override
- public FileStoreScan withManifestList(List<ManifestFileMeta> manifests) {
- checkState(specifiedSnapshot == null, "Cannot set both snapshot and
manifests.");
- this.specifiedManifests = manifests;
- return this;
- }
-
@Override
public FileStoreScan withKind(ScanMode scanMode) {
this.scanMode = scanMode;
@@ -401,10 +390,6 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
}
private ManifestsReader.Result readManifests() {
- if (specifiedManifests != null) {
- return new ManifestsReader.Result(null, specifiedManifests,
specifiedManifests);
- }
-
return manifestsReader.read(specifiedSnapshot, scanMode);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 7663f4822..179d16de6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -67,8 +67,6 @@ public interface FileStoreScan {
FileStoreScan withSnapshot(Snapshot snapshot);
- FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
-
FileStoreScan withKind(ScanMode scanMode);
FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 5218a515a..0d8ea5f4a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -26,10 +26,12 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileEntry;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -38,7 +40,6 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.operation.AbstractFileStoreWrite;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreCommitImpl;
-import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.options.MemorySize;
@@ -561,29 +562,41 @@ public class TestFileStore extends KeyValueFileStore {
return getFilesInUse(
snapshotId,
snapshotManager(),
- newScan(),
fileIO,
pathFactory(),
- manifestListFactory().create());
+ manifestListFactory().create(),
+ manifestFileFactory().create());
}
public static Set<Path> getFilesInUse(
long snapshotId,
SnapshotManager snapshotManager,
- FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
- ManifestList manifestList) {
+ ManifestList manifestList,
+ ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
if (snapshotManager.snapshotExists(snapshotId)) {
- result.addAll(
+ Set<Path> files =
getSnapshotFileInUse(
- snapshotId, snapshotManager, scan, fileIO,
pathFactory, manifestList));
+ snapshotId,
+ snapshotManager,
+ fileIO,
+ pathFactory,
+ manifestList,
+ manifestFile);
+ result.addAll(files);
} else if (snapshotManager.longLivedChangelogExists(snapshotId)) {
- result.addAll(
+ Set<Path> files =
getChangelogFileInUse(
- snapshotId, snapshotManager, scan, fileIO,
pathFactory, manifestList));
+ snapshotId,
+ snapshotManager,
+ fileIO,
+ pathFactory,
+ manifestList,
+ manifestFile);
+ result.addAll(files);
} else {
throw new RuntimeException(
String.format("The snapshot %s does not exist.",
snapshotId));
@@ -595,10 +608,10 @@ public class TestFileStore extends KeyValueFileStore {
private static Set<Path> getSnapshotFileInUse(
long snapshotId,
SnapshotManager snapshotManager,
- FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
- ManifestList manifestList) {
+ ManifestList manifestList,
+ ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO,
snapshotManager.tablePath());
CoreOptions options = new
CoreOptions(schemaManager.latest().get().options());
@@ -625,7 +638,11 @@ public class TestFileStore extends KeyValueFileStore {
manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
// data file
- List<ManifestEntry> entries =
scan.withManifestList(manifests).plan().files();
+ List<ManifestEntry> entries =
+ manifests.stream()
+ .flatMap(m -> manifestFile.read(m.fileName()).stream())
+ .collect(Collectors.toList());
+ entries = new ArrayList<>(FileEntry.mergeEntries(entries));
for (ManifestEntry entry : entries) {
result.add(
new Path(
@@ -641,7 +658,9 @@ public class TestFileStore extends KeyValueFileStore {
// use list.
if (changelogDecoupled && !produceChangelog) {
entries =
-
scan.withManifestList(manifestList.readDeltaManifests(snapshot)).plan().files();
+ manifestList.readDeltaManifests(snapshot).stream()
+ .flatMap(m ->
manifestFile.read(m.fileName()).stream())
+ .collect(Collectors.toList());
for (ManifestEntry entry : entries) {
// append delete file are delayed to delete
if (entry.kind() == FileKind.DELETE
@@ -661,15 +680,13 @@ public class TestFileStore extends KeyValueFileStore {
private static Set<Path> getChangelogFileInUse(
long changelogId,
SnapshotManager snapshotManager,
- FileStoreScan scan,
FileIO fileIO,
FileStorePathFactory pathFactory,
- ManifestList manifestList) {
+ ManifestList manifestList,
+ ManifestFile manifestFile) {
Set<Path> result = new HashSet<>();
SchemaManager schemaManager = new SchemaManager(fileIO,
snapshotManager.tablePath());
CoreOptions options = new
CoreOptions(schemaManager.latest().get().options());
- boolean produceChangelog =
- options.changelogProducer() !=
CoreOptions.ChangelogProducer.NONE;
Path changelogPath =
snapshotManager.longLivedChangelogPath(changelogId);
Changelog changelog = Changelog.fromPath(fileIO, changelogPath);
@@ -677,35 +694,27 @@ public class TestFileStore extends KeyValueFileStore {
// changelog file
result.add(changelogPath);
- // manifest lists
- if (!produceChangelog) {
-
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
-
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
- }
- if (changelog.changelogManifestList() != null) {
-
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
- }
-
- // manifests
- List<ManifestFileMeta> manifests =
- new
ArrayList<>(manifestList.readChangelogManifests(changelog));
- if (!produceChangelog) {
- manifests.addAll(manifestList.readDataManifests(changelog));
- }
-
- manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
-
// data file
// not all manifests contains useful data file
// (1) produceChangelog = 'true': data file in changelog manifests
// (2) produceChangelog = 'false': 'APPEND' data file in delta
manifests
// delta file
- if (!produceChangelog) {
- for (ManifestEntry entry :
-
scan.withManifestList(manifestList.readDeltaManifests(changelog))
- .plan()
- .files()) {
+ if (options.changelogProducer() == CoreOptions.ChangelogProducer.NONE)
{
+ // TODO why we need to keep base manifests?
+
result.add(pathFactory.toManifestListPath(changelog.baseManifestList()));
+ manifestList
+ .readDataManifests(changelog)
+ .forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
+
+
result.add(pathFactory.toManifestListPath(changelog.deltaManifestList()));
+ List<ManifestFileMeta> manifests =
manifestList.readDeltaManifests(changelog);
+ manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
+ List<ManifestEntry> files =
+ manifests.stream()
+ .flatMap(m ->
manifestFile.read(m.fileName()).stream())
+ .collect(Collectors.toList());
+ for (ManifestEntry entry : files) {
if (entry.file().fileSource().orElse(FileSource.APPEND) ==
FileSource.APPEND) {
result.add(
new Path(
@@ -713,12 +722,15 @@ public class TestFileStore extends KeyValueFileStore {
entry.file().fileName()));
}
}
- } else {
- // changelog
- for (ManifestEntry entry :
-
scan.withManifestList(manifestList.readChangelogManifests(changelog))
- .plan()
- .files()) {
+ } else if (changelog.changelogManifestList() != null) {
+
result.add(pathFactory.toManifestListPath(changelog.changelogManifestList()));
+ List<ManifestFileMeta> manifests =
manifestList.readChangelogManifests(changelog);
+ manifests.forEach(m ->
result.add(pathFactory.toManifestFilePath(m.fileName())));
+ List<ManifestEntry> files =
+ manifests.stream()
+ .flatMap(m ->
manifestFile.read(m.fileName()).stream())
+ .collect(Collectors.toList());
+ for (ManifestEntry entry : files) {
result.add(
new Path(
pathFactory.bucketPath(entry.partition(),
entry.bucket()),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index 739d4b6bd..96dce3d78 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -44,6 +44,7 @@ import org.apache.paimon.utils.TagManager;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -451,7 +452,7 @@ public class ExpireSnapshotsTest {
store.assertCleaned();
}
- @Test
+ @RepeatedTest(5)
public void testChangelogOutLivedSnapshot() throws Exception {
List<KeyValue> allData = new ArrayList<>();
List<Integer> snapshotPositions = new ArrayList<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index 2fd8c10cd..4f3d5c1c2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -26,8 +26,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.manifest.ManifestFileMeta;
-import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
@@ -252,29 +250,6 @@ public class KeyValueFileStoreScanTest {
runTestExactMatch(scan, wantedSnapshot, expected);
}
- @Test
- public void testWithManifestList() throws Exception {
- ThreadLocalRandom random = ThreadLocalRandom.current();
- int numCommits = random.nextInt(10) + 1;
- for (int i = 0; i < numCommits; i++) {
- List<KeyValue> data = generateData(random.nextInt(100) + 1);
- writeData(data);
- }
-
- ManifestList manifestList = store.manifestListFactory().create();
- long wantedSnapshotId =
random.nextLong(snapshotManager.latestSnapshotId()) + 1;
- Snapshot wantedSnapshot = snapshotManager.snapshot(wantedSnapshotId);
- List<ManifestFileMeta> wantedManifests =
manifestList.readDataManifests(wantedSnapshot);
-
- FileStoreScan scan = store.newScan();
- scan.withManifestList(wantedManifests);
-
- List<KeyValue> expectedKvs =
store.readKvsFromSnapshot(wantedSnapshotId);
- gen.sort(expectedKvs);
- Map<BinaryRow, BinaryRow> expected = store.toKvMap(expectedKvs);
- runTestExactMatch(scan, null, expected);
- }
-
@Test
public void testDropStatsInPlan() throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f6343bfe4..4d8408955 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -1473,10 +1473,10 @@ public abstract class FileStoreTableTestBase {
TestFileStore.getFilesInUse(
latestSnapshotId,
snapshotManager,
- store.newScan(),
table.fileIO(),
store.pathFactory(),
- store.manifestListFactory().create());
+ store.manifestListFactory().create(),
+ store.manifestFileFactory().create());
List<Path> unusedFileList =
Files.walk(Paths.get(tempDir.toString()))