This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 736f936a [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter 736f936a is described below commit 736f936aa1c8b1ad4f3acae8d52821ed6597f751 Author: tsreaper <tsreape...@gmail.com> AuthorDate: Wed Sep 21 10:25:22 2022 +0800 [FLINK-29297] Group Table Store file writers into SingleFileWriter and RollingFileWriter This closes #295 --- .../source/FileStoreSourceSplitReaderTest.java | 2 +- .../source/TestChangelogDataReadWrite.java | 2 +- .../table/store/file/data/AppendOnlyWriter.java | 135 ++-------------- .../table/store/file/data/DataFileReader.java | 56 +------ .../table/store/file/data/DataFileWriter.java | 179 ++++----------------- .../store/file/{writer => io}/FileWriter.java | 54 ++++--- .../KeyValueDataFileRecordReader.java} | 33 ++-- .../store/file/io/KeyValueDataFileWriter.java | 146 +++++++++++++++++ .../file/{writer => io}/RollingFileWriter.java | 87 ++++++---- .../RowDataFileRecordReader.java} | 14 +- .../table/store/file/io/RowDataFileWriter.java | 79 +++++++++ .../store/file/io/RowDataRollingFileWriter.java | 50 ++++++ .../table/store/file/io/SingleFileWriter.java | 152 +++++++++++++++++ .../file/io/StatsCollectingSingleFileWriter.java | 76 +++++++++ .../table/store/file/manifest/ManifestFile.java | 52 ++---- .../store/file/mergetree/MergeTreeWriter.java | 5 +- .../file/operation/AppendOnlyFileStoreRead.java | 4 +- .../file/operation/AppendOnlyFileStoreWrite.java | 11 +- .../table/store/file/operation/FileStoreWrite.java | 2 +- .../file/operation/KeyValueFileStoreWrite.java | 2 +- .../store/file/{writer => utils}/RecordWriter.java | 13 +- .../table/store/file/writer/BaseFileWriter.java | 118 -------------- .../flink/table/store/file/writer/Metric.java | 48 ------ .../table/store/file/writer/MetricFileWriter.java | 179 --------------------- .../store/table/AppendOnlyFileStoreTable.java | 2 +- .../table/ChangelogValueCountFileStoreTable.java | 2 +- .../table/ChangelogWithKeyFileStoreTable.java | 2 +- .../table/store/table/sink/AbstractTableWrite.java | 2 +- .../table/store/table/sink/MemoryTableWrite.java | 2 +- .../flink/table/store/file/TestFileStore.java | 2 +- .../store/file/data/AppendOnlyWriterTest.java | 12 +- .../flink/table/store/file/data/DataFileTest.java | 11 +- .../store/file/format/FileFormatSuffixTest.java | 7 +- .../table/store/file/mergetree/MergeTreeTest.java | 14 +- 34 files changed, 723 insertions(+), 832 deletions(-) diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java index 25211fe6..049b6c85 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java @@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.KeyValue; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.UpdateSchema; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java index 8c7a504b..4145bf54 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java @@ -36,8 +36,8 @@ import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.file.utils.SnapshotManager; -import org.apache.flink.table.store.file.writer.RecordWriter; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.store.table.source.KeyValueTableRead; import org.apache.flink.table.store.table.source.TableRead; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java index 1476a287..355467e3 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java @@ -20,33 +20,20 @@ package org.apache.flink.table.store.file.data; import org.apache.flink.api.common.accumulators.LongCounter; -import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.file.compact.CompactManager; +import org.apache.flink.table.store.file.io.RowDataRollingFileWriter; import org.apache.flink.table.store.file.mergetree.Increment; -import org.apache.flink.table.store.file.stats.BinaryTableStats; -import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; -import org.apache.flink.table.store.file.utils.FileUtils; -import org.apache.flink.table.store.file.writer.BaseFileWriter; -import org.apache.flink.table.store.file.writer.FileWriter; -import org.apache.flink.table.store.file.writer.Metric; -import org.apache.flink.table.store.file.writer.MetricFileWriter; -import org.apache.flink.table.store.file.writer.RecordWriter; -import org.apache.flink.table.store.file.writer.RollingFileWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; -import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.function.Function; -import java.util.function.Supplier; -import static org.apache.flink.table.store.file.data.AppendOnlyWriter.RowRollingWriter.createRollingRowWriter; import static org.apache.flink.table.store.file.data.DataFileMeta.getMaxSequenceNumber; /** @@ -66,7 +53,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { private final List<DataFileMeta> compactAfter; private final LongCounter seqNumCounter; - private RowRollingWriter writer; + private RowDataRollingFileWriter writer; public AppendOnlyWriter( long schemaId, @@ -87,14 +74,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { this.compactBefore = new ArrayList<>(); this.compactAfter = new ArrayList<>(); this.seqNumCounter = new LongCounter(maxSequenceNumber + 1); - this.writer = - createRollingRowWriter( - schemaId, - fileFormat, - targetFileSize, - writeSchema, - pathFactory, - seqNumCounter); + this.writer = createRollingRowWriter(); } @Override @@ -116,14 +96,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { // Reopen the writer to accept further records. seqNumCounter.resetLocal(); seqNumCounter.add(getMaxSequenceNumber(newFiles) + 1); - writer = - createRollingRowWriter( - schemaId, - fileFormat, - targetFileSize, - writeSchema, - pathFactory, - seqNumCounter); + writer = createRollingRowWriter(); } // add new generated files newFiles.forEach(compactManager::addNewFile); @@ -141,21 +114,20 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { } @Override - public List<DataFileMeta> close() throws Exception { + public void close() throws Exception { // cancel compaction so that it does not block job cancelling compactManager.cancelCompaction(); sync(); - List<DataFileMeta> result = new ArrayList<>(); if (writer != null) { - // Abort this writer to clear uncommitted files. writer.abort(); - - result.addAll(writer.result()); writer = null; } + } - return result; + private RowDataRollingFileWriter createRollingRowWriter() { + return new RowDataRollingFileWriter( + schemaId, fileFormat, targetFileSize, writeSchema, pathFactory, seqNumCounter); } private void submitCompaction() throws ExecutionException, InterruptedException { @@ -182,91 +154,4 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { compactAfter.clear(); return increment; } - - /** Rolling file writer for append-only table. */ - public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> { - - public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) { - super(writerFactory, targetFileSize); - } - - public static RowRollingWriter createRollingRowWriter( - long schemaId, - FileFormat fileFormat, - long targetFileSize, - RowType writeSchema, - DataFilePathFactory pathFactory, - LongCounter seqNumCounter) { - return new RowRollingWriter( - () -> - new RowFileWriter( - MetricFileWriter.createFactory( - fileFormat.createWriterFactory(writeSchema), - Function.identity(), - writeSchema, - fileFormat - .createStatsExtractor(writeSchema) - .orElse(null)), - pathFactory.newPath(), - writeSchema, - schemaId, - seqNumCounter), - targetFileSize); - } - - public List<DataFileMeta> write(CloseableIterator<RowData> iterator) throws Exception { - try { - super.write(iterator); - super.close(); - return super.result(); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - iterator.close(); - } - } - } - - /** - * A {@link BaseFileWriter} impl with a counter with an initial value to record each row's - * sequence number. - */ - public static class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> { - - private final FieldStatsArraySerializer statsArraySerializer; - private final long schemaId; - private final LongCounter seqNumCounter; - - public RowFileWriter( - FileWriter.Factory<RowData, Metric> writerFactory, - Path path, - RowType writeSchema, - long schemaId, - LongCounter seqNumCounter) { - super(writerFactory, path); - this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema); - this.schemaId = schemaId; - this.seqNumCounter = seqNumCounter; - } - - @Override - public void write(RowData row) throws IOException { - super.write(row); - seqNumCounter.add(1L); - } - - @Override - protected DataFileMeta createResult(Path path, Metric metric) throws IOException { - BinaryTableStats stats = statsArraySerializer.toBinary(metric.fieldStats()); - - return DataFileMeta.forAppend( - path.getName(), - FileUtils.getFileSize(path), - recordCount(), - stats, - seqNumCounter.getLocalValue() - super.recordCount(), - seqNumCounter.getLocalValue() - 1, - schemaId); - } - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java index 7a0c6eef..3d578dea 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileReader.java @@ -20,16 +20,13 @@ package org.apache.flink.table.store.file.data; import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.reader.BulkFormat; -import org.apache.flink.connector.file.src.util.RecordAndPosition; -import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.store.file.KeyValue; -import org.apache.flink.table.store.file.KeyValueSerializer; +import org.apache.flink.table.store.file.io.KeyValueDataFileRecordReader; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; -import org.apache.flink.table.store.file.utils.FileUtils; import org.apache.flink.table.store.file.utils.RecordReader; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.store.utils.Projection; @@ -74,55 +71,8 @@ public class DataFileReader { } public RecordReader<KeyValue> read(String fileName) throws IOException { - return new DataFileRecordReader(pathFactory.toPath(fileName)); - } - - private class DataFileRecordReader implements RecordReader<KeyValue> { - - private final BulkFormat.Reader<RowData> reader; - private final KeyValueSerializer serializer; - - private DataFileRecordReader(Path path) throws IOException { - this.reader = FileUtils.createFormatReader(readerFactory, path); - this.serializer = new KeyValueSerializer(keyType, valueType); - } - - @Nullable - @Override - public RecordIterator<KeyValue> readBatch() throws IOException { - BulkFormat.RecordIterator<RowData> iterator = reader.readBatch(); - return iterator == null ? null : new DataFileRecordIterator(iterator, serializer); - } - - @Override - public void close() throws IOException { - reader.close(); - } - } - - private static class DataFileRecordIterator implements RecordReader.RecordIterator<KeyValue> { - - private final BulkFormat.RecordIterator<RowData> iterator; - private final KeyValueSerializer serializer; - - private DataFileRecordIterator( - BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) { - this.iterator = iterator; - this.serializer = serializer; - } - - @Override - public KeyValue next() throws IOException { - RecordAndPosition<RowData> result = iterator.next(); - - // TODO schema evolution - return result == null ? null : serializer.fromRow(result.getRecord()); - } - - @Override - public void releaseBatch() { - iterator.releaseBatch(); - } + return new KeyValueDataFileRecordReader( + readerFactory, pathFactory.toPath(fileName), keyType, valueType); } /** Creates {@link DataFileReader}. */ diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java index 22ce2aa5..febc1cd7 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileWriter.java @@ -23,48 +23,32 @@ import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.store.file.KeyValue; import org.apache.flink.table.store.file.KeyValueSerializer; -import org.apache.flink.table.store.file.stats.BinaryTableStats; -import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; +import org.apache.flink.table.store.file.io.KeyValueDataFileWriter; +import org.apache.flink.table.store.file.io.RollingFileWriter; +import org.apache.flink.table.store.file.io.SingleFileWriter; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.FileUtils; -import org.apache.flink.table.store.file.writer.BaseFileWriter; -import org.apache.flink.table.store.file.writer.FileWriter; -import org.apache.flink.table.store.file.writer.Metric; -import org.apache.flink.table.store.file.writer.MetricFileWriter; -import org.apache.flink.table.store.file.writer.RollingFileWriter; -import org.apache.flink.table.store.format.FieldStats; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.store.format.FileStatsExtractor; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CloseableIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import javax.annotation.Nullable; import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.function.Supplier; /** Writes {@link KeyValue}s into data files. */ public class DataFileWriter { - private static final Logger LOG = LoggerFactory.getLogger(DataFileWriter.class); - private final long schemaId; private final RowType keyType; private final RowType valueType; private final BulkWriter.Factory<RowData> writerFactory; private final FileStatsExtractor fileStatsExtractor; - private final FieldStatsArraySerializer keyStatsConverter; - private final FieldStatsArraySerializer valueStatsConverter; private final DataFilePathFactory pathFactory; private final long suggestedFileSize; @@ -81,8 +65,6 @@ public class DataFileWriter { this.valueType = valueType; this.writerFactory = writerFactory; this.fileStatsExtractor = fileStatsExtractor; - this.keyStatsConverter = new FieldStatsArraySerializer(keyType); - this.valueStatsConverter = new FieldStatsArraySerializer(valueType); this.pathFactory = pathFactory; this.suggestedFileSize = suggestedFileSize; @@ -108,9 +90,18 @@ public class DataFileWriter { /** Write raw {@link KeyValue} iterator into a changelog file. */ public Path writeLevel0Changelog(CloseableIterator<KeyValue> iterator) throws Exception { - FileWriter.Factory<KeyValue, Metric> writerFactory = createFileWriterFactory(); Path changelogPath = pathFactory.newChangelogPath(); - doWrite(writerFactory.create(changelogPath), iterator); + KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); + SingleFileWriter<KeyValue, Void> writer = + new SingleFileWriter<KeyValue, Void>( + writerFactory, changelogPath, kvSerializer::toRow) { + @Override + public Void result() throws IOException { + return null; + } + }; + writer.write(iterator); + writer.close(); return changelogPath; } @@ -137,21 +128,25 @@ public class DataFileWriter { throws Exception { // Don't roll file for level 0 long suggestedFileSize = level == 0 ? Long.MAX_VALUE : this.suggestedFileSize; - return doWrite(createRollingKvWriter(level, suggestedFileSize), iterator); + RollingFileWriter<KeyValue, DataFileMeta> writer = + new RollingFileWriter<>(() -> createDataFileWriter(level), suggestedFileSize); + writer.write(iterator); + writer.close(); + return writer.result(); } - private <R> R doWrite(FileWriter<KeyValue, R> fileWriter, CloseableIterator<KeyValue> iterator) - throws Exception { - try (FileWriter<KeyValue, R> writer = fileWriter) { - writer.write(iterator); - } catch (Throwable e) { - LOG.warn("Exception occurs when writing data files. Cleaning up.", e); - fileWriter.abort(); - throw e; - } finally { - iterator.close(); - } - return fileWriter.result(); + private KeyValueDataFileWriter createDataFileWriter(int level) { + Path path = pathFactory.newPath(); + KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); + return new KeyValueDataFileWriter( + writerFactory, + path, + kvSerializer::toRow, + keyType, + valueType, + fileStatsExtractor, + schemaId, + level); } public void delete(DataFileMeta file) { @@ -162,118 +157,6 @@ public class DataFileWriter { FileUtils.deleteOrWarn(pathFactory.toPath(file)); } - private class KvFileWriter extends BaseFileWriter<KeyValue, DataFileMeta> { - private final int level; - private final RowDataSerializer keySerializer; - - private BinaryRowData minKey = null; - private RowData maxKey = null; - private long minSeqNumber = Long.MAX_VALUE; - private long maxSeqNumber = Long.MIN_VALUE; - - public KvFileWriter( - FileWriter.Factory<KeyValue, Metric> writerFactory, Path path, int level) - throws IOException { - super(writerFactory, path); - - this.level = level; - this.keySerializer = new RowDataSerializer(keyType); - } - - @Override - public void write(KeyValue kv) throws IOException { - super.write(kv); - - updateMinKey(kv); - updateMaxKey(kv); - - updateMinSeqNumber(kv); - updateMaxSeqNumber(kv); - - if (LOG.isDebugEnabled()) { - LOG.debug("Write key value " + kv.toString(keyType, valueType)); - } - } - - private void updateMinKey(KeyValue kv) { - if (minKey == null) { - minKey = keySerializer.toBinaryRow(kv.key()).copy(); - } - } - - private void updateMaxKey(KeyValue kv) { - maxKey = kv.key(); - } - - private void updateMinSeqNumber(KeyValue kv) { - minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber()); - } - - private void updateMaxSeqNumber(KeyValue kv) { - maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber()); - } - - @Override - protected DataFileMeta createResult(Path path, Metric metric) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Closing data file " + path); - } - - FieldStats[] rowStats = metric.fieldStats(); - int numKeyFields = keyType.getFieldCount(); - - FieldStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields); - BinaryTableStats keyStats = keyStatsConverter.toBinary(keyFieldStats); - - FieldStats[] valFieldStats = - Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length); - BinaryTableStats valueStats = valueStatsConverter.toBinary(valFieldStats); - - return new DataFileMeta( - path.getName(), - FileUtils.getFileSize(path), - recordCount(), - minKey, - keySerializer.toBinaryRow(maxKey).copy(), - keyStats, - valueStats, - minSeqNumber, - maxSeqNumber, - schemaId, - level); - } - } - - private static class RollingKvWriter extends RollingFileWriter<KeyValue, DataFileMeta> { - - public RollingKvWriter(Supplier<KvFileWriter> writerFactory, long targetFileSize) { - super(writerFactory, targetFileSize); - } - } - - private Supplier<KvFileWriter> createWriterFactory(int level) { - return () -> { - try { - return new KvFileWriter(createFileWriterFactory(), pathFactory.newPath(), level); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }; - } - - private FileWriter.Factory<KeyValue, Metric> createFileWriterFactory() { - KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); - return MetricFileWriter.createFactory( - writerFactory, - kvSerializer::toRow, - KeyValue.schema(keyType, valueType), - fileStatsExtractor); - } - - private RollingKvWriter createRollingKvWriter(int level, long targetFileSize) { - return new RollingKvWriter(createWriterFactory(level), targetFileSize); - } - /** Creates {@link DataFileWriter}. */ public static class Factory { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/FileWriter.java similarity index 65% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/FileWriter.java index 87f7a188..d8feb947 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/FileWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/FileWriter.java @@ -17,13 +17,12 @@ * under the License. */ -package org.apache.flink.table.store.file.writer; +package org.apache.flink.table.store.file.io; -import org.apache.flink.core.fs.Path; +import org.apache.flink.util.CloseableIterator; import java.io.Closeable; import java.io.IOException; -import java.io.Serializable; import java.util.Iterator; /** @@ -37,6 +36,9 @@ public interface FileWriter<T, R> extends Closeable { /** * Add only one record to this file writer. * + * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for + * the user. + * * @param record to write. * @throws IOException if encounter any IO error. */ @@ -45,18 +47,43 @@ public interface FileWriter<T, R> extends Closeable { /** * Add records from {@link Iterator} to this file writer. * + * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for + * the user. + * * @param records to write * @throws IOException if encounter any IO error. */ - default void write(Iterator<T> records) throws IOException { + default void write(Iterator<T> records) throws Exception { while (records.hasNext()) { write(records.next()); } } + /** + * Add records from {@link CloseableIterator} to this file writer. + * + * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for + * the user. + * + * @param records to write + * @throws IOException if encounter any IO error. + */ + default void write(CloseableIterator<T> records) throws Exception { + try { + while (records.hasNext()) { + write(records.next()); + } + } finally { + records.close(); + } + } + /** * Add records from {@link Iterable} to file writer. * + * <p>NOTE: If any exception occurs during writing, the writer should clean up useless files for + * the user. + * * @param records to write. * @throws IOException if encounter any IO error. */ @@ -81,22 +108,13 @@ public interface FileWriter<T, R> extends Closeable { */ long length() throws IOException; - /** Abort to clear orphan file(s) if encounter any error. */ + /** + * Abort to clear orphan file(s) if encounter any error. + * + * <p>NOTE: This implementation must be reentrant. + */ void abort(); /** @return the result for this closed file writer. */ R result() throws IOException; - - /** A factory that creates a {@link FileWriter}. */ - interface Factory<T, R> extends Serializable { - - /** - * Creates a writer that writes to the given stream. - * - * @param path the path to write records. - * @return the file format writer. - * @throws IOException if any IO error was encountered then open the writer. - */ - FileWriter<T, R> create(Path path) throws IOException; - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java similarity index 60% copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java index 536918f8..cd03c3de 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileRecordReader.java @@ -16,35 +16,44 @@ * limitations under the License. */ -package org.apache.flink.table.store.file.data; +package org.apache.flink.table.store.file.io; import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.connector.file.src.util.RecordAndPosition; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.KeyValueSerializer; import org.apache.flink.table.store.file.utils.FileUtils; import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; import java.io.IOException; -/** Reads {@link RowData} from data files. */ -public class AppendOnlyReader implements RecordReader<RowData> { +/** {@link RecordReader} for reading {@link KeyValue} data files. */ +public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> { private final BulkFormat.Reader<RowData> reader; + private final KeyValueSerializer serializer; - public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory) + public KeyValueDataFileRecordReader( + BulkFormat<RowData, FileSourceSplit> readerFactory, + Path path, + RowType keyType, + RowType valueType) throws IOException { this.reader = FileUtils.createFormatReader(readerFactory, path); + this.serializer = new KeyValueSerializer(keyType, valueType); } @Nullable @Override - public RecordIterator<RowData> readBatch() throws IOException { + public RecordIterator<KeyValue> readBatch() throws IOException { BulkFormat.RecordIterator<RowData> iterator = reader.readBatch(); - return iterator == null ? null : new AppendOnlyRecordIterator(iterator); + return iterator == null ? null : new KeyValueDataFileRecordIterator(iterator, serializer); } @Override @@ -52,20 +61,24 @@ public class AppendOnlyReader implements RecordReader<RowData> { reader.close(); } - private static class AppendOnlyRecordIterator implements RecordReader.RecordIterator<RowData> { + private static class KeyValueDataFileRecordIterator + implements RecordReader.RecordIterator<KeyValue> { private final BulkFormat.RecordIterator<RowData> iterator; + private final KeyValueSerializer serializer; - private AppendOnlyRecordIterator(BulkFormat.RecordIterator<RowData> iterator) { + private KeyValueDataFileRecordIterator( + BulkFormat.RecordIterator<RowData> iterator, KeyValueSerializer serializer) { this.iterator = iterator; + this.serializer = serializer; } @Override - public RowData next() throws IOException { + public KeyValue next() throws IOException { RecordAndPosition<RowData> result = iterator.next(); // TODO schema evolution - return result == null ? null : result.getRecord(); + return result == null ? null : serializer.fromRow(result.getRecord()); } @Override diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileWriter.java new file mode 100644 index 00000000..3aad8089 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/KeyValueDataFileWriter.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.io; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.stats.BinaryTableStats; +import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.format.FieldStats; +import org.apache.flink.table.store.format.FileStatsExtractor; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Arrays; +import java.util.function.Function; + +/** + * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link KeyValue}s. Also + * produces {@link DataFileMeta} after writing a file. + */ +public class KeyValueDataFileWriter + extends StatsCollectingSingleFileWriter<KeyValue, DataFileMeta> { + + private static final Logger LOG = LoggerFactory.getLogger(KeyValueDataFileWriter.class); + + private final RowType keyType; + private final RowType valueType; + private final long schemaId; + private final int level; + + private final FieldStatsArraySerializer keyStatsConverter; + private final FieldStatsArraySerializer valueStatsConverter; + private final RowDataSerializer keySerializer; + + private BinaryRowData minKey = null; + private RowData maxKey = null; + private long minSeqNumber = Long.MAX_VALUE; + private long maxSeqNumber = Long.MIN_VALUE; + + public KeyValueDataFileWriter( + BulkWriter.Factory<RowData> factory, + Path path, + Function<KeyValue, RowData> converter, + RowType keyType, + RowType valueType, + @Nullable FileStatsExtractor fileStatsExtractor, + long schemaId, + int level) { + super(factory, path, converter, KeyValue.schema(keyType, valueType), fileStatsExtractor); + + this.keyType = keyType; + this.valueType = valueType; + this.schemaId = schemaId; + this.level = level; + + this.keyStatsConverter = new FieldStatsArraySerializer(keyType); + this.valueStatsConverter = new FieldStatsArraySerializer(valueType); + this.keySerializer = new RowDataSerializer(keyType); + } + + @Override + public void write(KeyValue kv) throws IOException { + super.write(kv); + + updateMinKey(kv); + updateMaxKey(kv); + + updateMinSeqNumber(kv); + updateMaxSeqNumber(kv); + + if (LOG.isDebugEnabled()) { + LOG.debug("Write key value " + kv.toString(keyType, valueType)); + } + } + + private void updateMinKey(KeyValue kv) { + if (minKey == null) { + minKey = keySerializer.toBinaryRow(kv.key()).copy(); + } + } + + private void updateMaxKey(KeyValue kv) { + maxKey = kv.key(); + } + + private void updateMinSeqNumber(KeyValue kv) { + minSeqNumber = Math.min(minSeqNumber, kv.sequenceNumber()); + } + + private void updateMaxSeqNumber(KeyValue kv) { + maxSeqNumber = Math.max(maxSeqNumber, kv.sequenceNumber()); + } + + @Override + public DataFileMeta result() throws IOException { + FieldStats[] rowStats = fieldStats(); + int numKeyFields = keyType.getFieldCount(); + + FieldStats[] keyFieldStats = Arrays.copyOfRange(rowStats, 0, numKeyFields); + BinaryTableStats keyStats = keyStatsConverter.toBinary(keyFieldStats); + + FieldStats[] valFieldStats = + Arrays.copyOfRange(rowStats, numKeyFields + 2, rowStats.length); + BinaryTableStats valueStats = valueStatsConverter.toBinary(valFieldStats); + + return new DataFileMeta( + path.getName(), + FileUtils.getFileSize(path), + recordCount(), + minKey, + keySerializer.toBinaryRow(maxKey).copy(), + keyStats, + valueStats, + minSeqNumber, + maxSeqNumber, + schemaId, + level); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java similarity index 59% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java index 298d5014..f635ac00 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RollingFileWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RollingFileWriter.java @@ -17,13 +17,15 @@ * under the License. */ -package org.apache.flink.table.store.file.writer; +package org.apache.flink.table.store.file.io; -import org.apache.flink.util.IOUtils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; @@ -36,36 +38,53 @@ import java.util.function.Supplier; */ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> { - private final Supplier<? extends FileWriter<T, R>> writerFactory; + private static final Logger LOG = LoggerFactory.getLogger(RollingFileWriter.class); + + private final Supplier<? extends SingleFileWriter<T, R>> writerFactory; private final long targetFileSize; - private final List<FileWriter<T, R>> openedWriters; + private final List<SingleFileWriter<T, R>> openedWriters; private final List<R> results; - private FileWriter<T, R> currentWriter = null; + private SingleFileWriter<T, R> currentWriter = null; private long lengthOfClosedFiles = 0L; private long recordCount = 0; private boolean closed = false; public RollingFileWriter( - Supplier<? extends FileWriter<T, R>> writerFactory, long targetFileSize) { + Supplier<? extends SingleFileWriter<T, R>> writerFactory, long targetFileSize) { this.writerFactory = writerFactory; this.targetFileSize = targetFileSize; this.openedWriters = new ArrayList<>(); this.results = new ArrayList<>(); } + @VisibleForTesting + public long targetFileSize() { + return targetFileSize; + } + @Override public void write(T row) throws IOException { - // Open the current writer if write the first record or roll over happen before. - if (currentWriter == null) { - openCurrentWriter(); - } + try { + // Open the current writer if write the first record or roll over happen before. + if (currentWriter == null) { + openCurrentWriter(); + } - currentWriter.write(row); - recordCount += 1; + currentWriter.write(row); + recordCount += 1; - if (currentWriter.length() >= targetFileSize) { - closeCurrentWriter(); + if (currentWriter.length() >= targetFileSize) { + closeCurrentWriter(); + } + } catch (Throwable e) { + LOG.warn( + "Exception occurs when writing file " + + (currentWriter == null ? null : currentWriter.path()) + + ". Cleaning up.", + e); + abort(); + throw e; } } @@ -74,19 +93,15 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> { openedWriters.add(currentWriter); } - private void closeCurrentWriter() { - if (currentWriter != null) { - try { - lengthOfClosedFiles += currentWriter.length(); - - currentWriter.close(); - results.add(currentWriter.result()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - currentWriter = null; + private void closeCurrentWriter() throws IOException { + if (currentWriter == null) { + return; } + + lengthOfClosedFiles += currentWriter.length(); + currentWriter.close(); + results.add(currentWriter.result()); + currentWriter = null; } @Override @@ -106,9 +121,6 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> { @Override public void abort() { - IOUtils.closeQuietly(this); - - // Abort all those writers. for (FileWriter<T, R> writer : openedWriters) { writer.abort(); } @@ -117,15 +129,24 @@ public class RollingFileWriter<T, R> implements FileWriter<T, List<R>> { @Override public List<R> result() { Preconditions.checkState(closed, "Cannot access the results unless close all writers."); - return results; } @Override public void close() throws IOException { - if (!closed) { - closeCurrentWriter(); + if (closed) { + return; + } + try { + closeCurrentWriter(); + } catch (IOException e) { + LOG.warn( + "Exception occurs when writing file " + currentWriter.path() + ". Cleaning up.", + e); + abort(); + throw e; + } finally { closed = true; } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileRecordReader.java similarity index 79% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileRecordReader.java index 536918f8..89d87fea 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyReader.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileRecordReader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.store.file.data; +package org.apache.flink.table.store.file.io; import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.reader.BulkFormat; @@ -31,20 +31,20 @@ import javax.annotation.Nullable; import java.io.IOException; /** Reads {@link RowData} from data files. */ -public class AppendOnlyReader implements RecordReader<RowData> { +public class RowDataFileRecordReader implements RecordReader<RowData> { private final BulkFormat.Reader<RowData> reader; - public AppendOnlyReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory) + public RowDataFileRecordReader(Path path, BulkFormat<RowData, FileSourceSplit> readerFactory) throws IOException { this.reader = FileUtils.createFormatReader(readerFactory, path); } @Nullable @Override - public RecordIterator<RowData> readBatch() throws IOException { + public RecordReader.RecordIterator<RowData> readBatch() throws IOException { BulkFormat.RecordIterator<RowData> iterator = reader.readBatch(); - return iterator == null ? null : new AppendOnlyRecordIterator(iterator); + return iterator == null ? null : new RowDataFileRecordIterator(iterator); } @Override @@ -52,11 +52,11 @@ public class AppendOnlyReader implements RecordReader<RowData> { reader.close(); } - private static class AppendOnlyRecordIterator implements RecordReader.RecordIterator<RowData> { + private static class RowDataFileRecordIterator implements RecordReader.RecordIterator<RowData> { private final BulkFormat.RecordIterator<RowData> iterator; - private AppendOnlyRecordIterator(BulkFormat.RecordIterator<RowData> iterator) { + private RowDataFileRecordIterator(BulkFormat.RecordIterator<RowData> iterator) { this.iterator = iterator; } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileWriter.java new file mode 100644 index 00000000..8983ba43 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataFileWriter.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.io; + +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.stats.BinaryTableStats; +import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.store.format.FileStatsExtractor; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.function.Function; + +/** + * A {@link StatsCollectingSingleFileWriter} to write data files containing {@link RowData}. Also + * produces {@link DataFileMeta} after writing a file. + */ +public class RowDataFileWriter extends StatsCollectingSingleFileWriter<RowData, DataFileMeta> { + + private final long schemaId; + private final LongCounter seqNumCounter; + private final FieldStatsArraySerializer statsArraySerializer; + + public RowDataFileWriter( + BulkWriter.Factory<RowData> factory, + Path path, + RowType writeSchema, + @Nullable FileStatsExtractor fileStatsExtractor, + long schemaId, + LongCounter seqNumCounter) { + super(factory, path, Function.identity(), writeSchema, fileStatsExtractor); + this.schemaId = schemaId; + this.seqNumCounter = seqNumCounter; + this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema); + } + + @Override + public void write(RowData row) throws IOException { + super.write(row); + seqNumCounter.add(1L); + } + + @Override + public DataFileMeta result() throws IOException { + BinaryTableStats stats = statsArraySerializer.toBinary(fieldStats()); + return DataFileMeta.forAppend( + path.getName(), + FileUtils.getFileSize(path), + recordCount(), + stats, + seqNumCounter.getLocalValue() - super.recordCount(), + seqNumCounter.getLocalValue() - 1, + schemaId); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataRollingFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataRollingFileWriter.java new file mode 100644 index 00000000..60376442 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/RowDataRollingFileWriter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.io; + +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.data.DataFilePathFactory; +import org.apache.flink.table.store.format.FileFormat; +import org.apache.flink.table.types.logical.RowType; + +/** {@link RollingFileWriter} for data files containing {@link RowData}. */ +public class RowDataRollingFileWriter extends RollingFileWriter<RowData, DataFileMeta> { + + public RowDataRollingFileWriter( + long schemaId, + FileFormat fileFormat, + long targetFileSize, + RowType writeSchema, + DataFilePathFactory pathFactory, + LongCounter seqNumCounter) { + super( + () -> + new RowDataFileWriter( + fileFormat.createWriterFactory(writeSchema), + pathFactory.newPath(), + writeSchema, + fileFormat.createStatsExtractor(writeSchema).orElse(null), + schemaId, + seqNumCounter), + targetFileSize); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java new file mode 100644 index 00000000..38402aea --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/SingleFileWriter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.io; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.util.IOUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.function.Function; + +/** + * A {@link FileWriter} to produce a single file. + * + * @param <T> type of records to write. + * @param <R> type of result to produce after writing a file. + */ +public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> { + + private static final Logger LOG = LoggerFactory.getLogger(SingleFileWriter.class); + + protected final Path path; + private final Function<T, RowData> converter; + + private final BulkWriter<RowData> writer; + private FSDataOutputStream out; + + private long recordCount; + private long length; + protected boolean closed; + + public SingleFileWriter( + BulkWriter.Factory<RowData> factory, Path path, Function<T, RowData> converter) { + this.path = path; + this.converter = converter; + + try { + FileSystem fs = path.getFileSystem(); + out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + writer = factory.create(out); + } catch (IOException e) { + LOG.warn( + "Failed to open the bulk writer, closing the output stream and throw the error.", + e); + if (out != null) { + abort(); + } + throw new UncheckedIOException(e); + } + + this.recordCount = 0; + this.length = 0; + this.closed = false; + } + + public Path path() { + return path; + } + + @Override + public void write(T record) throws IOException { + writeImpl(record); + } + + protected RowData writeImpl(T record) throws IOException { + if (closed) { + throw new RuntimeException("Writer has already closed!"); + } + + try { + RowData rowData = converter.apply(record); + writer.addElement(rowData); + recordCount++; + return rowData; + } catch (Throwable e) { + LOG.warn("Exception occurs when writing file " + path + ". Cleaning up.", e); + abort(); + throw e; + } + } + + @Override + public long recordCount() { + return recordCount; + } + + @Override + public long length() throws IOException { + if (closed) { + return length; + } else { + return out.getPos(); + } + } + + @Override + public void abort() { + IOUtils.closeQuietly(out); + FileUtils.deleteOrWarn(path); + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Closing file " + path); + } + + try { + writer.flush(); + writer.finish(); + + out.flush(); + length = out.getPos(); + out.close(); + } catch (IOException e) { + LOG.warn("Exception occurs when closing file " + path + ". Cleaning up.", e); + abort(); + throw e; + } finally { + closed = true; + } + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/StatsCollectingSingleFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/StatsCollectingSingleFileWriter.java new file mode 100644 index 00000000..d2539b28 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/io/StatsCollectingSingleFileWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.io; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.format.FieldStats; +import org.apache.flink.table.store.format.FieldStatsCollector; +import org.apache.flink.table.store.format.FileStatsExtractor; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.function.Function; + +/** + * A {@link SingleFileWriter} which also produces statistics for each written field. + * + * @param <T> type of records to write. + * @param <R> type of result to produce after writing a file. + */ +public abstract class StatsCollectingSingleFileWriter<T, R> extends SingleFileWriter<T, R> { + + @Nullable private final FileStatsExtractor fileStatsExtractor; + @Nullable private FieldStatsCollector fieldStatsCollector = null; + + public StatsCollectingSingleFileWriter( + BulkWriter.Factory<RowData> factory, + Path path, + Function<T, RowData> converter, + RowType writeSchema, + @Nullable FileStatsExtractor fileStatsExtractor) { + super(factory, path, converter); + this.fileStatsExtractor = fileStatsExtractor; + if (this.fileStatsExtractor == null) { + this.fieldStatsCollector = new FieldStatsCollector(writeSchema); + } + } + + @Override + public void write(T record) throws IOException { + RowData rowData = writeImpl(record); + if (fieldStatsCollector != null) { + fieldStatsCollector.collect(rowData); + } + } + + public FieldStats[] fieldStats() throws IOException { + Preconditions.checkState(closed, "Cannot access metric unless the writer is closed."); + if (fileStatsExtractor != null) { + return fileStatsExtractor.extract(path); + } else { + return fieldStatsCollector.extract(); + } + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java index 3e3c4f35..e600db1d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/manifest/ManifestFile.java @@ -24,28 +24,20 @@ import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.io.RollingFileWriter; +import org.apache.flink.table.store.file.io.SingleFileWriter; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.FileUtils; import org.apache.flink.table.store.file.utils.VersionedObjectSerializer; -import org.apache.flink.table.store.file.writer.BaseFileWriter; -import org.apache.flink.table.store.file.writer.FileWriter; -import org.apache.flink.table.store.file.writer.Metric; -import org.apache.flink.table.store.file.writer.MetricFileWriter; -import org.apache.flink.table.store.file.writer.RollingFileWriter; import org.apache.flink.table.store.format.FieldStatsCollector; import org.apache.flink.table.store.format.FileFormat; -import org.apache.flink.table.store.format.FileStatsExtractor; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -58,26 +50,22 @@ import java.util.function.Supplier; */ public class ManifestFile { - private static final Logger LOG = LoggerFactory.getLogger(ManifestFile.class); - private final SchemaManager schemaManager; private final long schemaId; private final RowType partitionType; private final ManifestEntrySerializer serializer; private final BulkFormat<RowData, FileSourceSplit> readerFactory; + private final BulkWriter.Factory<RowData> writerFactory; private final FileStorePathFactory pathFactory; private final long suggestedFileSize; - private final FileWriter.Factory<ManifestEntry, Metric> fileWriterFactory; private ManifestFile( SchemaManager schemaManager, long schemaId, RowType partitionType, - RowType entryType, ManifestEntrySerializer serializer, BulkFormat<RowData, FileSourceSplit> readerFactory, BulkWriter.Factory<RowData> writerFactory, - FileStatsExtractor fileStatsExtractor, FileStorePathFactory pathFactory, long suggestedFileSize) { this.schemaManager = schemaManager; @@ -85,13 +73,9 @@ public class ManifestFile { this.partitionType = partitionType; this.serializer = serializer; this.readerFactory = readerFactory; + this.writerFactory = writerFactory; this.pathFactory = pathFactory; this.suggestedFileSize = suggestedFileSize; - - // Initialize the metric file writer factory to write manifest entry and generate metrics. - this.fileWriterFactory = - MetricFileWriter.createFactory( - writerFactory, serializer::toRow, entryType, fileStatsExtractor); } @VisibleForTesting @@ -132,18 +116,12 @@ public class ManifestFile { * <p>NOTE: This method is atomic. */ public List<ManifestFileMeta> write(List<ManifestEntry> entries) { - ManifestRollingWriter rollingWriter = createManifestRollingWriter(suggestedFileSize); try (ManifestRollingWriter writer = rollingWriter) { writer.write(entries); - } catch (Exception e) { - LOG.warn("Exception occurs when writing manifest files. Cleaning up.", e); - - rollingWriter.abort(); throw new RuntimeException(e); } - return rollingWriter.result(); } @@ -151,16 +129,16 @@ public class ManifestFile { FileUtils.deleteOrWarn(pathFactory.toManifestFilePath(fileName)); } - private class ManifestEntryWriter extends BaseFileWriter<ManifestEntry, ManifestFileMeta> { + private class ManifestEntryWriter extends SingleFileWriter<ManifestEntry, ManifestFileMeta> { private final FieldStatsCollector partitionStatsCollector; private final FieldStatsArraySerializer partitionStatsSerializer; + private long numAddedFiles = 0; private long numDeletedFiles = 0; - ManifestEntryWriter(FileWriter.Factory<ManifestEntry, Metric> writerFactory, Path path) - throws IOException { - super(writerFactory, path); + ManifestEntryWriter(BulkWriter.Factory<RowData> factory, Path path) { + super(factory, path, serializer::toRow); this.partitionStatsCollector = new FieldStatsCollector(partitionType); this.partitionStatsSerializer = new FieldStatsArraySerializer(partitionType); @@ -185,9 +163,7 @@ public class ManifestFile { } @Override - protected ManifestFileMeta createResult(Path path, Metric ignore) throws IOException { - // The input metric will be ignored because it includes all the column's stats, rather - // than the partition stats. + public ManifestFileMeta result() throws IOException { return new ManifestFileMeta( path.getName(), path.getFileSystem().getFileStatus(path).getLen(), @@ -208,13 +184,7 @@ public class ManifestFile { } private Supplier<ManifestEntryWriter> createWriterFactory() { - return () -> { - try { - return new ManifestEntryWriter(fileWriterFactory, pathFactory.newManifestFile()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }; + return () -> new ManifestEntryWriter(writerFactory, pathFactory.newManifestFile()); } private ManifestRollingWriter createManifestRollingWriter(long targetFileSize) { @@ -255,11 +225,9 @@ public class ManifestFile { schemaManager, schemaId, partitionType, - entryType, new ManifestEntrySerializer(), fileFormat.createReaderFactory(entryType), fileFormat.createWriterFactory(entryType), - fileFormat.createStatsExtractor(entryType).orElse(null), pathFactory, suggestedFileSize); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java index b48942f0..1956f96d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java @@ -29,7 +29,7 @@ import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFileWriter; import org.apache.flink.table.store.file.memory.MemoryOwner; import org.apache.flink.table.store.file.mergetree.compact.MergeFunction; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.CloseableIterator; @@ -228,7 +228,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { } @Override - public List<DataFileMeta> close() throws Exception { + public void close() throws Exception { // cancel compaction so that it does not block job cancelling compactManager.cancelCompaction(); sync(); @@ -247,6 +247,5 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { } newFiles.clear(); compactAfter.clear(); - return delete; } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java index 7f4f21a6..4b12d2c5 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreRead.java @@ -21,9 +21,9 @@ package org.apache.flink.table.store.file.operation; import org.apache.flink.connector.file.src.FileSourceSplit; import org.apache.flink.connector.file.src.reader.BulkFormat; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.store.file.data.AppendOnlyReader; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFilePathFactory; +import org.apache.flink.table.store.file.io.RowDataFileRecordReader; import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader; import org.apache.flink.table.store.file.predicate.Predicate; import org.apache.flink.table.store.file.schema.SchemaManager; @@ -91,7 +91,7 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<RowData> { for (DataFileMeta file : split.files()) { suppliers.add( () -> - new AppendOnlyReader( + new RowDataFileRecordReader( dataFilePathFactory.toPath(file.fileName()), readerFactory)); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java index 8911df14..979462c8 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java @@ -29,10 +29,11 @@ import org.apache.flink.table.store.file.data.AppendOnlyCompactManager; import org.apache.flink.table.store.file.data.AppendOnlyWriter; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFilePathFactory; +import org.apache.flink.table.store.file.io.RowDataRollingFileWriter; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.file.utils.SnapshotManager; -import org.apache.flink.table.store.file.writer.RecordWriter; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.store.table.source.Split; import org.apache.flink.table.types.logical.RowType; @@ -149,17 +150,19 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> { if (toCompact.isEmpty()) { return Collections.emptyList(); } - AppendOnlyWriter.RowRollingWriter rewriter = - AppendOnlyWriter.RowRollingWriter.createRollingRowWriter( + RowDataRollingFileWriter rewriter = + new RowDataRollingFileWriter( schemaId, fileFormat, targetFileSize, rowType, pathFactory.createDataFilePathFactory(partition, bucket), new LongCounter(toCompact.get(0).minSequenceNumber())); - return rewriter.write( + rewriter.write( new RecordReaderIterator<>( read.createReader(new Split(partition, bucket, toCompact, false)))); + rewriter.close(); + return rewriter.result(); }; } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java index 95cf0991..ebd68ba0 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java @@ -21,7 +21,7 @@ package org.apache.flink.table.store.file.operation; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.store.file.compact.CompactResult; import org.apache.flink.table.store.file.data.DataFileMeta; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import javax.annotation.Nullable; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java index 9e430b01..d354dedf 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java @@ -41,8 +41,8 @@ import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.file.utils.SnapshotManager; -import org.apache.flink.table.store.file.writer.RecordWriter; import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java similarity index 84% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java index e072a797..41ac970d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/RecordWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java @@ -16,13 +16,10 @@ * limitations under the License. */ -package org.apache.flink.table.store.file.writer; +package org.apache.flink.table.store.file.utils; -import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.mergetree.Increment; -import java.util.List; - /** * The {@code RecordWriter} is responsible for writing data and handling in-progress files used to * write yet un-staged data. The incremental files ready to commit is returned to the system by the @@ -49,10 +46,6 @@ public interface RecordWriter<T> { */ void sync() throws Exception; - /** - * Close this writer, the call will delete newly generated but not committed files. - * - * @return Deleted files. - */ - List<DataFileMeta> close() throws Exception; + /** Close this writer, the call will delete newly generated but not committed files. */ + void close() throws Exception; } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java deleted file mode 100644 index ab444781..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/BaseFileWriter.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.table.store.file.writer; - -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.store.file.utils.FileUtils; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * The abstracted base file writer implementation for {@link FileWriter}. - * - * @param <T> record data type. - * @param <R> file meta data type. - */ -public abstract class BaseFileWriter<T, R> implements FileWriter<T, R> { - - private final FileWriter.Factory<T, Metric> writerFactory; - private final Path path; - - private FileWriter<T, Metric> writer = null; - private Metric metric = null; - - private boolean closed = false; - - public BaseFileWriter(FileWriter.Factory<T, Metric> writerFactory, Path path) { - this.writerFactory = writerFactory; - this.path = path; - } - - public Path path() { - return path; - } - - private void openCurrentWriter() throws IOException { - this.writer = writerFactory.create(path); - } - - @Override - public void write(T row) throws IOException { - if (writer == null) { - openCurrentWriter(); - } - - writer.write(row); - } - - @Override - public long recordCount() { - if (writer != null) { - return writer.recordCount(); - } else if (metric != null) { - return metric.recordCount(); - } - return 0L; - } - - @Override - public long length() throws IOException { - if (writer != null) { - return writer.length(); - } else if (metric != null) { - return metric.length(); - } - return 0; - } - - protected abstract R createResult(Path path, Metric metric) throws IOException; - - @Override - public void abort() { - IOUtils.closeQuietly(this); - - // Abort to clean the orphan file. - FileUtils.deleteOrWarn(path); - } - - @Override - public R result() throws IOException { - Preconditions.checkState(closed, "Cannot access the file meta unless close this writer."); - Preconditions.checkNotNull(metric, "Metric cannot be null."); - - return createResult(path, metric); - } - - @Override - public void close() throws IOException { - if (!closed) { - if (writer != null) { - writer.close(); - metric = writer.result(); - - writer = null; - } - - closed = true; - } - } -} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java deleted file mode 100644 index b41545c3..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/Metric.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.table.store.file.writer; - -import org.apache.flink.table.store.format.FieldStats; - -/** Metric information to describe the column's max-min values, record count etc. */ -public class Metric { - - private final FieldStats[] fieldStats; - private final long recordCount; - private final long length; - - public Metric(FieldStats[] stats, long recordCount, long length) { - this.fieldStats = stats; - this.recordCount = recordCount; - this.length = length; - } - - public FieldStats[] fieldStats() { - return fieldStats; - } - - public long recordCount() { - return recordCount; - } - - public long length() { - return length; - } -} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java deleted file mode 100644 index 26e26953..00000000 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/MetricFileWriter.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.table.store.file.writer; - -import org.apache.flink.api.common.serialization.BulkWriter; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.store.file.utils.FileUtils; -import org.apache.flink.table.store.format.FieldStats; -import org.apache.flink.table.store.format.FieldStatsCollector; -import org.apache.flink.table.store.format.FileStatsExtractor; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.Preconditions; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.function.Function; - -/** - * An {@link FileWriter} to write generic record and generate {@link Metric} once closing it. - * - * @param <T> generic record type. - */ -public class MetricFileWriter<T> implements FileWriter<T, Metric> { - private static final Logger LOG = LoggerFactory.getLogger(MetricFileWriter.class); - - private final BulkWriter<RowData> writer; - private final Function<T, RowData> converter; - private final FSDataOutputStream out; - private final Path path; - @Nullable private final FileStatsExtractor fileStatsExtractor; - - private FieldStatsCollector fieldStatsCollector = null; - - private long recordCount; - private long length; - private boolean closed = false; - - private MetricFileWriter( - BulkWriter<RowData> writer, - Function<T, RowData> converter, - FSDataOutputStream out, - Path path, - RowType writeSchema, - @Nullable FileStatsExtractor fileStatsExtractor) { - this.writer = writer; - this.converter = converter; - this.out = out; - this.path = path; - - this.fileStatsExtractor = fileStatsExtractor; - if (this.fileStatsExtractor == null) { - this.fieldStatsCollector = new FieldStatsCollector(writeSchema); - } - - this.recordCount = 0L; - this.length = 0L; - } - - @Override - public void write(T record) throws IOException { - RowData rowData = converter.apply(record); - writer.addElement(rowData); - - if (fieldStatsCollector != null) { - fieldStatsCollector.collect(rowData); - } - - recordCount += 1; - } - - @Override - public long recordCount() { - return recordCount; - } - - @Override - public long length() { - try { - return out.getPos(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - @Override - public void abort() { - try { - close(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - - FileUtils.deleteOrWarn(path); - } - - @Override - public Metric result() throws IOException { - Preconditions.checkState(closed, "Cannot access metric unless the writer is closed."); - - FieldStats[] stats; - if (fileStatsExtractor != null) { - stats = fileStatsExtractor.extract(path); - } else { - stats = fieldStatsCollector.extract(); - } - - return new Metric(stats, recordCount, length); - } - - @Override - public void close() throws IOException { - if (!closed) { - if (writer != null) { - writer.flush(); - writer.finish(); - } - - if (out != null) { - out.flush(); - - length = out.getPos(); - out.close(); - } - - closed = true; - } - } - - public static <T> FileWriter.Factory<T, Metric> createFactory( - BulkWriter.Factory<RowData> factory, - Function<T, RowData> converter, - RowType writeSchema, - @Nullable FileStatsExtractor fileStatsExtractor) { - - return path -> { - // Open the output stream. - FileSystem fs = path.getFileSystem(); - FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); - - try { - return new MetricFileWriter<>( - factory.create(out), converter, out, path, writeSchema, fileStatsExtractor); - } catch (Throwable e) { - LOG.warn( - "Failed to open the bulk writer, closing the output stream and throw the error.", - e); - - IOUtils.closeQuietly(out); - throw e; - } - }; - } -} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java index 65975cc6..5cc90e27 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java @@ -30,7 +30,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.table.sink.AbstractTableWrite; import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.store.table.sink.SinkRecordConverter; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java index 9dea5237..e39216ed 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java @@ -33,7 +33,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.table.sink.MemoryTableWrite; import org.apache.flink.table.store.table.sink.SinkRecord; import org.apache.flink.table.store.table.sink.SinkRecordConverter; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java index b72cc634..02e457cd 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java @@ -35,7 +35,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.table.sink.MemoryTableWrite; import org.apache.flink.table.store.table.sink.SequenceGenerator; import org.apache.flink.table.store.table.sink.SinkRecord; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java index 50eed358..b5063503 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/AbstractTableWrite.java @@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.store.file.operation.FileStoreWrite; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import java.util.ArrayList; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java index 1f49b1b7..06a217d7 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/MemoryTableWrite.java @@ -23,7 +23,7 @@ import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool; import org.apache.flink.table.store.file.memory.MemoryOwner; import org.apache.flink.table.store.file.memory.MemoryPoolFactory; import org.apache.flink.table.store.file.operation.FileStoreWrite; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java index 339f8011..c3f467b0 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java @@ -41,8 +41,8 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReaderIterator; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.file.utils.SnapshotManager; -import org.apache.flink.table.store.file.writer.RecordWriter; import org.apache.flink.table.store.table.source.Split; import org.apache.flink.table.types.logical.RowType; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java index c450116c..037faeeb 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java @@ -29,7 +29,7 @@ import org.apache.flink.table.data.binary.BinaryRowDataUtil; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.mergetree.Increment; import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.format.FieldStats; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.IntType; @@ -95,15 +95,15 @@ public class AppendOnlyWriterTest { public void testSingleWrite() throws Exception { RecordWriter<RowData> writer = createEmptyWriter(1024 * 1024L); writer.write(row(1, "AAA", PART)); + Increment increment = writer.prepareCommit(true); + writer.close(); - List<DataFileMeta> result = writer.close(); - - assertThat(result.size()).isEqualTo(1); - DataFileMeta meta = result.get(0); + assertThat(increment.newFiles().size()).isEqualTo(1); + DataFileMeta meta = increment.newFiles().get(0); assertThat(meta).isNotNull(); Path path = pathFactory.toPath(meta.fileName()); - assertThat(path.getFileSystem().exists(path)).isFalse(); + assertThat(path.getFileSystem().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1L); assertThat(meta.minKey()).isEqualTo(EMPTY_ROW); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java index 58c1c424..8968db64 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTest.java @@ -122,8 +122,15 @@ public class DataFileTest { try { writer.write(CloseableIterator.fromList(data.content, kv -> {}), 0); } catch (Throwable e) { - assertThat(e) - .isExactlyInstanceOf(FailingAtomicRenameFileSystem.ArtificialException.class); + if (e.getCause() != null) { + assertThat(e) + .hasRootCauseExactlyInstanceOf( + FailingAtomicRenameFileSystem.ArtificialException.class); + } else { + assertThat(e) + .isExactlyInstanceOf( + FailingAtomicRenameFileSystem.ArtificialException.class); + } Path root = new Path(tempDir.toString()); FileSystem fs = root.getFileSystem(); for (FileStatus bucketStatus : fs.listStatus(root)) { diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java index 39ed935e..1e9bf22f 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java @@ -28,6 +28,7 @@ import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFilePathFactory; import org.apache.flink.table.store.file.data.DataFileTest; import org.apache.flink.table.store.file.data.DataFileWriter; +import org.apache.flink.table.store.file.mergetree.Increment; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LogicalType; @@ -39,7 +40,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.util.LinkedList; -import java.util.List; /** test file format suffix. */ public class FileFormatSuffixTest extends DataFileTest { @@ -72,9 +72,10 @@ public class FileFormatSuffixTest extends DataFileTest { dataFilePathFactory); appendOnlyWriter.write( GenericRowData.of(1, StringData.fromString("aaa"), StringData.fromString("1"))); - List<DataFileMeta> result = appendOnlyWriter.close(); + Increment increment = appendOnlyWriter.prepareCommit(true); + appendOnlyWriter.close(); - DataFileMeta meta = result.get(0); + DataFileMeta meta = increment.newFiles().get(0); Assertions.assertTrue(meta.fileName().endsWith(format)); } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java index 4a82d35f..c95f03d9 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java @@ -43,7 +43,7 @@ import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReader; import org.apache.flink.table.store.file.utils.RecordReaderIterator; -import org.apache.flink.table.store.file.writer.RecordWriter; +import org.apache.flink.table.store.file.utils.RecordWriter; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; @@ -175,16 +175,6 @@ public class MergeTreeTest { assertRecords(expected); } - @Test - public void testClose() throws Exception { - doTestWriteRead(6); - List<DataFileMeta> files = writer.close(); - for (DataFileMeta file : files) { - Path path = dataFileWriter.pathFactory().toPath(file.fileName()); - assertThat(path.getFileSystem().exists(path)).isFalse(); - } - } - @ParameterizedTest @ValueSource(longs = {1, 1024 * 1024}) public void testCloseUpgrade(long targetFileSize) throws Exception { @@ -253,6 +243,8 @@ public class MergeTreeTest { // assert records from increment compacted files assertRecords(expected, compactedFiles, true); + writer.close(); + Path bucketDir = dataFileWriter.pathFactory().toPath("ignore").getParent(); Set<String> files = Arrays.stream(bucketDir.getFileSystem().listStatus(bucketDir))