This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit 6194ecba528d0d257bc1123ee1b1e3f5666b493e Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Wed Jan 12 19:40:12 2022 +0800 [FLINK-25628] Introduce RecordReader interface Co-authored-by: tsreaper <tsreape...@gmail.com> Co-authored-by: Jane Chan <55568005+ladyfor...@users.noreply.github.com> --- .../flink/table/store/file/utils/RecordReader.java | 64 +++++++++++++++ .../store/file/utils/RecordReaderIterator.java | 95 ++++++++++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java new file mode 100644 index 0000000..89194e1 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.table.store.file.KeyValue; + + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +/** The reader that reads the batches of records. */ +public interface RecordReader extends Closeable { + + /** + * Reads one batch. The method should return null when reaching the end of the input. + * + * <p>The returned iterator object and any contained objects may be held onto by the source for + * some time, so it should not be immediately reused by the reader. + */ + @Nullable + RecordIterator readBatch() throws IOException; + + /** Closes the reader and should release all resources. */ + @Override + void close() throws IOException; + + /** + * An internal iterator interface which presents a more restrictive API than {@link Iterator}. + */ + interface RecordIterator { + + /** + * Gets the next record from the iterator. Returns null if this iterator has no more + * elements. + */ + KeyValue next() throws IOException; + + /** + * Releases the batch that this iterator iterated over. This is not supposed to close the + * reader and its resources, but is simply a signal that this iterator is not used anymore. + * This method can be used as a hook to recycle/reuse heavyweight object structures. + */ + void releaseBatch(); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java new file mode 100644 index 0000000..58ad8e2 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.utils; + +import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.util.CloseableIterator; + + +import java.io.IOException; + +/** Wrap a {@link RecordReader} as an {@link CloseableIterator}. */ +public class RecordReaderIterator implements CloseableIterator<KeyValue> { + + private final RecordReader reader; + private RecordReader.RecordIterator currentIterator; + private boolean advanced; + private KeyValue currentResult; + + public RecordReaderIterator(RecordReader reader) { + this.reader = reader; + try { + this.currentIterator = reader.readBatch(); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.advanced = false; + this.currentResult = null; + } + + @Override + public boolean hasNext() { + if (currentIterator == null) { + return false; + } + advanceIfNeeded(); + return currentResult != null; + } + + @Override + public KeyValue next() { + if (!hasNext()) { + return null; + } + advanced = false; + return currentResult; + } + + private void advanceIfNeeded() { + if (advanced) { + return; + } + advanced = true; + + try { + while (true) { + currentResult = currentIterator.next(); + if (currentResult != null) { + break; + } else { + currentIterator.releaseBatch(); + currentIterator = reader.readBatch(); + if (currentIterator == null) { + break; + } + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + if (currentIterator != null) { + currentIterator.releaseBatch(); + } + reader.close(); + } +}