This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 929a411f [FLINK-30611] Expiration can be performed with missing files 929a411f is described below commit 929a411f29f4fc76dfead6e716daae56c165724c Author: tsreaper <tsreape...@gmail.com> AuthorDate: Tue Jan 10 18:38:05 2023 +0800 [FLINK-30611] Expiration can be performed with missing files This closes #474. --- .../table/store/file/manifest/ManifestFile.java | 23 ---- .../store/file/operation/FileStoreExpireImpl.java | 48 ++++++++- .../flink/table/store/file/TestFileStore.java | 64 ++++++----- ...reTest.java => CleanedFileStoreExpireTest.java} | 90 +--------------- .../file/operation/FileStoreExpireTestBase.java | 119 +++++++++++++++++++++ .../operation/UncleanedFileStoreExpireTest.java | 84 +++++++++++++++ 6 files changed, 288 insertions(+), 140 deletions(-) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java index c6dbab0b..77b26de2 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java @@ -35,13 +35,8 @@ import org.apache.flink.table.store.format.FieldStatsCollector; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; - import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; /** * This file includes several {@link ManifestEntry}s, representing the additional changes since last @@ -91,24 +86,6 @@ public class ManifestFile { } } - public Iterable<ManifestEntry> readManifestFiles(List<String> manifestFiles) { - Queue<String> files = new LinkedList<>(manifestFiles); - return Iterables.concat( - (Iterable<Iterable<ManifestEntry>>) - () -> - new Iterator<Iterable<ManifestEntry>>() { - @Override - public boolean hasNext() { - return files.size() > 0; - } - - @Override - public Iterable<ManifestEntry> next() { - return read(files.poll()); - } - }); - } - /** * Write several {@link ManifestEntry}s into manifest files. * diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java index 5f85cb8a..fd7012f9 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreExpireImpl.java @@ -29,14 +29,20 @@ import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.FileUtils; import org.apache.flink.table.store.file.utils.SnapshotManager; +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.stream.Collectors; @@ -184,9 +190,14 @@ public class FileStoreExpireImpl implements FileStoreExpire { } Snapshot toExpire = snapshotManager.snapshot(id); + // cannot call `toExpire.readAllDataManifests` directly, it is possible that a job is + // killed during expiration, so some manifest files may have been deleted + List<ManifestFileMeta> toExpireManifests = new ArrayList<>(); + toExpireManifests.addAll(tryReadManifestList(toExpire.baseManifestList())); + toExpireManifests.addAll(tryReadManifestList(toExpire.deltaManifestList())); // delete manifest - for (ManifestFileMeta manifest : toExpire.readAllDataManifests(manifestList)) { + for (ManifestFileMeta manifest : toExpireManifests) { if (!manifestsInUse.contains(manifest) && !deletedManifests.contains(manifest)) { manifestFile.delete(manifest.fileName()); deletedManifests.add(manifest); @@ -194,7 +205,7 @@ public class FileStoreExpireImpl implements FileStoreExpire { } if (toExpire.changelogManifestList() != null) { for (ManifestFileMeta manifest : - manifestList.read(toExpire.changelogManifestList())) { + tryReadManifestList(toExpire.changelogManifestList())) { manifestFile.delete(manifest.fileName()); } } @@ -260,10 +271,39 @@ public class FileStoreExpireImpl implements FileStoreExpire { private Iterable<ManifestEntry> getManifestEntriesFromManifestList(String manifestListName) { List<String> manifestFiles = - manifestList.read(manifestListName).stream() + tryReadManifestList(manifestListName).stream() .map(ManifestFileMeta::fileName) .collect(Collectors.toList()); - return manifestFile.readManifestFiles(manifestFiles); + Queue<String> files = new LinkedList<>(manifestFiles); + return Iterables.concat( + (Iterable<Iterable<ManifestEntry>>) + () -> + new Iterator<Iterable<ManifestEntry>>() { + @Override + public boolean hasNext() { + return files.size() > 0; + } + + @Override + public Iterable<ManifestEntry> next() { + String file = files.poll(); + try { + return manifestFile.read(file); + } catch (Exception e) { + LOG.warn("Failed to read manifest file " + file, e); + return Collections.emptyList(); + } + } + }); + } + + private List<ManifestFileMeta> tryReadManifestList(String manifestListName) { + try { + return manifestList.read(manifestListName); + } catch (Exception e) { + LOG.warn("Failed to read manifest list file " + manifestListName, e); + return Collections.emptyList(); + } } private void writeEarliestHint(long earliest) { diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java index 376befa8..0c61503f 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java @@ -404,9 +404,6 @@ public class TestFileStore extends KeyValueFileStore { private Set<Path> getFilesInUse() { Set<Path> result = new HashSet<>(); - FileStorePathFactory pathFactory = pathFactory(); - ManifestList manifestList = manifestListFactory().create(); - FileStoreScan scan = newScan(); SchemaManager schemaManager = new SchemaManager(options.path()); schemaManager.listAllIds().forEach(id -> result.add(schemaManager.toSchemaPath(id))); @@ -427,35 +424,48 @@ public class TestFileStore extends KeyValueFileStore { } for (long id = firstInUseSnapshotId; id <= latestSnapshotId; id++) { - Path snapshotPath = snapshotManager.snapshotPath(id); - Snapshot snapshot = Snapshot.fromPath(snapshotPath); + result.addAll(getFilesInUse(id)); + } + return result; + } - // snapshot file - result.add(snapshotPath); + public Set<Path> getFilesInUse(long snapshotId) { + Set<Path> result = new HashSet<>(); - // manifest lists - result.add(pathFactory.toManifestListPath(snapshot.baseManifestList())); - result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList())); - if (snapshot.changelogManifestList() != null) { - result.add(pathFactory.toManifestListPath(snapshot.changelogManifestList())); - } + SnapshotManager snapshotManager = snapshotManager(); + FileStorePathFactory pathFactory = pathFactory(); + ManifestList manifestList = manifestListFactory().create(); + FileStoreScan scan = newScan(); - // manifests - List<ManifestFileMeta> manifests = snapshot.readAllDataManifests(manifestList); - if (snapshot.changelogManifestList() != null) { - manifests.addAll(manifestList.read(snapshot.changelogManifestList())); - } - manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); + Path snapshotPath = snapshotManager.snapshotPath(snapshotId); + Snapshot snapshot = Snapshot.fromPath(snapshotPath); - // data file - List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files(); - for (ManifestEntry entry : entries) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); - } + // snapshot file + result.add(snapshotPath); + + // manifest lists + result.add(pathFactory.toManifestListPath(snapshot.baseManifestList())); + result.add(pathFactory.toManifestListPath(snapshot.deltaManifestList())); + if (snapshot.changelogManifestList() != null) { + result.add(pathFactory.toManifestListPath(snapshot.changelogManifestList())); } + + // manifests + List<ManifestFileMeta> manifests = snapshot.readAllDataManifests(manifestList); + if (snapshot.changelogManifestList() != null) { + manifests.addAll(manifestList.read(snapshot.changelogManifestList())); + } + manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); + + // data file + List<ManifestEntry> entries = scan.withManifestList(manifests).plan().files(); + for (ManifestEntry entry : entries) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } + return result; } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java similarity index 68% rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java index 8c4da4c2..a03717d9 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/CleanedFileStoreExpireTest.java @@ -21,82 +21,29 @@ package org.apache.flink.table.store.file.operation; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.KeyValue; -import org.apache.flink.table.store.file.Snapshot; -import org.apache.flink.table.store.file.TestFileStore; -import org.apache.flink.table.store.file.TestKeyValueGenerator; import org.apache.flink.table.store.file.io.DataFileMeta; import org.apache.flink.table.store.file.manifest.FileKind; import org.apache.flink.table.store.file.manifest.ManifestEntry; -import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; -import org.apache.flink.table.store.file.schema.SchemaManager; -import org.apache.flink.table.store.file.schema.UpdateSchema; import org.apache.flink.table.store.file.utils.FileUtils; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link FileStoreExpireImpl}. */ -public class FileStoreExpireTest { - - private TestKeyValueGenerator gen; - @TempDir java.nio.file.Path tempDir; - private TestFileStore store; - private SnapshotManager snapshotManager; - - @BeforeEach - public void beforeEach() throws Exception { - gen = new TestKeyValueGenerator(); - store = createStore(); - snapshotManager = store.snapshotManager(); - SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri())); - schemaManager.commitNewVersion( - new UpdateSchema( - TestKeyValueGenerator.DEFAULT_ROW_TYPE, - TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), - TestKeyValueGenerator.getPrimaryKeys( - TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), - Collections.emptyMap(), - null)); - } - - private TestFileStore createStore() { - ThreadLocalRandom random = ThreadLocalRandom.current(); - - CoreOptions.ChangelogProducer changelogProducer; - if (random.nextBoolean()) { - changelogProducer = CoreOptions.ChangelogProducer.INPUT; - } else { - changelogProducer = CoreOptions.ChangelogProducer.NONE; - } - - return new TestFileStore.Builder( - "avro", - tempDir.toString(), - 1, - TestKeyValueGenerator.DEFAULT_PART_TYPE, - TestKeyValueGenerator.KEY_TYPE, - TestKeyValueGenerator.DEFAULT_ROW_TYPE, - TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, - DeduplicateMergeFunction.factory()) - .changelogProducer(changelogProducer) - .build(); - } +/** + * Tests for {@link FileStoreExpireImpl}. After expiration, only useful files should be retained. + */ +public class CleanedFileStoreExpireTest extends FileStoreExpireTestBase { @AfterEach public void afterEach() throws IOException { @@ -265,33 +212,4 @@ public class FileStoreExpireTest { } } } - - private void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions) - throws Exception { - for (int i = 0; i < numCommits; i++) { - int numRecords = ThreadLocalRandom.current().nextInt(100) + 1; - List<KeyValue> data = new ArrayList<>(); - for (int j = 0; j < numRecords; j++) { - data.add(gen.next()); - } - allData.addAll(data); - List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0); - for (int j = 0; j < snapshots.size(); j++) { - snapshotPositions.add(allData.size()); - } - } - } - - private void assertSnapshot( - int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions) - throws Exception { - Map<BinaryRowData, BinaryRowData> expected = - store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1))); - List<KeyValue> actualKvs = - store.readKvsFromManifestEntries( - store.newScan().withSnapshot(snapshotId).plan().files(), false); - gen.sort(actualKvs); - Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs); - assertThat(actual).isEqualTo(expected); - } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java new file mode 100644 index 00000000..3cddfb60 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/FileStoreExpireTestBase.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.operation; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.CoreOptions; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.Snapshot; +import org.apache.flink.table.store.file.TestFileStore; +import org.apache.flink.table.store.file.TestKeyValueGenerator; +import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; +import org.apache.flink.table.store.file.schema.SchemaManager; +import org.apache.flink.table.store.file.schema.UpdateSchema; +import org.apache.flink.table.store.file.utils.SnapshotManager; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Base test class for {@link FileStoreExpireImpl}. */ +public class FileStoreExpireTestBase { + + protected TestKeyValueGenerator gen; + @TempDir java.nio.file.Path tempDir; + protected TestFileStore store; + protected SnapshotManager snapshotManager; + + @BeforeEach + public void beforeEach() throws Exception { + gen = new TestKeyValueGenerator(); + store = createStore(); + snapshotManager = store.snapshotManager(); + SchemaManager schemaManager = new SchemaManager(new Path(tempDir.toUri())); + schemaManager.commitNewVersion( + new UpdateSchema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + Collections.emptyMap(), + null)); + } + + private TestFileStore createStore() { + ThreadLocalRandom random = ThreadLocalRandom.current(); + + CoreOptions.ChangelogProducer changelogProducer; + if (random.nextBoolean()) { + changelogProducer = CoreOptions.ChangelogProducer.INPUT; + } else { + changelogProducer = CoreOptions.ChangelogProducer.NONE; + } + + return new TestFileStore.Builder( + "avro", + tempDir.toString(), + 1, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory()) + .changelogProducer(changelogProducer) + .build(); + } + + protected void commit(int numCommits, List<KeyValue> allData, List<Integer> snapshotPositions) + throws Exception { + for (int i = 0; i < numCommits; i++) { + int numRecords = ThreadLocalRandom.current().nextInt(100) + 1; + List<KeyValue> data = new ArrayList<>(); + for (int j = 0; j < numRecords; j++) { + data.add(gen.next()); + } + allData.addAll(data); + List<Snapshot> snapshots = store.commitData(data, gen::getPartition, kv -> 0); + for (int j = 0; j < snapshots.size(); j++) { + snapshotPositions.add(allData.size()); + } + } + } + + protected void assertSnapshot( + int snapshotId, List<KeyValue> allData, List<Integer> snapshotPositions) + throws Exception { + Map<BinaryRowData, BinaryRowData> expected = + store.toKvMap(allData.subList(0, snapshotPositions.get(snapshotId - 1))); + List<KeyValue> actualKvs = + store.readKvsFromManifestEntries( + store.newScan().withSnapshot(snapshotId).plan().files(), false); + gen.sort(actualKvs); + Map<BinaryRowData, BinaryRowData> actual = store.toKvMap(actualKvs); + assertThat(actual).isEqualTo(expected); + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java new file mode 100644 index 00000000..0f7b38df --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/UncleanedFileStoreExpireTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.operation; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.utils.FileUtils; + +import org.junit.jupiter.api.Test; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link FileStoreExpireImpl}. Some files not in use may still remain after the test due + * to the testing methods. + */ +public class UncleanedFileStoreExpireTest extends FileStoreExpireTestBase { + + @Test + public void testExpireWithMissingFiles() throws Exception { + FileStoreExpire expire = store.newExpire(1, 1, 1); + + List<KeyValue> allData = new ArrayList<>(); + List<Integer> snapshotPositions = new ArrayList<>(); + commit(5, allData, snapshotPositions); + + int latestSnapshotId = snapshotManager.latestSnapshotId().intValue(); + Set<Path> filesInUse = store.getFilesInUse(latestSnapshotId); + List<Path> unusedFileList = + Files.walk(Paths.get(tempDir.toString())) + .filter(Files::isRegularFile) + .filter(p -> !p.getFileName().toString().startsWith("snapshot")) + .filter(p -> !p.getFileName().toString().startsWith("schema")) + .map(p -> new Path(p.toString())) + .filter(p -> !filesInUse.contains(p)) + .collect(Collectors.toList()); + + // shuffle list + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = unusedFileList.size() - 1; i > 0; i--) { + int j = random.nextInt(i + 1); + Collections.swap(unusedFileList, i, j); + } + + // delete some unused files + int numFilesToDelete = random.nextInt(unusedFileList.size()); + for (int i = 0; i < numFilesToDelete; i++) { + FileUtils.deleteOrWarn(unusedFileList.get(i)); + } + + expire.expire(); + + for (int i = 1; i < latestSnapshotId; i++) { + assertThat(snapshotManager.snapshotExists(i)).isFalse(); + } + assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue(); + assertSnapshot(latestSnapshotId, allData, snapshotPositions); + } +}