This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 925e8ce [FLINK-27546] Add append only writer which implements the RecordWriter interface 925e8ce is described below commit 925e8cea9f207d1a6f78ab3b0122d1c2d5bd7023 Author: openinx <open...@gmail.com> AuthorDate: Wed May 11 16:41:22 2022 +0800 [FLINK-27546] Add append only writer which implements the RecordWriter interface This closes #115 --- .../flink/table/store/file/data/DataFileMeta.java | 28 +++ .../table/store/file/mergetree/Increment.java | 7 + .../table/store/file/writer/AppendOnlyWriter.java | 169 ++++++++++++++++ .../store/file/writer/AppendOnlyWriterTest.java | 223 +++++++++++++++++++++ 4 files changed, 427 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java index 6de27e6..b520e92 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java @@ -19,6 +19,7 @@ package org.apache.flink.table.store.file.data; import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.data.binary.BinaryRowDataUtil; import org.apache.flink.table.store.file.stats.FieldStats; import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer; import org.apache.flink.table.types.logical.BigIntType; @@ -36,6 +37,13 @@ import static org.apache.flink.util.Preconditions.checkArgument; /** Metadata of a data file. */ public class DataFileMeta { + // Append-only data files don't have any key columns or a meaningful level value, so they use + // the following dummy values. 
+ public static final FieldStats[] EMPTY_KEY_STATS = new FieldStats[0]; + public static final BinaryRowData EMPTY_MIN_KEY = BinaryRowDataUtil.EMPTY_ROW; + public static final BinaryRowData EMPTY_MAX_KEY = BinaryRowDataUtil.EMPTY_ROW; + public static final int DUMMY_LEVEL = 0; + private final String fileName; private final long fileSize; private final long rowCount; @@ -49,6 +57,26 @@ public class DataFileMeta { private final long maxSequenceNumber; private final int level; + public static DataFileMeta forAppend( + String fileName, + long fileSize, + long rowCount, + FieldStats[] rowStats, + long minSequenceNumber, + long maxSequenceNumber) { + return new DataFileMeta( + fileName, + fileSize, + rowCount, + EMPTY_MIN_KEY, + EMPTY_MAX_KEY, + EMPTY_KEY_STATS, + rowStats, + minSequenceNumber, + maxSequenceNumber, + DUMMY_LEVEL); + } + public DataFileMeta( String fileName, long fileSize, diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java index 88bd96d..965d1d8 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java @@ -35,12 +35,19 @@ import java.util.Objects; */ public class Increment { + private static final List<DataFileMeta> EMPTY_COMPACT_BEFORE = Collections.emptyList(); + private static final List<DataFileMeta> EMPTY_COMPACT_AFTER = Collections.emptyList(); + private final List<DataFileMeta> newFiles; private final List<DataFileMeta> compactBefore; private final List<DataFileMeta> compactAfter; + public static Increment forAppend(List<DataFileMeta> newFiles) { + return new Increment(newFiles, EMPTY_COMPACT_BEFORE, EMPTY_COMPACT_AFTER); + } + public Increment( List<DataFileMeta> newFiles, List<DataFileMeta> beCompacted, diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java new file mode 100644 index 0000000..c22c540 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.data.DataFilePathFactory; +import org.apache.flink.table.store.file.format.FileFormat; +import org.apache.flink.table.store.file.mergetree.Increment; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.store.file.stats.FieldStatsCollector; +import org.apache.flink.table.store.file.stats.FileStatsExtractor; +import org.apache.flink.table.store.file.utils.FileUtils; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** + * A {@link RecordWriter} implementation that only accepts records which are always insert + * operations and don't have any unique keys or sort keys. 
+ */ +public class AppendOnlyWriter implements RecordWriter { + private final BulkWriter.Factory<RowData> writerFactory; + private final RowType writeSchema; + private final long targetFileSize; + private final DataFilePathFactory pathFactory; + private final FileStatsExtractor fileStatsExtractor; + private long nextSeqNum; + + private RowRollingWriter writer; + + public AppendOnlyWriter( + FileFormat fileFormat, + long targetFileSize, + RowType writeSchema, + long maxWroteSeqNumber, + DataFilePathFactory pathFactory) { + + this.writerFactory = fileFormat.createWriterFactory(writeSchema); + this.writeSchema = writeSchema; + this.targetFileSize = targetFileSize; + this.pathFactory = pathFactory; + this.fileStatsExtractor = fileFormat.createStatsExtractor(writeSchema).orElse(null); + this.nextSeqNum = maxWroteSeqNumber + 1; + + this.writer = createRollingRowWriter(); + } + + @Override + public void write(ValueKind valueKind, RowData key, RowData value) throws Exception { + Preconditions.checkArgument( + valueKind == ValueKind.ADD, + "Append-only writer cannot accept ValueKind: %s", + valueKind); + + writer.write(value); + } + + @Override + public Increment prepareCommit() throws Exception { + List<DataFileMeta> newFiles = new ArrayList<>(); + + if (writer != null) { + writer.close(); + newFiles.addAll(writer.result()); + + // Reopen the writer to accept further records. + writer = createRollingRowWriter(); + } + + return Increment.forAppend(newFiles); + } + + @Override + public void sync() throws Exception { + // Do nothing here, as this writer doesn't introduce any async compaction thread currently. + } + + @Override + public List<DataFileMeta> close() throws Exception { + sync(); + + List<DataFileMeta> result = new ArrayList<>(); + if (writer != null) { + // Abort this writer to clear uncommitted files. 
+ writer.abort(); + + result.addAll(writer.result()); + writer = null; + } + + return result; + } + + private RowRollingWriter createRollingRowWriter() { + return new RowRollingWriter( + () -> new RowFileWriter(writerFactory, pathFactory.newPath()), targetFileSize); + } + + private class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> { + + public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) { + super(writerFactory, targetFileSize); + } + } + + private class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> { + private final long minSeqNum; + private final FieldStatsCollector fieldStatsCollector; + + public RowFileWriter(BulkWriter.Factory<RowData> writerFactory, Path path) { + super(writerFactory, path); + this.minSeqNum = nextSeqNum; + this.fieldStatsCollector = new FieldStatsCollector(writeSchema); + } + + @Override + public void write(RowData row) throws IOException { + super.write(row); + + nextSeqNum += 1; + if (fileStatsExtractor == null) { + fieldStatsCollector.collect(row); + } + } + + @Override + protected DataFileMeta createFileMeta(Path path) throws IOException { + FieldStats[] stats; + if (fileStatsExtractor != null) { + stats = fileStatsExtractor.extract(path); + } else { + stats = fieldStatsCollector.extract(); + } + + return DataFileMeta.forAppend( + path.getName(), + FileUtils.getFileSize(path), + recordCount(), + stats, + minSeqNum, + Math.max(minSeqNum, nextSeqNum - 1)); + } + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java new file mode 100644 index 0000000..97b8419 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.flink.table.store.file.writer; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryRowDataUtil; +import org.apache.flink.table.store.file.ValueKind; +import org.apache.flink.table.store.file.data.DataFileMeta; +import org.apache.flink.table.store.file.data.DataFilePathFactory; +import org.apache.flink.table.store.file.format.FileFormat; +import org.apache.flink.table.store.file.mergetree.Increment; +import org.apache.flink.table.store.file.stats.FieldStats; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test the correctness for {@link AppendOnlyWriter}. 
*/ +public class AppendOnlyWriterTest { + + private static final RowData EMPTY_ROW = BinaryRowDataUtil.EMPTY_ROW; + private static final RowType SCHEMA = + RowType.of( + new LogicalType[] {new IntType(), new VarCharType(), new VarCharType()}, + new String[] {"id", "name", "dt"}); + + @TempDir public java.nio.file.Path tempDir; + public DataFilePathFactory pathFactory; + + private static final String AVRO = "avro"; + private static final String PART = "2022-05-01"; + + @BeforeEach + public void before() { + pathFactory = createPathFactory(); + } + + @Test + public void testEmptyCommits() throws Exception { + RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0); + + for (int i = 0; i < 3; i++) { + writer.sync(); + Increment inc = writer.prepareCommit(); + + assertThat(inc.newFiles()).isEqualTo(Collections.emptyList()); + assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList()); + assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList()); + } + } + + @Test + public void testSingleWrite() throws Exception { + RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0); + writer.write(ValueKind.ADD, EMPTY_ROW, row(1, "AAA", PART)); + + List<DataFileMeta> result = writer.close(); + + assertThat(result.size()).isEqualTo(1); + DataFileMeta meta = result.get(0); + assertThat(meta).isNotNull(); + + Path path = pathFactory.toPath(meta.fileName()); + assertThat(path.getFileSystem().exists(path)).isFalse(); + + assertThat(meta.rowCount()).isEqualTo(1L); + assertThat(meta.minKey()).isEqualTo(EMPTY_ROW); + assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW); + assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS); + + FieldStats[] expected = + new FieldStats[] { + initStats(1, 1, 0), initStats("AAA", "AAA", 0), initStats(PART, PART, 0) + }; + assertThat(meta.valueStats()).isEqualTo(expected); + + assertThat(meta.minSequenceNumber()).isEqualTo(1); + assertThat(meta.maxSequenceNumber()).isEqualTo(1); + 
assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL); + } + + @Test + public void testMultipleCommits() throws Exception { + RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0); + + // Commit 5 consecutive transactions. + for (int txn = 0; txn < 5; txn += 1) { + + // Write the records with range [ txn*100, (txn+1)*100 ). + int start = txn * 100; + int end = txn * 100 + 100; + for (int i = start; i < end; i++) { + writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART)); + } + + writer.sync(); + Increment inc = writer.prepareCommit(); + assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList()); + assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList()); + + assertThat(inc.newFiles().size()).isEqualTo(1); + DataFileMeta meta = inc.newFiles().get(0); + + Path path = pathFactory.toPath(meta.fileName()); + assertThat(path.getFileSystem().exists(path)).isTrue(); + + assertThat(meta.rowCount()).isEqualTo(100L); + assertThat(meta.minKey()).isEqualTo(EMPTY_ROW); + assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW); + assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS); + + FieldStats[] expected = + new FieldStats[] { + initStats(start, end - 1, 0), + initStats(String.format("%03d", start), String.format("%03d", end - 1), 0), + initStats(PART, PART, 0) + }; + assertThat(meta.valueStats()).isEqualTo(expected); + + assertThat(meta.minSequenceNumber()).isEqualTo(start + 1); + assertThat(meta.maxSequenceNumber()).isEqualTo(end); + assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL); + } + } + + @Test + public void testRollingWrite() throws Exception { + // Set a very small target file size, so that we will roll over to a new file even if + // writing one record. 
+ RecordWriter writer = createWriter(10L, SCHEMA, 0); + + for (int i = 0; i < 10; i++) { + writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART)); + } + + writer.sync(); + Increment inc = writer.prepareCommit(); + assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList()); + assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList()); + + assertThat(inc.newFiles().size()).isEqualTo(10); + + int id = 0; + for (DataFileMeta meta : inc.newFiles()) { + Path path = pathFactory.toPath(meta.fileName()); + assertThat(path.getFileSystem().exists(path)).isTrue(); + + assertThat(meta.rowCount()).isEqualTo(1L); + assertThat(meta.minKey()).isEqualTo(EMPTY_ROW); + assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW); + assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS); + + FieldStats[] expected = + new FieldStats[] { + initStats(id, id, 0), + initStats(String.format("%03d", id), String.format("%03d", id), 0), + initStats(PART, PART, 0) + }; + assertThat(meta.valueStats()).isEqualTo(expected); + + assertThat(meta.minSequenceNumber()).isEqualTo(id + 1); + assertThat(meta.maxSequenceNumber()).isEqualTo(id + 1); + assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL); + + id += 1; + } + } + + private FieldStats initStats(Integer min, Integer max, long nullCount) { + return new FieldStats(min, max, nullCount); + } + + private FieldStats initStats(String min, String max, long nullCount) { + return new FieldStats(StringData.fromString(min), StringData.fromString(max), nullCount); + } + + private RowData row(int id, String name, String dt) { + return GenericRowData.of(id, StringData.fromString(name), StringData.fromString(dt)); + } + + private DataFilePathFactory createPathFactory() { + return new DataFilePathFactory(new Path(tempDir.toString()), "dt=" + PART, 1); + } + + private RecordWriter createWriter(long targetFileSize, RowType writeSchema, long maxSeqNum) { + FileFormat fileFormat = + FileFormat.fromIdentifier( + 
Thread.currentThread().getContextClassLoader(), AVRO, new Configuration()); + + return new AppendOnlyWriter( + fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory); + } +}