This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 7ac835ffe828a511689ac080d90d2a729aa08d89 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Wed Jan 12 19:56:26 2022 +0800 [FLINK-25628] Introduce SstFile and SstFileMeta This closes #4 Co-authored-by: tsreaper <tsreape...@gmail.com> --- .../table/store/file/mergetree/sst/SstFile.java | 269 +++++++++++++++++++++ .../store/file/mergetree/sst/SstFileMeta.java | 186 ++++++++++++++ .../file/mergetree/sst/SstFileMetaSerializer.java | 69 ++++++ .../flink/table/store/file/utils/FileUtils.java | 76 ++++++ .../mergetree/sst/SstFileMetaSerializerTest.java | 39 +++ .../store/file/mergetree/sst/SstFileTest.java | 245 +++++++++++++++++++ .../file/mergetree/sst/SstTestDataGenerator.java | 178 ++++++++++++++ .../file/utils/FailingAtomicRenameFileSystem.java | 143 +++++++++++ .../store/file/utils/FileStorePathFactory.java | 112 +++++++++ .../file/utils/TestAtomicRenameFileSystem.java | 121 +++++++++ .../org.apache.flink.core.fs.FileSystemFactory | 17 ++ 11 files changed, 1455 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java new file mode 100644 index 0000000..d73ff45 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFile.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.sst; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.reader.BulkFormat; +import org.apache.flink.connector.file.src.util.RecordAndPosition; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.store.file.FileFormat; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.KeyValueSerializer; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This file includes several {@link KeyValue}s, representing the changes inserted into the file + * storage. 
+ */ +public class SstFile { + + private static final Logger LOG = LoggerFactory.getLogger(SstFile.class); + + private final RowType keyType; + private final RowType valueType; + + private final BulkFormat<RowData, FileSourceSplit> readerFactory; + private final BulkWriter.Factory<RowData> writerFactory; + private final SstPathFactory pathFactory; + private final long suggestedFileSize; + + public SstFile( + RowType keyType, + RowType valueType, + FileFormat fileFormat, + SstPathFactory pathFactory, + long suggestedFileSize) { + this.keyType = keyType; + this.valueType = valueType; + + RowType recordType = KeyValue.schema(keyType, valueType); + this.readerFactory = fileFormat.createReaderFactory(recordType); + this.writerFactory = fileFormat.createWriterFactory(recordType); + this.pathFactory = pathFactory; + this.suggestedFileSize = suggestedFileSize; + } + + public RowType keyType() { + return keyType; + } + + public RowType valueType() { + return valueType; + } + + @VisibleForTesting + public long suggestedFileSize() { + return suggestedFileSize; + } + + public RecordReader read(String fileName) throws IOException { + return new SstFileRecordReader(pathFactory.toPath(fileName)); + } + + /** + * Write several {@link KeyValue}s into an sst file of a given level. + * + * <p>NOTE: This method is atomic. 
+ */ + public List<SstFileMeta> write(CloseableIterator<KeyValue> iterator, int level) + throws Exception { + List<SstFileMeta> result = new ArrayList<>(); + + RollingFile rollingFile = null; + Path currentPath = null; + try { + while (iterator.hasNext()) { + if (rollingFile == null) { + currentPath = pathFactory.newPath(); + rollingFile = new RollingFile(currentPath, suggestedFileSize); + } + rollingFile.write(iterator.next()); + if (rollingFile.exceedsSuggestedFileSize()) { + result.add(rollingFile.finish(level)); + rollingFile = null; + } + } + // finish last file + if (rollingFile != null) { + result.add(rollingFile.finish(level)); + } + iterator.close(); + } catch (Throwable e) { + LOG.warn("Exception occurs when writing sst files. Cleaning up.", e); + // clean up finished files + for (SstFileMeta meta : result) { + FileUtils.deleteOrWarn(pathFactory.toPath(meta.fileName())); + } + // clean up in-progress file + if (currentPath != null) { + FileUtils.deleteOrWarn(currentPath); + } + throw e; + } + + return result; + } + + public void delete(SstFileMeta file) { + FileUtils.deleteOrWarn(pathFactory.toPath(file.fileName())); + } + + private class SstFileRecordReader implements RecordReader { + + private final BulkFormat.Reader<RowData> reader; + private final KeyValueSerializer serializer; + + private SstFileRecordReader(Path path) throws IOException { + long fileSize = FileUtils.getFileSize(path); + FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize); + this.reader = readerFactory.createReader(FileUtils.DEFAULT_READER_CONFIG, split); + this.serializer = new KeyValueSerializer(keyType, valueType); + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + BulkFormat.RecordIterator<RowData> iterator = reader.readBatch(); + return iterator == null ? 
null : new SstFileRecordIterator(iterator, serializer); + } + + @Override + public void close() throws IOException { + reader.close(); + } + } + + private static class SstFileRecordIterator implements RecordReader.RecordIterator { + + private final BulkFormat.RecordIterator<RowData> iterator; + private final KeyValueSerializer serializer; + + private SstFileRecordIterator( + BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) { + this.iterator = iterator; + this.serializer = serializer; + } + + @Override + public KeyValue next() throws IOException { + RecordAndPosition<RowData> result = iterator.next(); + return result == null ? null : serializer.fromRow(result.getRecord()); + } + + @Override + public void releaseBatch() { + iterator.releaseBatch(); + } + } + + private class RollingFile { + private final Path path; + private final long suggestedFileSize; + + private final FSDataOutputStream out; + private final BulkWriter<RowData> writer; + private final KeyValueSerializer serializer; + private final RowDataSerializer keySerializer; + + private long rowCount; + private BinaryRowData minKey; + private RowData maxKey; + private long minSequenceNumber; + private long maxSequenceNumber; + + private RollingFile(Path path, long suggestedFileSize) throws IOException { + this.path = path; + this.suggestedFileSize = suggestedFileSize; + + this.out = + this.path.getFileSystem().create(this.path, FileSystem.WriteMode.NO_OVERWRITE); + this.writer = writerFactory.create(out); + this.serializer = new KeyValueSerializer(keyType, valueType); + this.keySerializer = new RowDataSerializer(keyType); + + this.rowCount = 0; + this.minKey = null; + this.maxKey = null; + this.minSequenceNumber = Long.MAX_VALUE; + this.maxSequenceNumber = Long.MIN_VALUE; + } + + private void write(KeyValue kv) throws IOException { + writer.addElement(serializer.toRow(kv)); + + rowCount++; + if (minKey == null) { + minKey = keySerializer.toBinaryRow(kv.key()).copy(); + } + maxKey = 
kv.key(); + minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber()); + maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber()); + } + + private boolean exceedsSuggestedFileSize() throws IOException { + // NOTE: this method is inaccurate for formats buffering changes in memory + return out.getPos() >= suggestedFileSize; + } + + private SstFileMeta finish(int level) throws IOException { + writer.finish(); + out.close(); + + // TODO + // 1. Read statistics directly from the written orc/parquet files. + // 2. For other file formats use StatsCollector. Make sure fields are not reused + // otherwise we need copying. + FieldStats[] stats = new FieldStats[valueType.getFieldCount()]; + for (int i = 0; i < stats.length; i++) { + stats[i] = new FieldStats(null, null, 0); + } + + return new SstFileMeta( + path.getName(), + FileUtils.getFileSize(path), + rowCount, + minKey, + keySerializer.toBinaryRow(maxKey).copy(), + stats, + minSequenceNumber, + maxSequenceNumber, + level); + } + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java new file mode 100644 index 0000000..5a7e99a --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMeta.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.sst; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Metadata of a SST file. 
*/ +public class SstFileMeta { + + private final String fileName; + private final long fileSize; + private final long rowCount; + + private final BinaryRowData minKey; + private final BinaryRowData maxKey; + private final FieldStats[] stats; + + private final long minSequenceNumber; + private final long maxSequenceNumber; + private final int level; + + public SstFileMeta( + String fileName, + long fileSize, + long rowCount, + BinaryRowData minKey, + BinaryRowData maxKey, + FieldStats[] stats, + long minSequenceNumber, + long maxSequenceNumber, + int level) { + this.fileName = fileName; + this.fileSize = fileSize; + this.rowCount = rowCount; + + this.minKey = minKey; + this.maxKey = maxKey; + this.stats = stats; + + this.minSequenceNumber = minSequenceNumber; + this.maxSequenceNumber = maxSequenceNumber; + this.level = level; + } + + public String fileName() { + return fileName; + } + + public long fileSize() { + return fileSize; + } + + public long rowCount() { + return rowCount; + } + + public BinaryRowData minKey() { + return minKey; + } + + public BinaryRowData maxKey() { + return maxKey; + } + + /** Element in the array may be null, indicating the statistics of this field is unknown. 
*/ + public FieldStats[] stats() { + return stats; + } + + public long minSequenceNumber() { + return minSequenceNumber; + } + + public long maxSequenceNumber() { + return maxSequenceNumber; + } + + public int level() { + return level; + } + + public SstFileMeta upgrade(int newLevel) { + checkArgument(newLevel > this.level); + return new SstFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + stats, + minSequenceNumber, + maxSequenceNumber, + newLevel); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof SstFileMeta)) { + return false; + } + SstFileMeta that = (SstFileMeta) o; + return Objects.equals(fileName, that.fileName) + && fileSize == that.fileSize + && rowCount == that.rowCount + && Objects.equals(minKey, that.minKey) + && Objects.equals(maxKey, that.maxKey) + && Arrays.equals(stats, that.stats) + && minSequenceNumber == that.minSequenceNumber + && maxSequenceNumber == that.maxSequenceNumber + && level == that.level; + } + + @Override + public int hashCode() { + return Objects.hash( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + // by default, hash code of arrays are computed by reference, not by content. + // so we must use Arrays.hashCode to hash by content. 
+ Arrays.hashCode(stats), + minSequenceNumber, + maxSequenceNumber, + level); + } + + @Override + public String toString() { + return String.format( + "{%s, %d, %d, %s, %s, %s, %d, %d, %d}", + fileName, + fileSize, + rowCount, + minKey, + maxKey, + Arrays.toString(stats), + minSequenceNumber, + maxSequenceNumber, + level); + } + + public static RowType schema(RowType keyType, RowType rowType) { + List<RowType.RowField> fields = new ArrayList<>(); + fields.add(new RowType.RowField("_FILE_NAME", new VarCharType(false, Integer.MAX_VALUE))); + fields.add(new RowType.RowField("_FILE_SIZE", new BigIntType(false))); + fields.add(new RowType.RowField("_ROW_COUNT", new BigIntType(false))); + fields.add(new RowType.RowField("_MIN_KEY", keyType)); + fields.add(new RowType.RowField("_MAX_KEY", keyType)); + fields.add(new RowType.RowField("_STATS", FieldStatsArraySerializer.schema(rowType))); + fields.add(new RowType.RowField("_MIN_SEQUENCE_NUMBER", new BigIntType(false))); + fields.add(new RowType.RowField("_MAX_SEQUENCE_NUMBER", new BigIntType(false))); + fields.add(new RowType.RowField("_LEVEL", new IntType(false))); + return new RowType(fields); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java new file mode 100644 index 0000000..0e176d1 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializer.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.sst; + +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; +import org.apache.flink.table.store.file.utils.ObjectSerializer; +import org.apache.flink.table.types.logical.RowType; + +/** Serializer for {@link SstFileMeta}. */ +public class SstFileMetaSerializer extends ObjectSerializer<SstFileMeta> { + + private final RowDataSerializer keySerializer; + private final FieldStatsArraySerializer statsArraySerializer; + + public SstFileMetaSerializer(RowType keyType, RowType rowType) { + super(SstFileMeta.schema(keyType, rowType)); + this.keySerializer = new RowDataSerializer(keyType); + this.statsArraySerializer = new FieldStatsArraySerializer(rowType); + } + + @Override + public RowData toRow(SstFileMeta meta) { + return GenericRowData.of( + StringData.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + meta.minKey(), + meta.maxKey(), + statsArraySerializer.toRow(meta.stats()), + meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.level()); + } + + @Override + public SstFileMeta fromRow(RowData row) { + int keyFieldCount = keySerializer.getArity(); + return new SstFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + keySerializer.toBinaryRow(row.getRow(3, keyFieldCount)).copy(), + 
keySerializer.toBinaryRow(row.getRow(4, keyFieldCount)).copy(), + statsArraySerializer.fromRow(row.getRow(5, statsArraySerializer.numFields())), + row.getLong(6), + row.getLong(7), + row.getInt(8)); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java new file mode 100644 index 0000000..b787b9a --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/FileUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.reader.BulkFormat; +import org.apache.flink.connector.file.src.util.Utils; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** Utils for file reading and writing. */ +public class FileUtils { + + private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class); + + public static final Configuration DEFAULT_READER_CONFIG = new Configuration(); + + static { + DEFAULT_READER_CONFIG.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); + } + + public static <T> List<T> readListFromFile( + Path path, + ObjectSerializer<T> serializer, + BulkFormat<RowData, FileSourceSplit> readerFactory) + throws IOException { + List<T> result = new ArrayList<>(); + long fileSize = FileUtils.getFileSize(path); + FileSourceSplit split = new FileSourceSplit("ignore", path, 0, fileSize, 0, fileSize); + BulkFormat.Reader<RowData> reader = + readerFactory.createReader(DEFAULT_READER_CONFIG, split); + Utils.forEachRemaining(reader, row -> result.add(serializer.fromRow(row))); + return result; + } + + public static long getFileSize(Path path) throws IOException { + return path.getFileSystem().getFileStatus(path).getLen(); + } + + public static void deleteOrWarn(Path file) { + try { + FileSystem fs = file.getFileSystem(); + if (!fs.delete(file, false) && fs.exists(file)) { + LOG.warn("Failed to delete file " + file); + } + } catch (IOException e) { + LOG.warn("Exception occurs when deleting file " + file, e); + } + } +} diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java new file mode 100644 index 0000000..28e03b6 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileMetaSerializerTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.sst; + +import org.apache.flink.table.store.file.TestKeyValueGenerator; +import org.apache.flink.table.store.file.utils.ObjectSerializerTestBase; + +/** Tests for {@link SstFileMetaSerializer}. 
*/ +public class SstFileMetaSerializerTest extends ObjectSerializerTestBase<SstFileMeta> { + + private final SstTestDataGenerator gen = SstTestDataGenerator.builder().build(); + + @Override + protected SstFileMetaSerializer serializer() { + return new SstFileMetaSerializer( + TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE); + } + + @Override + protected SstFileMeta object() { + return gen.next().meta; + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java new file mode 100644 index 0000000..de6ed25 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstFileTest.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.mergetree.sst; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.reader.BulkFormat; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.store.file.FileFormat; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.KeyValueSerializerTest; +import org.apache.flink.table.store.file.TestKeyValueGenerator; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem; +import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link SstFile}. 
*/ +public class SstFileTest { + + private final SstTestDataGenerator gen = + SstTestDataGenerator.builder().memTableCapacity(20).build(); + private final FileFormat flushingAvro = new FlushingAvroFormat(); + + @TempDir java.nio.file.Path tempDir; + + @RepeatedTest(10) + public void testWriteAndReadSstFile() throws Exception { + SstTestDataGenerator.SstFile data = gen.next(); + SstFile sstFile = createSstFile(tempDir.toString()); + SstFileMetaSerializer serializer = + new SstFileMetaSerializer( + TestKeyValueGenerator.KEY_TYPE, TestKeyValueGenerator.ROW_TYPE); + + List<SstFileMeta> actualMetas = + sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0); + + checkRollingFiles(data.meta, actualMetas, sstFile.suggestedFileSize()); + + Iterator<KeyValue> expectedIterator = data.content.iterator(); + for (SstFileMeta meta : actualMetas) { + // check the contents of sst file + CloseableIterator<KeyValue> actualKvsIterator = + new RecordReaderIterator(sstFile.read(meta.fileName())); + while (actualKvsIterator.hasNext()) { + assertThat(expectedIterator.hasNext()).isTrue(); + KeyValue actualKv = actualKvsIterator.next(); + assertThat( + KeyValueSerializerTest.equals( + expectedIterator.next(), + actualKv, + TestKeyValueGenerator.KEY_SERIALIZER, + TestKeyValueGenerator.ROW_SERIALIZER)) + .isTrue(); + } + actualKvsIterator.close(); + + // check that each sst file meta is serializable + assertThat(serializer.fromRow(serializer.toRow(meta))).isEqualTo(meta); + } + assertThat(expectedIterator.hasNext()).isFalse(); + } + + @RepeatedTest(10) + public void testCleanUpForException() throws IOException { + FailingAtomicRenameFileSystem.resetFailCounter(1); + FailingAtomicRenameFileSystem.setFailPossibility(10); + SstTestDataGenerator.SstFile data = gen.next(); + SstFile sstFile = + createSstFile(FailingAtomicRenameFileSystem.SCHEME + "://" + tempDir.toString()); + + try { + sstFile.write(CloseableIterator.fromList(data.content, kv -> {}), 0); + } catch (Throwable e) { + 
assertThat(e) + .isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class); + Path root = new Path(tempDir.toString()); + FileSystem fs = root.getFileSystem(); + for (FileStatus bucketStatus : fs.listStatus(root)) { + assertThat(bucketStatus.isDir()).isTrue(); + assertThat(fs.listStatus(bucketStatus.getPath())).isEmpty(); + } + } + } + + private SstFile createSstFile(String path) { + FileStorePathFactory fileStorePathFactory = new FileStorePathFactory(new Path(path)); + SstPathFactory sstPathFactory = fileStorePathFactory.createSstPathFactory(null, 0); + int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024; + return new SstFile( + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.ROW_TYPE, + // normal avro format will buffer changes in memory and we can't determine + // if the written file size is really larger than suggested, so we use a + // special avro format which flushes for every added element + flushingAvro, + sstPathFactory, + suggestedFileSize); + } + + private void checkRollingFiles( + SstFileMeta expected, List<SstFileMeta> actual, long suggestedFileSize) { + // all but last file should be no smaller than suggestedFileSize + for (int i = 0; i + 1 < actual.size(); i++) { + assertThat(actual.get(i).fileSize() >= suggestedFileSize).isTrue(); + } + + // expected.rowCount == sum(rowCount) + assertThat(actual.stream().mapToLong(SstFileMeta::rowCount).sum()) + .isEqualTo(expected.rowCount()); + + // expected.minKey == firstFile.minKey + assertThat(actual.get(0).minKey()).isEqualTo(expected.minKey()); + + // expected.maxKey == lastFile.maxKey + assertThat(actual.get(actual.size() - 1).maxKey()).isEqualTo(expected.maxKey()); + + // TODO check stats after they're collected + /* + for (int i = 0; i < expected.stats().length; i++) { + List<FieldStats> actualStats = new ArrayList<>(); + for (SstFileMeta meta : actual) { + actualStats.add(meta.stats()[i]); + } + checkRollingFileStats(expected.stats()[i], actualStats); + } 
+ */ + + // expected.minSequenceNumber == min(minSequenceNumber) + assertThat(actual.stream().mapToLong(SstFileMeta::minSequenceNumber).min().orElse(-1)) + .isEqualTo(expected.minSequenceNumber()); + + // expected.maxSequenceNumber == max(maxSequenceNumber) + assertThat(actual.stream().mapToLong(SstFileMeta::maxSequenceNumber).max().orElse(-1)) + .isEqualTo(expected.maxSequenceNumber()); + + // expected.level == eachFile.level + for (SstFileMeta meta : actual) { + assertThat(meta.level()).isEqualTo(expected.level()); + } + } + + @SuppressWarnings("unchecked") + private void checkRollingFileStats(FieldStats expected, List<FieldStats> actual) { + if (expected.minValue() instanceof Comparable) { + Object actualMin = null; + Object actualMax = null; + for (FieldStats stats : actual) { + if (stats.minValue() != null + && (actualMin == null + || ((Comparable<Object>) stats.minValue()).compareTo(actualMin) + < 0)) { + actualMin = stats.minValue(); + } + if (stats.maxValue() != null + && (actualMax == null + || ((Comparable<Object>) stats.maxValue()).compareTo(actualMax) + > 0)) { + actualMax = stats.maxValue(); + } + } + assertThat(actualMin).isEqualTo(expected.minValue()); + assertThat(actualMax).isEqualTo(expected.maxValue()); + } else { + for (FieldStats stats : actual) { + assertThat(stats.minValue()).isNull(); + assertThat(stats.maxValue()).isNull(); + } + } + assertThat(actual.stream().mapToLong(FieldStats::nullCount).sum()) + .isEqualTo(expected.nullCount()); + } + + /** A special avro {@link FileFormat} which flushes for every added element. 
*/ + public static class FlushingAvroFormat implements FileFormat { + + private final FileFormat avro = + FileFormat.fromIdentifier( + SstFileTest.class.getClassLoader(), "avro", new Configuration()); + + @Override + public BulkFormat<RowData, FileSourceSplit> createReaderFactory( + RowType type, List<ResolvedExpression> filters) { + return avro.createReaderFactory(type, filters); + } + + @Override + public BulkWriter.Factory<RowData> createWriterFactory(RowType type) { + return fsDataOutputStream -> { + BulkWriter<RowData> wrapped = + avro.createWriterFactory(type).create(fsDataOutputStream); + return new BulkWriter<RowData>() { + @Override + public void addElement(RowData rowData) throws IOException { + wrapped.addElement(rowData); + wrapped.flush(); + } + + @Override + public void flush() throws IOException { + wrapped.flush(); + } + + @Override + public void finish() throws IOException { + wrapped.finish(); + } + }; + }; + } + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java new file mode 100644 index 0000000..94b2f08 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/sst/SstTestDataGenerator.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.sst;
+
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.TestKeyValueGenerator;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/** Random {@link SstFileMeta} generator. */
+public class SstTestDataGenerator {
+
+    // number of buckets the generated keys are hashed into
+    private final int numBuckets;
+    // number of records collected before a memtable is flushed into an SST file
+    private final int memTableCapacity;
+
+    // one memtable per bucket, keyed by partition
+    private final List<Map<BinaryRowData, List<KeyValue>>> memTables;
+    private final TestKeyValueGenerator gen;
+
+    private SstTestDataGenerator(int numBuckets, int memTableCapacity) {
+        this.numBuckets = numBuckets;
+        this.memTableCapacity = memTableCapacity;
+
+        this.memTables = new ArrayList<>();
+        for (int i = 0; i < numBuckets; i++) {
+            memTables.add(new HashMap<>());
+        }
+        this.gen = new TestKeyValueGenerator();
+    }
+
+    /**
+     * Generates the next in-memory SST file. Keeps feeding random key-values into per-bucket,
+     * per-partition memtables until one memtable reaches capacity, then flushes that memtable
+     * into a single level-0 file.
+     */
+    public SstFile next() {
+        while (true) {
+            KeyValue kv = gen.next();
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData partition = gen.getPartition(kv);
+            // non-negative bucket id derived from the key hash
+            int bucket = (key.hashCode() % numBuckets + numBuckets) % numBuckets;
+            List<KeyValue> memTable =
+                    memTables.get(bucket).computeIfAbsent(partition, k -> new ArrayList<>());
+            memTable.add(kv);
+
+            if (memTable.size() >= memTableCapacity) {
+                List<SstFile> result = createSstFiles(memTable, 0, partition, bucket);
+                memTable.clear();
+                // a full memtable holds exactly memTableCapacity records, which is the level-0
+                // file capacity, so the flush always produces a single file
+                assert result.size() == 1;
+                return
result.get(0);
+            }
+        }
+    }
+
+    /**
+     * Sorts the key-values, keeps only the last occurrence of each key, and splits the result
+     * into files of at most memTableCapacity^(level+1) records each.
+     *
+     * <p>NOTE(review): assumes {@code kvs} is non-empty; {@code kvs.get(kvs.size() - 1)} would
+     * throw otherwise — confirm callers never pass an empty list.
+     */
+    public List<SstFile> createSstFiles(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        gen.sort(kvs);
+        // deduplicate: for runs of equal keys, only the last entry survives
+        // (presumably gen.sort orders equal keys by sequence number — verify)
+        List<KeyValue> combined = new ArrayList<>();
+        for (int i = 0; i + 1 < kvs.size(); i++) {
+            KeyValue now = kvs.get(i);
+            KeyValue next = kvs.get(i + 1);
+            if (!now.key().equals(next.key())) {
+                combined.add(now);
+            }
+        }
+        combined.add(kvs.get(kvs.size() - 1));
+
+        // file capacity grows by a factor of memTableCapacity per level
+        int capacity = memTableCapacity;
+        for (int i = 0; i < level; i++) {
+            capacity *= memTableCapacity;
+        }
+        List<SstFile> result = new ArrayList<>();
+        for (int i = 0; i < combined.size(); i += capacity) {
+            result.add(
+                    createSstFile(
+                            combined.subList(i, Math.min(i + capacity, combined.size())),
+                            level,
+                            partition,
+                            bucket));
+        }
+        return result;
+    }
+
+    /**
+     * Builds a single in-memory SST file, computing total size, key range, sequence number range
+     * and per-field value stats of the given key-values.
+     */
+    private SstFile createSstFile(
+            List<KeyValue> kvs, int level, BinaryRowData partition, int bucket) {
+        FieldStatsCollector collector = new FieldStatsCollector(TestKeyValueGenerator.ROW_TYPE);
+        long totalSize = 0;
+        BinaryRowData minKey = null;
+        BinaryRowData maxKey = null;
+        long minSequenceNumber = Long.MAX_VALUE;
+        long maxSequenceNumber = Long.MIN_VALUE;
+        for (KeyValue kv : kvs) {
+            BinaryRowData key = (BinaryRowData) kv.key();
+            BinaryRowData value = (BinaryRowData) kv.value();
+            totalSize += key.getSizeInBytes() + value.getSizeInBytes();
+            collector.collect(value);
+            if (minKey == null || gen.compareKeys(key, minKey) < 0) {
+                minKey = key;
+            }
+            if (maxKey == null || gen.compareKeys(key, maxKey) > 0) {
+                maxKey = key;
+            }
+            minSequenceNumber = Math.min(minSequenceNumber, kv.sequenceNumber());
+            maxSequenceNumber = Math.max(maxSequenceNumber, kv.sequenceNumber());
+        }
+
+        return new SstFile(
+                partition,
+                bucket,
+                new SstFileMeta(
+                        "sst-" + UUID.randomUUID(),
+                        totalSize,
+                        kvs.size(),
+                        minKey,
+                        maxKey,
+                        collector.extract(),
+                        minSequenceNumber,
+                        maxSequenceNumber,
+                        level),
+                kvs);
+    }
+
+    /** An in-memory SST file. */
+    public static class SstFile {
+        public final BinaryRowData partition;
+        public final int bucket;
+        public final SstFileMeta meta;
+        public final List<KeyValue> content;
+
+        private SstFile(
+                BinaryRowData partition, int bucket, SstFileMeta meta, List<KeyValue> content) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.meta = meta;
+            this.content = content;
+        }
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link SstTestDataGenerator}. */
+    public static class Builder {
+        private int numBuckets = 3;
+        private int memTableCapacity = 3;
+
+        public Builder numBuckets(int value) {
+            this.numBuckets = value;
+            return this;
+        }
+
+        public Builder memTableCapacity(int value) {
+            this.memTableCapacity = value;
+            return this;
+        }
+
+        public SstTestDataGenerator build() {
+            return new SstTestDataGenerator(numBuckets, memTableCapacity);
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
new file mode 100644
index 0000000..eb3b32a
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FailingAtomicRenameFileSystem.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataInputStreamWrapper; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FSDataOutputStreamWrapper; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.Path; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A {@link TestAtomicRenameFileSystem} which may fail when reading and writing. Mainly used to + * check if components deal with failures correctly. 
+ */ +public class FailingAtomicRenameFileSystem extends TestAtomicRenameFileSystem { + + public static final String SCHEME = "fail"; + + private static final AtomicInteger failCounter = new AtomicInteger(); + private static int failPossibility = 1000; + + public static void resetFailCounter(int maxValue) { + failCounter.set(maxValue); + } + + public static void setFailPossibility(int v) { + failPossibility = v; + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return new FailingFSDataInputStreamWrapper(super.open(f, bufferSize)); + } + + @Override + public FSDataInputStream open(Path f) throws IOException { + return new FailingFSDataInputStreamWrapper(super.open(f)); + } + + @Override + public FSDataOutputStream create(Path filePath, FileSystem.WriteMode overwrite) + throws IOException { + return new FailingFSDataOutputStreamWrapper(super.create(filePath, overwrite)); + } + + @Override + public URI getUri() { + return URI.create(SCHEME + ":///"); + } + + /** {@link FileSystemFactory} for {@link FailingAtomicRenameFileSystem}. */ + public static final class FailingAtomicRenameFileSystemFactory implements FileSystemFactory { + + @Override + public String getScheme() { + return SCHEME; + } + + @Override + public FileSystem create(URI uri) throws IOException { + return new FailingAtomicRenameFileSystem(); + } + } + + /** Specific {@link IOException} produced by {@link FailingAtomicRenameFileSystem}. 
*/ + public static final class ArtificialException extends IOException { + + public ArtificialException() { + super("Artificial exception"); + } + } + + private static class FailingFSDataInputStreamWrapper extends FSDataInputStreamWrapper { + + public FailingFSDataInputStreamWrapper(FSDataInputStream inputStream) { + super(inputStream); + } + + @Override + public int read() throws IOException { + if (ThreadLocalRandom.current().nextInt(failPossibility) == 0 + && failCounter.getAndDecrement() > 0) { + throw new ArtificialException(); + } + return super.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (ThreadLocalRandom.current().nextInt(failPossibility) == 0 + && failCounter.getAndDecrement() > 0) { + throw new ArtificialException(); + } + return super.read(b, off, len); + } + } + + private static class FailingFSDataOutputStreamWrapper extends FSDataOutputStreamWrapper { + + public FailingFSDataOutputStreamWrapper(FSDataOutputStream outputStream) { + super(outputStream); + } + + @Override + public void write(int b) throws IOException { + if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) { + throw new ArtificialException(); + } + super.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (ThreadLocalRandom.current().nextInt(failPossibility) == 0) { + throw new ArtificialException(); + } + super.write(b, off, len); + } + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java new file mode 100644 index 0000000..2ae6946 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/FileStorePathFactory.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.connector.file.table.FileSystemConnectorOptions; +import org.apache.flink.connector.file.table.RowDataPartitionComputer; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.mergetree.sst.SstPathFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter; +import org.apache.flink.table.utils.PartitionPathUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.UUID; + +/** Factory which produces {@link Path}s for each type of files. 
*/ +public class FileStorePathFactory { + + private final Path root; + private final String uuid; + @Nullable private final RowDataPartitionComputer partitionComputer; + + private int manifestFileCount; + private int manifestListCount; + + public FileStorePathFactory(Path root) { + this(root, null, FileSystemConnectorOptions.PARTITION_DEFAULT_NAME.defaultValue()); + } + + public FileStorePathFactory( + Path root, @Nullable RowType partitionType, String defaultPartValue) { + this.root = root; + this.uuid = UUID.randomUUID().toString(); + + if (partitionType == null) { + this.partitionComputer = null; + } else { + String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]); + this.partitionComputer = + new RowDataPartitionComputer( + defaultPartValue, + partitionColumns, + partitionType.getFields().stream() + .map(f -> LogicalTypeDataTypeConverter.toDataType(f.getType())) + .toArray(DataType[]::new), + partitionColumns); + } + + this.manifestFileCount = 0; + this.manifestListCount = 0; + } + + public Path newManifestFile() { + return new Path(root + "/manifest/manifest-" + uuid + "-" + (manifestFileCount++)); + } + + public Path newManifestList() { + return new Path(root + "/manifest/manifest-list-" + uuid + "-" + (manifestListCount++)); + } + + public Path toManifestFilePath(String manifestFileName) { + return new Path(root + "/manifest/" + manifestFileName); + } + + public Path toManifestListPath(String manifestListName) { + return new Path(root + "/manifest/" + manifestListName); + } + + public Path toSnapshotPath(long id) { + return new Path(root + "/snapshot/snapshot-" + id); + } + + public SstPathFactory createSstPathFactory(@Nullable BinaryRowData partition, int bucket) { + return new SstPathFactory(root, getPartitionString(partition), bucket); + } + + public @Nullable String getPartitionString(@Nullable BinaryRowData partition) { + if (partitionComputer == null) { + return null; + } + return PartitionPathUtils.generatePartitionPath( + 
partitionComputer.generatePartValues( + Preconditions.checkNotNull( + partition, "Partition row data is null. This is unexpected."))); + } + + @VisibleForTesting + public String uuid() { + return uuid; + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java new file mode 100644 index 0000000..98f4101 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/utils/TestAtomicRenameFileSystem.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileStatus; +import org.apache.flink.core.fs.local.LocalFileSystem; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.nio.file.AccessDeniedException; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; + +/** A modified {@link LocalFileSystem} supporting atomic rename. */ +public class TestAtomicRenameFileSystem extends LocalFileSystem { + + public static final String SCHEME = "test"; + + // the same file system object is cached and shared in the same JVM, + // so we can use java locks to ensure atomic renaming + private final ReentrantLock renameLock; + + public TestAtomicRenameFileSystem() { + this.renameLock = new ReentrantLock(); + } + + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + File srcFile = pathToFile(src); + File dstFile = pathToFile(dst); + File dstParent = dstFile.getParentFile(); + dstParent.mkdirs(); + try { + renameLock.lock(); + Files.move(srcFile.toPath(), dstFile.toPath()); + return true; + } catch (NoSuchFileException + | AccessDeniedException + | DirectoryNotEmptyException + | SecurityException + | FileAlreadyExistsException e) { + return false; + } finally { + renameLock.unlock(); + } + } + + @Override + public FileStatus[] listStatus(final Path f) throws IOException { + // TODO remove this method once FLINK-25453 is fixed + File localf = pathToFile(f); + if (!localf.exists()) { + return null; + } + if (localf.isFile()) { + 
return new FileStatus[] {new LocalFileStatus(localf, this)}; + } + + final String[] names = localf.list(); + if (names == null) { + return null; + } + List<FileStatus> results = new ArrayList<>(); + for (String name : names) { + try { + results.add(getFileStatus(new Path(f, name))); + } catch (FileNotFoundException e) { + // ignore the files not found since the dir list may have have changed + // since the names[] list was generated. + } + } + + return results.toArray(new FileStatus[0]); + } + + @Override + public URI getUri() { + return URI.create(SCHEME + ":///"); + } + + /** {@link FileSystemFactory} for {@link TestAtomicRenameFileSystem}. */ + public static final class TestAtomicRenameFileSystemFactory implements FileSystemFactory { + + @Override + public String getScheme() { + return SCHEME; + } + + @Override + public FileSystem create(URI uri) throws IOException { + return new TestAtomicRenameFileSystem(); + } + } +} diff --git a/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory new file mode 100644 index 0000000..82e0c2d --- /dev/null +++ b/flink-table-store-core/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.store.file.utils.TestAtomicRenameFileSystem$TestAtomicRenameFileSystemFactory +org.apache.flink.table.store.file.utils.FailingAtomicRenameFileSystem$FailingAtomicRenameFileSystemFactory