This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 610269895727efaa314d54253f5b1acfc3f769f5 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Wed Jan 12 19:44:01 2022 +0800 [FLINK-25628] Introduce ConcatRecordReader Co-authored-by: Jane Chan <55568005+ladyfor...@users.noreply.github.com> Co-authored-by: tsreaper <tsreape...@gmail.com> --- .../file/mergetree/compact/ConcatRecordReader.java | 84 ++++++++++++++++++++++ .../mergetree/compact/ConcatRecordReaderTest.java | 67 +++++++++++++++++ 2 files changed, 151 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java new file mode 100644 index 0000000..e8c6e44 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReader.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +/** + * This reader is to concatenate a list of {@link RecordReader}s and read them sequentially. The + * input list is already sorted by key and sequence number, and the key intervals do not overlap + * each other. + */ +public class ConcatRecordReader implements RecordReader { + + private final Queue<ReaderSupplier> queue; + + private RecordReader current; + + protected ConcatRecordReader(List<ReaderSupplier> readerFactories) { + readerFactories.forEach( + supplier -> + Preconditions.checkNotNull(supplier, "Reader factory must not be null.")); + this.queue = new LinkedList<>(readerFactories); + } + + public static RecordReader create(List<ReaderSupplier> readers) throws IOException { + return readers.size() == 1 ? readers.get(0).get() : new ConcatRecordReader(readers); + } + + @Nullable + @Override + public RecordIterator readBatch() throws IOException { + while (true) { + if (current != null) { + RecordIterator iterator = current.readBatch(); + if (iterator != null) { + return iterator; + } + current.close(); + current = null; + } else if (queue.size() > 0) { + current = queue.poll().get(); + } else { + return null; + } + } + } + + @Override + public void close() throws IOException { + if (current != null) { + current.close(); + } + } + + /** Supplier to get {@link RecordReader}. */ + @FunctionalInterface + public interface ReaderSupplier { + RecordReader get() throws IOException; + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java new file mode 100644 index 0000000..6ed2edf --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/ConcatRecordReaderTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.store.file.utils.RecordReader; +import org.apache.flink.table.store.file.utils.ReusingTestData; +import org.apache.flink.table.store.file.utils.TestReusingRecordReader; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +/** Tests for {@link ConcatRecordReader}. */ +public class ConcatRecordReaderTest extends CombiningRecordReaderTestBase { + + @Override + protected boolean addOnly() { + return false; + } + + @Override + protected List<ReusingTestData> getExpected(List<ReusingTestData> input) { + return input; + } + + @Override + protected RecordReader createRecordReader(List<TestReusingRecordReader> readers) { + return new ConcatRecordReader( + readers.stream() + .map(r -> (ConcatRecordReader.ReaderSupplier) () -> r) + .collect(Collectors.toList())); + } + + @Test + public void testSmallData() throws IOException { + runTest( + parseData( + "1, 1, +, 100 | 3, 2, +, 300 | 5, 3, -, 500 | " + + "7, 4, +, 700 | 9, 20, +, 900", + "", + "12, 6, +, 1200 | 14, 7, +, 1400 | 16, 8, -, 1600 | 18, 9, -, 1800")); + runTest( + parseData( + " 1, 10, +, 100 | 3, 20, +, 300 | 5, 30, -, 500 | " + + " 7, 40, +, 700 | 9, 200, -, 900", + "", + " 12, 60, +, 1200 | 14, 70, -, 1400 | 16, 80, +, 1600 | 18, 90, -, 1800")); + } +}