This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 925e8ce  [FLINK-27546] Add append only writer which implements the RecordWriter interface
925e8ce is described below

commit 925e8cea9f207d1a6f78ab3b0122d1c2d5bd7023
Author: openinx <open...@gmail.com>
AuthorDate: Wed May 11 16:41:22 2022 +0800

    [FLINK-27546] Add append only writer which implements the RecordWriter interface
    
    This closes #115
---
 .../flink/table/store/file/data/DataFileMeta.java  |  28 +++
 .../table/store/file/mergetree/Increment.java      |   7 +
 .../table/store/file/writer/AppendOnlyWriter.java  | 169 ++++++++++++++++
 .../store/file/writer/AppendOnlyWriterTest.java    | 223 +++++++++++++++++++++
 4 files changed, 427 insertions(+)
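
For reference, a minimal usage sketch of the new writer follows. It mirrors the test setup further below; the Avro format identifier, the two-column schema, and the temporary path are illustrative assumptions, not part of the commit itself:

    // Hypothetical setup, modeled on AppendOnlyWriterTest below.
    FileFormat fileFormat =
            FileFormat.fromIdentifier(
                    Thread.currentThread().getContextClassLoader(), "avro", new Configuration());
    RowType schema =
            RowType.of(
                    new LogicalType[] {new IntType(), new VarCharType()},
                    new String[] {"id", "name"});
    DataFilePathFactory pathFactory =
            new DataFilePathFactory(new Path("/tmp/table-store"), "dt=2022-05-01", 1);

    // Target file size of 1 MB; max sequence number written so far is 0.
    RecordWriter writer =
            new AppendOnlyWriter(fileFormat, 1024 * 1024L, schema, 0, pathFactory);

    // Only ValueKind.ADD is accepted; the key argument carries no key columns for append-only data.
    writer.write(
            ValueKind.ADD,
            BinaryRowDataUtil.EMPTY_ROW,
            GenericRowData.of(1, StringData.fromString("AAA")));

    // Roll the current file and collect the newly written files for commit.
    Increment increment = writer.prepareCommit();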

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
index 6de27e6..b520e92 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/DataFileMeta.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file.data;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
 import org.apache.flink.table.store.file.stats.FieldStats;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -36,6 +37,13 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 /** Metadata of a data file. */
 public class DataFileMeta {
 
+    // Append-only data files don't have any key columns or a meaningful level value, so they
+    // use the following dummy values.
+    public static final FieldStats[] EMPTY_KEY_STATS = new FieldStats[0];
+    public static final BinaryRowData EMPTY_MIN_KEY = BinaryRowDataUtil.EMPTY_ROW;
+    public static final BinaryRowData EMPTY_MAX_KEY = BinaryRowDataUtil.EMPTY_ROW;
+    public static final int DUMMY_LEVEL = 0;
+
     private final String fileName;
     private final long fileSize;
     private final long rowCount;
@@ -49,6 +57,26 @@ public class DataFileMeta {
     private final long maxSequenceNumber;
     private final int level;
 
+    public static DataFileMeta forAppend(
+            String fileName,
+            long fileSize,
+            long rowCount,
+            FieldStats[] rowStats,
+            long minSequenceNumber,
+            long maxSequenceNumber) {
+        return new DataFileMeta(
+                fileName,
+                fileSize,
+                rowCount,
+                EMPTY_MIN_KEY,
+                EMPTY_MAX_KEY,
+                EMPTY_KEY_STATS,
+                rowStats,
+                minSequenceNumber,
+                maxSequenceNumber,
+                DUMMY_LEVEL);
+    }
+
     public DataFileMeta(
             String fileName,
             long fileSize,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
index 88bd96d..965d1d8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Increment.java
@@ -35,12 +35,19 @@ import java.util.Objects;
  */
 public class Increment {
 
+    private static final List<DataFileMeta> EMPTY_COMPACT_BEFORE = Collections.emptyList();
+    private static final List<DataFileMeta> EMPTY_COMPACT_AFTER = Collections.emptyList();
+
     private final List<DataFileMeta> newFiles;
 
     private final List<DataFileMeta> compactBefore;
 
     private final List<DataFileMeta> compactAfter;
 
+    public static Increment forAppend(List<DataFileMeta> newFiles) {
+        return new Increment(newFiles, EMPTY_COMPACT_BEFORE, EMPTY_COMPACT_AFTER);
+    }
+
     public Increment(
             List<DataFileMeta> newFiles,
             List<DataFileMeta> beCompacted,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
new file mode 100644
index 0000000..c22c540
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/AppendOnlyWriter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.store.file.stats.FieldStatsCollector;
+import org.apache.flink.table.store.file.stats.FileStatsExtractor;
+import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A {@link RecordWriter} implementation that only accepts records which are always insert
+ * operations and don't have any unique keys or sort keys.
+ */
+public class AppendOnlyWriter implements RecordWriter {
+    private final BulkWriter.Factory<RowData> writerFactory;
+    private final RowType writeSchema;
+    private final long targetFileSize;
+    private final DataFilePathFactory pathFactory;
+    private final FileStatsExtractor fileStatsExtractor;
+    private long nextSeqNum;
+
+    private RowRollingWriter writer;
+
+    public AppendOnlyWriter(
+            FileFormat fileFormat,
+            long targetFileSize,
+            RowType writeSchema,
+            long maxWroteSeqNumber,
+            DataFilePathFactory pathFactory) {
+
+        this.writerFactory = fileFormat.createWriterFactory(writeSchema);
+        this.writeSchema = writeSchema;
+        this.targetFileSize = targetFileSize;
+        this.pathFactory = pathFactory;
+        this.fileStatsExtractor = fileFormat.createStatsExtractor(writeSchema).orElse(null);
+        this.nextSeqNum = maxWroteSeqNumber + 1;
+
+        this.writer = createRollingRowWriter();
+    }
+
+    @Override
+    public void write(ValueKind valueKind, RowData key, RowData value) throws Exception {
+        Preconditions.checkArgument(
+                valueKind == ValueKind.ADD,
+                "Append-only writer cannot accept ValueKind: %s",
+                valueKind);
+
+        writer.write(value);
+    }
+
+    @Override
+    public Increment prepareCommit() throws Exception {
+        List<DataFileMeta> newFiles = new ArrayList<>();
+
+        if (writer != null) {
+            writer.close();
+            newFiles.addAll(writer.result());
+
+            // Reopen the writer to accept further records.
+            writer = createRollingRowWriter();
+        }
+
+        return Increment.forAppend(newFiles);
+    }
+
+    @Override
+    public void sync() throws Exception {
+        // Do nothing here, as this writer doesn't introduce any async compaction thread currently.
+    }
+
+    @Override
+    public List<DataFileMeta> close() throws Exception {
+        sync();
+
+        List<DataFileMeta> result = new ArrayList<>();
+        if (writer != null) {
+            // Abort this writer to clear uncommitted files.
+            writer.abort();
+
+            result.addAll(writer.result());
+            writer = null;
+        }
+
+        return result;
+    }
+
+    private RowRollingWriter createRollingRowWriter() {
+        return new RowRollingWriter(
+                () -> new RowFileWriter(writerFactory, pathFactory.newPath()), targetFileSize);
+    }
+
+    private class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
+
+        public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) {
+            super(writerFactory, targetFileSize);
+        }
+    }
+
+    private class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
+        private final long minSeqNum;
+        private final FieldStatsCollector fieldStatsCollector;
+
+        public RowFileWriter(BulkWriter.Factory<RowData> writerFactory, Path path) {
+            super(writerFactory, path);
+            this.minSeqNum = nextSeqNum;
+            this.fieldStatsCollector = new FieldStatsCollector(writeSchema);
+        }
+
+        @Override
+        public void write(RowData row) throws IOException {
+            super.write(row);
+
+            nextSeqNum += 1;
+            if (fileStatsExtractor == null) {
+                fieldStatsCollector.collect(row);
+            }
+        }
+
+        @Override
+        protected DataFileMeta createFileMeta(Path path) throws IOException {
+            FieldStats[] stats;
+            if (fileStatsExtractor != null) {
+                stats = fileStatsExtractor.extract(path);
+            } else {
+                stats = fieldStatsCollector.extract();
+            }
+
+            return DataFileMeta.forAppend(
+                    path.getName(),
+                    FileUtils.getFileSize(path),
+                    recordCount(),
+                    stats,
+                    minSeqNum,
+                    Math.max(minSeqNum, nextSeqNum - 1));
+        }
+    }
+}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
new file mode 100644
index 0000000..97b8419
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/writer/AppendOnlyWriterTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.table.store.file.writer;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowDataUtil;
+import org.apache.flink.table.store.file.ValueKind;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.data.DataFilePathFactory;
+import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.stats.FieldStats;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test the correctness of {@link AppendOnlyWriter}. */
+public class AppendOnlyWriterTest {
+
+    private static final RowData EMPTY_ROW = BinaryRowDataUtil.EMPTY_ROW;
+    private static final RowType SCHEMA =
+            RowType.of(
+                    new LogicalType[] {new IntType(), new VarCharType(), new VarCharType()},
+                    new String[] {"id", "name", "dt"});
+
+    @TempDir public java.nio.file.Path tempDir;
+    public DataFilePathFactory pathFactory;
+
+    private static final String AVRO = "avro";
+    private static final String PART = "2022-05-01";
+
+    @BeforeEach
+    public void before() {
+        pathFactory = createPathFactory();
+    }
+
+    @Test
+    public void testEmptyCommits() throws Exception {
+        RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+
+        for (int i = 0; i < 3; i++) {
+            writer.sync();
+            Increment inc = writer.prepareCommit();
+
+            assertThat(inc.newFiles()).isEqualTo(Collections.emptyList());
+            assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
+            assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList());
+        }
+    }
+
+    @Test
+    public void testSingleWrite() throws Exception {
+        RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        writer.write(ValueKind.ADD, EMPTY_ROW, row(1, "AAA", PART));
+
+        List<DataFileMeta> result = writer.close();
+
+        assertThat(result.size()).isEqualTo(1);
+        DataFileMeta meta = result.get(0);
+        assertThat(meta).isNotNull();
+
+        Path path = pathFactory.toPath(meta.fileName());
+        assertThat(path.getFileSystem().exists(path)).isFalse();
+
+        assertThat(meta.rowCount()).isEqualTo(1L);
+        assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
+        assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW);
+        assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS);
+
+        FieldStats[] expected =
+                new FieldStats[] {
+                    initStats(1, 1, 0), initStats("AAA", "AAA", 0), initStats(PART, PART, 0)
+                };
+        assertThat(meta.valueStats()).isEqualTo(expected);
+
+        assertThat(meta.minSequenceNumber()).isEqualTo(1);
+        assertThat(meta.maxSequenceNumber()).isEqualTo(1);
+        assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
+    }
+
+    @Test
+    public void testMultipleCommits() throws Exception {
+        RecordWriter writer = createWriter(1024 * 1024L, SCHEMA, 0);
+
+        // Commit 5 consecutive txns.
+        for (int txn = 0; txn < 5; txn += 1) {
+
+            // Write the records in the range [txn*100, (txn+1)*100).
+            int start = txn * 100;
+            int end = txn * 100 + 100;
+            for (int i = start; i < end; i++) {
+                writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART));
+            }
+
+            writer.sync();
+            Increment inc = writer.prepareCommit();
+            assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
+            assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList());
+
+            assertThat(inc.newFiles().size()).isEqualTo(1);
+            DataFileMeta meta = inc.newFiles().get(0);
+
+            Path path = pathFactory.toPath(meta.fileName());
+            assertThat(path.getFileSystem().exists(path)).isTrue();
+
+            assertThat(meta.rowCount()).isEqualTo(100L);
+            assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
+            assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW);
+            assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS);
+
+            FieldStats[] expected =
+                    new FieldStats[] {
+                        initStats(start, end - 1, 0),
+                        initStats(String.format("%03d", start), String.format("%03d", end - 1), 0),
+                        initStats(PART, PART, 0)
+                    };
+            assertThat(meta.valueStats()).isEqualTo(expected);
+
+            assertThat(meta.minSequenceNumber()).isEqualTo(start + 1);
+            assertThat(meta.maxSequenceNumber()).isEqualTo(end);
+            assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
+        }
+    }
+
+    @Test
+    public void testRollingWrite() throws Exception {
+        // Set a very small target file size, so that we roll over to a new file even after
+        // writing a single record.
+        RecordWriter writer = createWriter(10L, SCHEMA, 0);
+
+        for (int i = 0; i < 10; i++) {
+            writer.write(ValueKind.ADD, EMPTY_ROW, row(i, String.format("%03d", i), PART));
+        }
+
+        writer.sync();
+        Increment inc = writer.prepareCommit();
+        assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
+        assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList());
+
+        assertThat(inc.newFiles().size()).isEqualTo(10);
+
+        int id = 0;
+        for (DataFileMeta meta : inc.newFiles()) {
+            Path path = pathFactory.toPath(meta.fileName());
+            assertThat(path.getFileSystem().exists(path)).isTrue();
+
+            assertThat(meta.rowCount()).isEqualTo(1L);
+            assertThat(meta.minKey()).isEqualTo(EMPTY_ROW);
+            assertThat(meta.maxKey()).isEqualTo(EMPTY_ROW);
+            assertThat(meta.keyStats()).isEqualTo(DataFileMeta.EMPTY_KEY_STATS);
+
+            FieldStats[] expected =
+                    new FieldStats[] {
+                        initStats(id, id, 0),
+                        initStats(String.format("%03d", id), String.format("%03d", id), 0),
+                        initStats(PART, PART, 0)
+                    };
+            assertThat(meta.valueStats()).isEqualTo(expected);
+
+            assertThat(meta.minSequenceNumber()).isEqualTo(id + 1);
+            assertThat(meta.maxSequenceNumber()).isEqualTo(id + 1);
+            assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
+
+            id += 1;
+        }
+    }
+
+    private FieldStats initStats(Integer min, Integer max, long nullCount) {
+        return new FieldStats(min, max, nullCount);
+    }
+
+    private FieldStats initStats(String min, String max, long nullCount) {
+        return new FieldStats(StringData.fromString(min), StringData.fromString(max), nullCount);
+    }
+
+    private RowData row(int id, String name, String dt) {
+        return GenericRowData.of(id, StringData.fromString(name), StringData.fromString(dt));
+    }
+
+    private DataFilePathFactory createPathFactory() {
+        return new DataFilePathFactory(new Path(tempDir.toString()), "dt=" + PART, 1);
+    }
+
+    private RecordWriter createWriter(long targetFileSize, RowType writeSchema, long maxSeqNum) {
+        FileFormat fileFormat =
+                FileFormat.fromIdentifier(
+                        Thread.currentThread().getContextClassLoader(), AVRO, new Configuration());
+
+        return new AppendOnlyWriter(
+                fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
+    }
+}
