This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1107f2b05 [core] FileDeletionBase should cancel reading manifest files when any file is failed to read (#2105)
1107f2b05 is described below
commit 1107f2b0521bc582370528479344d18eb63cfa03
Author: yuzelin <[email protected]>
AuthorDate: Tue Oct 10 22:00:16 2023 +0800
[core] FileDeletionBase should cancel reading manifest files when any file is failed to read (#2105)
---
.../apache/paimon/operation/FileDeletionBase.java | 69 ++++++-------
.../paimon/operation/FileStoreExpireImpl.java | 17 +++-
.../apache/paimon/operation/SnapshotDeletion.java | 62 ++++++++++--
.../org/apache/paimon/operation/TagDeletion.java | 46 ++++++---
.../org/apache/paimon/table/RollbackHelper.java | 26 ++++-
.../java/org/apache/paimon/utils/TagManager.java | 24 ++++-
.../operation/CleanedFileStoreExpireTest.java | 2 +-
.../apache/paimon/operation/FileDeletionTest.java | 108 +++++++++++++++++++++
8 files changed, 277 insertions(+), 77 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
index 8de8dfa85..a26bf7b5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java
@@ -32,8 +32,6 @@ import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterables;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +41,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -215,50 +211,41 @@ public abstract class FileDeletionBase {
}
}
- public Iterable<ManifestEntry> tryReadManifestEntries(String
manifestListName) {
- return readManifestEntries(tryReadManifestList(manifestListName));
- }
-
- /** Try to read base and delta manifest lists at one time. */
- protected Iterable<ManifestEntry> tryReadDataManifestEntries(Snapshot
snapshot) {
+ protected List<String> tryReadDataManifests(Snapshot snapshot) {
List<ManifestFileMeta> manifestFileMetas =
tryReadManifestList(snapshot.baseManifestList());
manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList()));
-
- return readManifestEntries(manifestFileMetas);
+ return readManifestFileNames(manifestFileMetas);
}
- protected Iterable<ManifestEntry> readManifestEntries(
- List<ManifestFileMeta> manifestFileMetas) {
- Queue<String> files =
- manifestFileMetas.stream()
- .map(ManifestFileMeta::fileName)
- .collect(Collectors.toCollection(LinkedList::new));
- return Iterables.concat(
- (Iterable<Iterable<ManifestEntry>>)
- () ->
- new Iterator<Iterable<ManifestEntry>>() {
- @Override
- public boolean hasNext() {
- return files.size() > 0;
- }
-
- @Override
- public Iterable<ManifestEntry> next() {
- String file = files.poll();
- try {
- return manifestFile.read(file);
- } catch (Exception e) {
- LOG.warn("Failed to read manifest
file " + file, e);
- return Collections.emptyList();
- }
- }
- });
+ protected List<String> readManifestFileNames(List<ManifestFileMeta>
manifestFileMetas) {
+ return manifestFileMetas.stream()
+ .map(ManifestFileMeta::fileName)
+ .collect(Collectors.toCollection(LinkedList::new));
}
+ /**
+ * NOTE: This method is used for building data file skipping set. If
failed to read some
+ * manifests, it will throw exception which callers must handle.
+ */
protected void addMergedDataFiles(
- Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot
snapshot) {
- Iterable<ManifestEntry> entries = tryReadDataManifestEntries(snapshot);
- for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
+ Map<BinaryRow, Map<Integer, Set<String>>> dataFiles, Snapshot
snapshot)
+ throws Exception {
+ // read data manifests
+ List<String> files = tryReadDataManifests(snapshot);
+
+ // try merging
+ Map<ManifestEntry.Identifier, ManifestEntry> map = new HashMap<>();
+ for (String file : files) {
+ List<ManifestEntry> entries;
+ try {
+ entries = manifestFile.read(file);
+ } catch (Exception e) {
+ throw new Exception("Failed to read manifest file " + file, e);
+ }
+ ManifestEntry.mergeEntries(entries, map);
+ }
+
+ for (ManifestEntry entry : map.values()) {
dataFiles
.computeIfAbsent(entry.partition(), p -> new HashMap<>())
.computeIfAbsent(entry.bucket(), b -> new HashSet<>())
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index 8b177bdf9..ab6bb5d09 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -33,6 +34,7 @@ import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.function.Predicate;
/**
* Default implementation of {@link FileStoreExpire}. It retains a certain
number or period of
@@ -178,8 +180,19 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
Snapshot snapshot = snapshotManager.snapshot(id);
// expire merge tree files and collect changed buckets
- snapshotDeletion.cleanUnusedDataFiles(
- snapshot,
snapshotDeletion.dataFileSkipper(taggedSnapshots, id));
+ Predicate<ManifestEntry> skipper;
+ try {
+ skipper = snapshotDeletion.dataFileSkipper(taggedSnapshots,
id);
+ } catch (Exception e) {
+ LOG.info(
+ String.format(
+ "Skip cleaning data files of snapshot '%s' due
to failed to build skipping set.",
+ id),
+ e);
+ continue;
+ }
+
+ snapshotDeletion.cleanUnusedDataFiles(snapshot, skipper);
}
// delete changelog files
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
index 8b9740c33..084845a50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java
@@ -32,6 +32,9 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.TagManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -42,6 +45,8 @@ import java.util.function.Predicate;
/** Delete snapshot files. */
public class SnapshotDeletion extends FileDeletionBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotDeletion.class);
+
/** Used to record which tag is cached in tagged snapshots list. */
private int cachedTagIndex = -1;
@@ -59,7 +64,25 @@ public class SnapshotDeletion extends FileDeletionBase {
@Override
public void cleanUnusedDataFiles(Snapshot snapshot,
Predicate<ManifestEntry> skipper) {
-
doCleanUnusedDataFile(tryReadManifestEntries(snapshot.deltaManifestList()),
skipper);
+ // try read manifests
+ List<String> manifestFileNames =
+
readManifestFileNames(tryReadManifestList(snapshot.deltaManifestList()));
+ List<ManifestEntry> manifestEntries = new ArrayList<>();
+ // data file path -> (original manifest entry, extra file paths)
+ Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
+ for (String manifest : manifestFileNames) {
+ try {
+ manifestEntries = manifestFile.read(manifest);
+ } catch (Exception e) {
+ // cancel deletion if any exception occurs
+ LOG.warn("Failed to read some manifest files. Cancel
deletion.", e);
+ return;
+ }
+
+ getDataFileToDelete(dataFileToDelete, manifestEntries);
+ }
+
+ doCleanUnusedDataFile(dataFileToDelete, skipper);
}
@Override
@@ -67,14 +90,12 @@ public class SnapshotDeletion extends FileDeletionBase {
cleanUnusedManifests(snapshot, skippingSet, true);
}
- @VisibleForTesting
- void doCleanUnusedDataFile(
- Iterable<ManifestEntry> dataFileLog, Predicate<ManifestEntry>
skipper) {
+ private void getDataFileToDelete(
+ Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
+ List<ManifestEntry> dataFileEntries) {
// we cannot delete a data file directly when we meet a DELETE entry,
because that
// file might be upgraded
- // data file path -> (original manifest entry, extra file paths)
- Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
- for (ManifestEntry entry : dataFileLog) {
+ for (ManifestEntry entry : dataFileEntries) {
Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
Path dataFilePath = new Path(bucketPath, entry.file().fileName());
switch (entry.kind()) {
@@ -93,7 +114,11 @@ public class SnapshotDeletion extends FileDeletionBase {
"Unknown value kind " + entry.kind().name());
}
}
+ }
+ private void doCleanUnusedDataFile(
+ Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete,
+ Predicate<ManifestEntry> skipper) {
List<Path> actualDataFileToDelete = new ArrayList<>();
dataFileToDelete.forEach(
(path, pair) -> {
@@ -110,16 +135,33 @@ public class SnapshotDeletion extends FileDeletionBase {
deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly);
}
+ @VisibleForTesting
+ void cleanUnusedDataFile(List<ManifestEntry> dataFileLog) {
+ Map<Path, Pair<ManifestEntry, List<Path>>> dataFileToDelete = new
HashMap<>();
+ getDataFileToDelete(dataFileToDelete, dataFileLog);
+ doCleanUnusedDataFile(dataFileToDelete, f -> false);
+ }
+
/**
* Delete added file in the manifest list files. Added files marked as
"ADD" in manifests.
*
* @param manifestListName name of manifest list
*/
public void deleteAddedDataFiles(String manifestListName) {
- deleteAddedDataFiles(tryReadManifestEntries(manifestListName));
+ List<String> manifestFileNames =
+ readManifestFileNames(tryReadManifestList(manifestListName));
+ for (String file : manifestFileNames) {
+ try {
+ List<ManifestEntry> manifestEntries = manifestFile.read(file);
+ deleteAddedDataFiles(manifestEntries);
+ } catch (Exception e) {
+ // We want to delete the data file, so just ignore the
unavailable files
+ LOG.info("Failed to read manifest " + file + ". Ignore it.",
e);
+ }
+ }
}
- public void deleteAddedDataFiles(Iterable<ManifestEntry> manifestEntries) {
+ private void deleteAddedDataFiles(List<ManifestEntry> manifestEntries) {
List<Path> dataFileToDelete = new ArrayList<>();
for (ManifestEntry entry : manifestEntries) {
if (entry.kind() == FileKind.ADD) {
@@ -134,7 +176,7 @@ public class SnapshotDeletion extends FileDeletionBase {
}
public Predicate<ManifestEntry> dataFileSkipper(
- List<Snapshot> taggedSnapshots, long expiringSnapshotId) {
+ List<Snapshot> taggedSnapshots, long expiringSnapshotId) throws
Exception {
int index = TagManager.findPreviousTag(taggedSnapshots,
expiringSnapshotId);
// refresh tag data files
if (index >= 0 && cachedTagIndex != index) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
index 77e3d7c70..09531e75c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java
@@ -28,9 +28,12 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.utils.FileStorePathFactory;
-import java.util.ArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -39,6 +42,8 @@ import java.util.function.Predicate;
/** Delete tag files. */
public class TagDeletion extends FileDeletionBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(TagDeletion.class);
+
public TagDeletion(
FileIO fileIO,
FileStorePathFactory pathFactory,
@@ -50,7 +55,7 @@ public class TagDeletion extends FileDeletionBase {
@Override
public void cleanUnusedDataFiles(Snapshot taggedSnapshot,
Predicate<ManifestEntry> skipper) {
- cleanUnusedDataFiles(tryReadDataManifestEntries(taggedSnapshot),
skipper);
+ cleanUnusedDataFiles(tryReadDataManifests(taggedSnapshot), skipper);
}
@Override
@@ -59,28 +64,39 @@ public class TagDeletion extends FileDeletionBase {
cleanUnusedManifests(taggedSnapshot, skippingSet, false);
}
- public void cleanUnusedDataFiles(
- Iterable<ManifestEntry> entries, Predicate<ManifestEntry> skipper)
{
- List<Path> dataFileToDelete = new ArrayList<>();
- for (ManifestEntry entry : ManifestEntry.mergeEntries(entries)) {
- if (!skipper.test(entry)) {
- Path bucketPath = pathFactory.bucketPath(entry.partition(),
entry.bucket());
- dataFileToDelete.add(new Path(bucketPath,
entry.file().fileName()));
- for (String file : entry.file().extraFiles()) {
- dataFileToDelete.add(new Path(bucketPath, file));
- }
+ private void cleanUnusedDataFiles(
+ List<String> manifestFileNames, Predicate<ManifestEntry> skipper) {
+ Set<Path> dataFileToDelete = new HashSet<>();
+ for (String manifest : manifestFileNames) {
+ List<ManifestEntry> manifestEntries;
+ try {
+ manifestEntries = manifestFile.read(manifest);
+ } catch (Exception e) {
+ // We want to delete the data file, so just ignore the
unavailable files
+ LOG.info("Failed to read manifest " + manifest + ". Ignore
it.", e);
+ continue;
+ }
+
+ for (ManifestEntry entry : manifestEntries) {
+ if (!skipper.test(entry)) {
+ Path bucketPath =
pathFactory.bucketPath(entry.partition(), entry.bucket());
+ dataFileToDelete.add(new Path(bucketPath,
entry.file().fileName()));
+ for (String file : entry.file().extraFiles()) {
+ dataFileToDelete.add(new Path(bucketPath, file));
+ }
- recordDeletionBuckets(entry);
+ recordDeletionBuckets(entry);
+ }
}
}
deleteFiles(dataFileToDelete, fileIO::deleteQuietly);
}
- public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot) {
+ public Predicate<ManifestEntry> dataFileSkipper(Snapshot fromSnapshot)
throws Exception {
return dataFileSkipper(Collections.singletonList(fromSnapshot));
}
- public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot>
fromSnapshots) {
+ public Predicate<ManifestEntry> dataFileSkipper(List<Snapshot>
fromSnapshots) throws Exception {
Map<BinaryRow, Map<Integer, Set<String>>> skipped = new HashMap<>();
for (Snapshot snapshot : fromSnapshots) {
addMergedDataFiles(skipped, snapshot);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
index 4843393e7..a1d5333e4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java
@@ -26,6 +26,9 @@ import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
@@ -40,6 +43,8 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Helper class for {@link Table#rollbackTo} including utils to clean
snapshots. */
public class RollbackHelper {
+ private static final Logger LOG =
LoggerFactory.getLogger(RollbackHelper.class);
+
private final SnapshotManager snapshotManager;
private final TagManager tagManager;
private final FileIO fileIO;
@@ -136,13 +141,24 @@ public class RollbackHelper {
}
// delete data files
- Predicate<ManifestEntry> dataFileSkipper =
tagDeletion.dataFileSkipper(retainedSnapshot);
- for (Snapshot s : toBeCleaned) {
- tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
+ Predicate<ManifestEntry> dataFileSkipper = null;
+ boolean success = true;
+ try {
+ dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot);
+ } catch (Exception e) {
+ LOG.info(
+ "Skip cleaning data files for deleted tags due to failed
to build skipping set.",
+ e);
+ success = false;
}
- // delete directories
- tagDeletion.cleanDataDirectories();
+ if (success) {
+ for (Snapshot s : toBeCleaned) {
+ tagDeletion.cleanUnusedDataFiles(s, dataFileSkipper);
+ }
+ // delete directories
+ tagDeletion.cleanDataDirectories();
+ }
return toBeCleaned;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index 46e62b014..8a46b5f24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -24,6 +24,9 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.TagDeletion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@@ -38,6 +41,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Manager for {@code Tag}. */
public class TagManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(TagManager.class);
+
private static final String TAG_PREFIX = "tag-";
private final FileIO fileIO;
@@ -116,9 +121,22 @@ public class TagManager {
skippedSnapshots.add(right);
// delete data files and empty directories
- Predicate<ManifestEntry> dataFileSkipper =
tagDeletion.dataFileSkipper(skippedSnapshots);
- tagDeletion.cleanUnusedDataFiles(taggedSnapshot, dataFileSkipper);
- tagDeletion.cleanDataDirectories();
+ Predicate<ManifestEntry> dataFileSkipper = null;
+ boolean success = true;
+ try {
+ dataFileSkipper = tagDeletion.dataFileSkipper(skippedSnapshots);
+ } catch (Exception e) {
+ LOG.info(
+ String.format(
+ "Skip cleaning data files of tag '%s' due to
failed to build skipping set.",
+ tagName),
+ e);
+ success = false;
+ }
+ if (success) {
+ tagDeletion.cleanUnusedDataFiles(taggedSnapshot, dataFileSkipper);
+ tagDeletion.cleanDataDirectories();
+ }
// delete manifests
tagDeletion.cleanUnusedManifests(
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
index 91eb0b01c..1d5eb7329 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/CleanedFileStoreExpireTest.java
@@ -88,7 +88,7 @@ public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase {
ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition,
0, 1, dataFile);
// expire
- expire.snapshotDeletion().doCleanUnusedDataFile(Arrays.asList(add,
delete), f -> false);
+ expire.snapshotDeletion().cleanUnusedDataFile(Arrays.asList(add,
delete));
// check
assertThat(fileIO.exists(myDataFile)).isFalse();
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index ede56ff14..d150ca457 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -30,6 +30,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
@@ -45,6 +46,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -500,6 +503,111 @@ public class FileDeletionTest {
}
}
+ /**
+ * When a data file is upgraded, it will have a {@link FileKind#ADD} and a
{@link
+ * FileKind#DELETE} entries. This test ensure that if the ADD entry cannot
be read correctly
+ * when expiring, the data file won't be deleted. In this test we manually
separate the ADD
+ * entry and delete entry into two manifest file and delete the ADD entry
manifest file to
+ * simulate the read exception.
+ */
+ @Test
+ public void testExpireWithMissingManifest() throws Exception {
+ TestFileStore store =
createStore(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ SnapshotManager snapshotManager = store.snapshotManager();
+ TestKeyValueGenerator gen =
+ new
TestKeyValueGenerator(TestKeyValueGenerator.GeneratorMode.NON_PARTITIONED);
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // snapshot 1: commit A to bucket 0
+ Map<BinaryRow, Map<Integer, RecordWriter<KeyValue>>> writers = new
HashMap<>();
+ List<KeyValue> kvs = partitionedData(5, gen);
+ writeData(store, kvs, partition, 0, writers);
+ commitData(store, commitIdentifier++, writers);
+
+ // snapshot 2: compact
+ writers.values().stream()
+ .flatMap(m -> m.values().stream())
+ .forEach(
+ writer -> {
+ try {
+ writer.compact(true);
+ writer.sync();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ FileStoreTestUtils.commitData(store, commitIdentifier++, writers);
+
+ // check that there are one data file and get its path
+ FileStorePathFactory pathFactory = store.pathFactory();
+ Path bucket0 = pathFactory.bucketPath(partition, 0);
+ List<Path> datafiles =
+ Files.walk(Paths.get(bucket0.toString()))
+ .filter(Files::isRegularFile)
+ .filter(p ->
p.getFileName().toString().startsWith("data"))
+ .map(p -> new Path(p.toString()))
+ .collect(Collectors.toList());
+ assertThat(datafiles.size()).isEqualTo(1);
+ Path dataFileA = datafiles.get(0);
+
+ // check the snapshot 2 has two delta manifests entry (-A, level=0),
(+A, level=5)
+ Snapshot snapshot2 = snapshotManager.snapshot(2);
+ ManifestList manifestList = store.manifestListFactory().create();
+ ManifestFile manifestFile = store.manifestFileFactory().create();
+
+ String deltaManifestList = snapshot2.deltaManifestList();
+ List<ManifestFileMeta> manifestFileMetas =
manifestList.read(snapshot2.deltaManifestList());
+ assertThat(manifestFileMetas.size()).isEqualTo(1);
+
+ String deltaManifestFile = manifestFileMetas.get(0).fileName();
+ List<ManifestEntry> entries = manifestFile.read(deltaManifestFile);
+ assertThat(entries.size()).isEqualTo(2);
+
+ ManifestEntry addEntry = null, deleteEntry = null;
+ for (ManifestEntry entry : entries) {
+ assertThat(entry.file().fileName()).isEqualTo(dataFileA.getName());
+ if (entry.kind() == FileKind.ADD) {
+ assertThat(addEntry).isNull();
+ addEntry = entry;
+ assertThat(entry.file().level()).isGreaterThan(0);
+ } else {
+ assertThat(deleteEntry).isNull();
+ deleteEntry = entry;
+ assertThat(entry.file().level()).isEqualTo(0);
+ }
+ }
+ assertThat(addEntry).isNotNull();
+ assertThat(deleteEntry).isNotNull();
+
+ // separate two entries to two manifest files and delete the (+A,
level=5) manifest
+
fileIO.deleteQuietly(pathFactory.toManifestListPath(deltaManifestList));
+
fileIO.deleteQuietly(pathFactory.toManifestFilePath(deltaManifestFile));
+
+ List<ManifestFileMeta> newAddManifests =
+ manifestFile.write(Collections.singletonList(addEntry));
+ assertThat(newAddManifests.size()).isEqualTo(1);
+ String newAddManifestFileName = newAddManifests.get(0).fileName();
+
fileIO.deleteQuietly(pathFactory.toManifestFilePath(newAddManifestFileName));
+
+ List<ManifestFileMeta> newDeleteManifests =
+ manifestFile.write(Collections.singletonList(deleteEntry));
+ assertThat(newDeleteManifests.size()).isEqualTo(1);
+
+ List<ManifestFileMeta> newManifests =
+ Arrays.asList(newAddManifests.get(0),
newDeleteManifests.get(0));
+
+ String newManifestListName = manifestList.write(newManifests);
+
+ fileIO.rename(
+ pathFactory.toManifestListPath(newManifestListName),
+ pathFactory.toManifestListPath(deltaManifestList));
+
+ store.newExpire(1, 1, Long.MAX_VALUE).expire();
+
+ // check data file
+ assertPathExists(fileIO, dataFileA);
+ }
+
private TestFileStore createStore(TestKeyValueGenerator.GeneratorMode
mode) throws Exception {
return createStore(mode, 2);
}