This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 06439061 [FLINK-28255] Add extraFiles to DataFileMeta 06439061 is described below commit 0643906175416c2e5132fea7d3cacc934927b645 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Tue Jun 28 10:58:33 2022 +0800 [FLINK-28255] Add extraFiles to DataFileMeta This closes #175 --- .../flink/table/store/utils/RowDataUtils.java | 14 +++++ .../flink/table/store/file/data/DataFileMeta.java | 73 +++++++++++++++++++--- .../store/file/data/DataFileMetaSerializer.java | 8 ++- .../table/store/file/manifest/ManifestFile.java | 23 +++++++ .../store/file/operation/FileStoreExpireImpl.java | 73 +++++++++++++--------- .../store/file/utils/FileStorePathFactory.java | 25 -------- .../table/store/file/utils/SerializationUtils.java | 10 ++- .../flink/table/store/file/TestFileStore.java | 8 +-- .../file/data/DataFileMetaSerializerTest.java | 4 +- .../store/file/operation/FileStoreExpireTest.java | 50 +++++++++++++++ 10 files changed, 215 insertions(+), 73 deletions(-) diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java index 9f1eb027..15a2ebb3 100644 --- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/RowDataUtils.java @@ -43,7 +43,9 @@ import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; /** Utils for {@link RowData} structures. 
*/ @@ -256,4 +258,16 @@ public class RowDataUtils { throw new UnsupportedOperationException("Unsupported type: " + fieldType); } } + + public static ArrayData toStringArrayData(List<String> list) { + return new GenericArrayData(list.stream().map(StringData::fromString).toArray()); + } + + public static List<String> fromStringArrayData(ArrayData arrayData) { + List<String> list = new ArrayList<>(arrayData.size()); + for (int i = 0; i < arrayData.size(); i++) { + list.add(arrayData.isNullAt(i) ? null : arrayData.getString(i).toString()); + } + return list; + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java index e85e3f5e..b8d70ec0 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java @@ -21,17 +21,19 @@ package org.apache.flink.table.store.file.data; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.store.file.stats.BinaryTableStats; import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; +import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.VarCharType; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW; import static org.apache.flink.table.store.file.utils.SerializationUtils.newBytesType; +import static org.apache.flink.table.store.file.utils.SerializationUtils.newStringType; import static org.apache.flink.util.Preconditions.checkArgument; /** Metadata of a data file. 
*/ @@ -59,6 +61,8 @@ public class DataFileMeta { private final long schemaId; private final int level; + private final List<String> extraFiles; + public static DataFileMeta forAppend( String fileName, long fileSize, @@ -93,6 +97,34 @@ public class DataFileMeta { long maxSequenceNumber, long schemaId, int level) { + this( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + Collections.emptyList()); + } + + public DataFileMeta( + String fileName, + long fileSize, + long rowCount, + BinaryRowData minKey, + BinaryRowData maxKey, + BinaryTableStats keyStats, + BinaryTableStats valueStats, + long minSequenceNumber, + long maxSequenceNumber, + long schemaId, + int level, + List<String> extraFiles) { this.fileName = fileName; this.fileSize = fileSize; this.rowCount = rowCount; @@ -106,6 +138,7 @@ public class DataFileMeta { this.maxSequenceNumber = maxSequenceNumber; this.level = level; this.schemaId = schemaId; + this.extraFiles = Collections.unmodifiableList(extraFiles); } public String fileName() { @@ -152,6 +185,10 @@ public class DataFileMeta { return level; } + public List<String> extraFiles() { + return extraFiles; + } + public DataFileMeta upgrade(int newLevel) { checkArgument(newLevel > this.level); return new DataFileMeta( @@ -165,7 +202,24 @@ public class DataFileMeta { minSequenceNumber, maxSequenceNumber, schemaId, - newLevel); + newLevel, + extraFiles); + } + + public DataFileMeta copy(List<String> newExtraFiles) { + return new DataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + newExtraFiles); } @Override @@ -184,7 +238,8 @@ public class DataFileMeta { && minSequenceNumber == that.minSequenceNumber && maxSequenceNumber == that.maxSequenceNumber && schemaId == that.schemaId - && level == that.level; + && level == that.level + && Objects.equals(extraFiles, 
that.extraFiles); } @Override @@ -200,13 +255,14 @@ public class DataFileMeta { minSequenceNumber, maxSequenceNumber, schemaId, - level); + level, + extraFiles); } @Override public String toString() { return String.format( - "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d}", + "{%s, %d, %d, %s, %s, %s, %s, %d, %d, %d, %d, %s}", fileName, fileSize, rowCount, @@ -217,12 +273,13 @@ public class DataFileMeta { minSequenceNumber, maxSequenceNumber, schemaId, - level); + level, + extraFiles); } public static RowType schema() { List<RowType.RowField> fields = new ArrayList<>(); - fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE))); + fields.add(new RowType.RowField("_FILE_NAME", newStringType(false))); fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false))); fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false))); fields.add(new RowType.RowField("_MIN_KEY", newBytesType(false))); @@ -233,6 +290,8 @@ public class DataFileMeta { fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false))); fields.add(new RowType.RowField("_SCHEMA_ID", new BigIntType(false))); fields.add(new RowType.RowField("_LEVEL", new IntType(false))); + fields.add( + new RowType.RowField("_EXTRA_FILES", new ArrayType(false, newStringType(false)))); return new RowType(fields); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java index 6050a8fb..193e8e8f 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMetaSerializer.java @@ -26,6 +26,8 @@ import org.apache.flink.table.store.file.utils.ObjectSerializer; import static org.apache.flink.table.store.file.utils.SerializationUtils.deserializeBinaryRow; 
import static org.apache.flink.table.store.file.utils.SerializationUtils.serializeBinaryRow; +import static org.apache.flink.table.store.utils.RowDataUtils.fromStringArrayData; +import static org.apache.flink.table.store.utils.RowDataUtils.toStringArrayData; /** Serializer for {@link DataFileMeta}. */ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> { @@ -49,7 +51,8 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> { meta.minSequenceNumber(), meta.maxSequenceNumber(), meta.schemaId(), - meta.level()); + meta.level(), + toStringArrayData(meta.extraFiles())); } @Override @@ -65,6 +68,7 @@ public class DataFileMetaSerializer extends ObjectSerializer<DataFileMeta> { row.getLong(7), row.getLong(8), row.getLong(9), - row.getInt(10)); + row.getInt(10), + fromStringArrayData(row.getArray(11))); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java index 8b9d43d5..f77c697b 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java @@ -38,12 +38,17 @@ import org.apache.flink.table.store.file.writer.MetricFileWriter; import org.apache.flink.table.store.file.writer.RollingFileWriter; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.function.Supplier; /** @@ -102,6 +107,24 @@ public class ManifestFile { } } + public Iterable<ManifestEntry> readManifestFiles(List<String> manifestFiles) { + 
Queue<String> files = new LinkedList<>(manifestFiles); + return Iterables.concat( + (Iterable<Iterable<ManifestEntry>>) + () -> + new Iterator<Iterable<ManifestEntry>>() { + @Override + public boolean hasNext() { + return files.size() > 0; + } + + @Override + public Iterable<ManifestEntry> next() { + return read(files.poll()); + } + }); + } + /** * Write several {@link ManifestEntry}s into manifest files. * diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java index 690ba672..bd7bf226 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java @@ -18,9 +18,9 @@ package org.apache.flink.table.store.file.operation; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.Path; import org.apache.flink.table.store.file.Snapshot; -import org.apache.flink.table.store.file.data.DataFilePathFactory; import org.apache.flink.table.store.file.manifest.ManifestEntry; import org.apache.flink.table.store.file.manifest.ManifestFile; import org.apache.flink.table.store.file.manifest.ManifestFileMeta; @@ -34,10 +34,14 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.stream.Collectors; /** * Default implementation of {@link FileStoreExpire}. 
It retains a certain number or period of @@ -158,8 +162,6 @@ public class FileStoreExpireImpl implements FileStoreExpire { } // delete data files - FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache = - new FileStorePathFactory.DataFilePathFactoryCache(pathFactory); // deleted data files in a snapshot are not used by that snapshot, so the range of id should // be (beginInclusiveId, endExclusiveId] for (long id = beginInclusiveId + 1; id <= endExclusiveId; id++) { @@ -167,34 +169,12 @@ public class FileStoreExpireImpl implements FileStoreExpire { LOG.debug("Ready to delete data files in snapshot #" + id); } - Snapshot toExpire = snapshotManager.snapshot(id); - List<ManifestFileMeta> deltaManifests = manifestList.read(toExpire.deltaManifestList()); - - // we cannot delete a data file directly when we meet a DELETE entry, because that - // file might be upgraded - Set<Path> dataFileToDelete = new HashSet<>(); - for (ManifestFileMeta meta : deltaManifests) { - for (ManifestEntry entry : manifestFile.read(meta.fileName())) { - DataFilePathFactory dataFilePathFactory = - dataFilePathFactoryCache.getDataFilePathFactory( - entry.partition(), entry.bucket()); - Path dataFilePath = dataFilePathFactory.toPath(entry.file().fileName()); - switch (entry.kind()) { - case ADD: - dataFileToDelete.remove(dataFilePath); - break; - case DELETE: - dataFileToDelete.add(dataFilePath); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } - } - } - for (Path dataFile : dataFileToDelete) { - FileUtils.deleteOrWarn(dataFile); - } + List<String> manifestFiles = + manifestList.read(snapshotManager.snapshot(id).deltaManifestList()).stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toList()); + Iterable<ManifestEntry> dataFileLog = manifestFile.readManifestFiles(manifestFiles); + expireDataFiles(dataFileLog); } // delete manifests @@ -228,6 +208,37 @@ public class FileStoreExpireImpl implements 
FileStoreExpire { writeEarliestHint(endExclusiveId); } + @VisibleForTesting + void expireDataFiles(Iterable<ManifestEntry> dataFileLog) { + // we cannot delete a data file directly when we meet a DELETE entry, because that + // file might be upgraded + Map<Path, List<Path>> dataFileToDelete = new HashMap<>(); + for (ManifestEntry entry : dataFileLog) { + Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); + Path dataFilePath = new Path(bucketPath, entry.file().fileName()); + switch (entry.kind()) { + case ADD: + dataFileToDelete.remove(dataFilePath); + break; + case DELETE: + List<Path> extraFiles = new ArrayList<>(entry.file().extraFiles().size()); + for (String file : entry.file().extraFiles()) { + extraFiles.add(new Path(bucketPath, file)); + } + dataFileToDelete.put(dataFilePath, extraFiles); + break; + default: + throw new UnsupportedOperationException( + "Unknown value kind " + entry.kind().name()); + } + } + dataFileToDelete.forEach( + (path, extraFiles) -> { + FileUtils.deleteOrWarn(path); + extraFiles.forEach(FileUtils::deleteOrWarn); + }); + } + private void writeEarliestHint(long earliest) { // update earliest hint file diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java index dff608c4..b8fafc96 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java @@ -33,8 +33,6 @@ import org.apache.flink.util.Preconditions; import javax.annotation.concurrent.ThreadSafe; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; @@ -131,27 +129,4 @@ public class FileStorePathFactory { public String uuid() { return uuid; } - - /** Cache for storing 
{@link DataFilePathFactory}s. */ - public static class DataFilePathFactoryCache { - - private final FileStorePathFactory pathFactory; - private final Map<BinaryRowData, Map<Integer, DataFilePathFactory>> cache; - - public DataFilePathFactoryCache(FileStorePathFactory pathFactory) { - this.pathFactory = pathFactory; - this.cache = new HashMap<>(); - } - - public DataFilePathFactory getDataFilePathFactory(BinaryRowData partition, int bucket) { - return cache.compute(partition, (p, m) -> m == null ? new HashMap<>() : m) - .compute( - bucket, - (b, f) -> - f == null - ? pathFactory.createDataFilePathFactory( - partition, bucket) - : f); - } - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java index b78eb5c5..e18cedd3 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/SerializationUtils.java @@ -24,6 +24,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; import org.apache.flink.table.types.logical.VarBinaryType; +import org.apache.flink.table.types.logical.VarCharType; import java.io.EOFException; import java.io.IOException; @@ -58,9 +59,14 @@ public class SerializationUtils { return buf; } - /** Create a bytes type VarBinaryType(Integer.MAX_VALUE). */ + /** Create a bytes type VarBinaryType(VarBinaryType.MAX_LENGTH). */ public static VarBinaryType newBytesType(boolean isNullable) { - return new VarBinaryType(isNullable, Integer.MAX_VALUE); + return new VarBinaryType(isNullable, VarBinaryType.MAX_LENGTH); + } + + /** Create a varchar type VarCharType(VarCharType.MAX_LENGTH). 
*/ + public static VarCharType newStringType(boolean isNullable) { + return new VarCharType(isNullable, VarCharType.MAX_LENGTH); } /** diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java index a2f0a5df..1c4ff107 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java @@ -359,8 +359,6 @@ public class TestFileStore extends KeyValueFileStore { FileStorePathFactory pathFactory = pathFactory(); ManifestList manifestList = manifestListFactory().create(); FileStoreScan scan = newScan(); - FileStorePathFactory.DataFilePathFactoryCache dataFilePathFactoryCache = - new FileStorePathFactory.DataFilePathFactoryCache(pathFactory); SnapshotManager snapshotManager = snapshotManager(); Long latestSnapshotId = snapshotManager.latestSnapshotId(); @@ -396,9 +394,9 @@ public class TestFileStore extends KeyValueFileStore { List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files(); for (ManifestEntry entry : entries) { result.add( - dataFilePathFactoryCache - .getDataFilePathFactory(entry.partition(), entry.bucket()) - .toPath(entry.file().fileName())); + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); } } return result; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java index c929e4df..c7cf19a8 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileMetaSerializerTest.java @@ -20,6 +20,8 @@ package 
org.apache.flink.table.store.file.data; import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase; +import java.util.Arrays; + /** Tests for {@link DataFileMetaSerializer}. */ public class DataFileMetaSerializerTest extends ObjectSerializerTestBase<DataFileMeta> { @@ -32,6 +34,6 @@ public class DataFileMetaSerializerTest extends ObjectSerializerTestBase<DataFil @Override protected DataFileMeta object() { - return gen.next().meta; + return gen.next().meta.copy(Arrays.asList("extra1", "extra2")); } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java index 7b9bc4b0..999be1c9 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java @@ -18,13 +18,18 @@ package org.apache.flink.table.store.file.operation; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.store.file.KeyValue; import org.apache.flink.table.store.file.Snapshot; import org.apache.flink.table.store.file.TestFileStore; import org.apache.flink.table.store.file.TestKeyValueGenerator; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.manifest.ManifestEntry; import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; +import org.apache.flink.table.store.file.utils.FileUtils; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.junit.jupiter.api.AfterEach; @@ -34,10 +39,12 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.util.ArrayList; +import 
java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link FileStoreExpireImpl}. */ @@ -68,6 +75,49 @@ public class FileStoreExpireTest { store.assertCleaned(); } + @Test + public void testExpireExtraFiles() throws IOException { + FileStoreExpireImpl expire = store.newExpire(1, 3, Long.MAX_VALUE); + + // write test files + BinaryRowData partition = gen.getPartition(gen.next()); + Path bucketPath = store.pathFactory().bucketPath(partition, 0); + Path myDataFile = new Path(bucketPath, "myDataFile"); + FileUtils.writeFileUtf8(myDataFile, "1"); + Path extra1 = new Path(bucketPath, "extra1"); + FileUtils.writeFileUtf8(extra1, "2"); + Path extra2 = new Path(bucketPath, "extra2"); + FileUtils.writeFileUtf8(extra2, "3"); + + // create DataFileMeta and ManifestEntry + List<String> extraFiles = Arrays.asList("extra1", "extra2"); + DataFileMeta dataFile = + new DataFileMeta( + "myDataFile", + 1, + 1, + EMPTY_ROW, + EMPTY_ROW, + null, + null, + 0, + 1, + 0, + 0, + extraFiles); + ManifestEntry add = new ManifestEntry(ValueKind.ADD, partition, 0, 1, dataFile); + ManifestEntry delete = new ManifestEntry(ValueKind.DELETE, partition, 0, 1, dataFile); + + // expire + expire.expireDataFiles(Arrays.asList(add, delete)); + + // check + FileSystem fs = myDataFile.getFileSystem(); + assertThat(fs.exists(myDataFile)).isFalse(); + assertThat(fs.exists(extra1)).isFalse(); + assertThat(fs.exists(extra2)).isFalse(); + } + @Test public void testNoSnapshot() { FileStoreExpire expire = store.newExpire(1, 3, Long.MAX_VALUE);