This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit 6194ecba528d0d257bc1123ee1b1e3f5666b493e
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Wed Jan 12 19:40:12 2022 +0800

    [FLINK-25628] Introduce RecordReader interface
    
    Co-authored-by: tsreaper <tsreape...@gmail.com>
    Co-authored-by: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
---
 .../flink/table/store/file/utils/RecordReader.java | 64 +++++++++++++++
 .../store/file/utils/RecordReaderIterator.java     | 95 ++++++++++++++++++++++
 2 files changed, 159 insertions(+)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
new file mode 100644
index 0000000..89194e1
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/** The reader that reads the batches of records. */
+public interface RecordReader extends Closeable {
+
+    /**
+     * Reads one batch. The method should return null when reaching the end of 
the input.
+     *
+     * <p>The returned iterator object and any contained objects may be held 
onto by the source for
+     * some time, so it should not be immediately reused by the reader.
+     */
+    @Nullable
+    RecordIterator readBatch() throws IOException;
+
+    /** Closes the reader and should release all resources. */
+    @Override
+    void close() throws IOException;
+
+    /**
+     * An internal iterator interface which presents a more restrictive API 
than {@link Iterator}.
+     */
+    interface RecordIterator {
+
+        /**
+         * Gets the next record from the iterator. Returns null if this 
iterator has no more
+         * elements.
+         */
+        KeyValue next() throws IOException;
+
+        /**
+         * Releases the batch that this iterator iterated over. This is not 
supposed to close the
+         * reader and its resources, but is simply a signal that this iterator 
is not used anymore.
+         * This method can be used as a hook to recycle/reuse heavyweight 
object structures.
+         */
+        void releaseBatch();
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
new file mode 100644
index 0000000..58ad8e2
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordReaderIterator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.utils;
+
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.util.CloseableIterator;
+
+
+import java.io.IOException;
+
+/** Wrap a {@link RecordReader} as an {@link CloseableIterator}. */
+public class RecordReaderIterator implements CloseableIterator<KeyValue> {
+
+    private final RecordReader reader;
+    private RecordReader.RecordIterator currentIterator;
+    private boolean advanced;
+    private KeyValue currentResult;
+
+    public RecordReaderIterator(RecordReader reader) {
+        this.reader = reader;
+        try {
+            this.currentIterator = reader.readBatch();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        this.advanced = false;
+        this.currentResult = null;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (currentIterator == null) {
+            return false;
+        }
+        advanceIfNeeded();
+        return currentResult != null;
+    }
+
+    @Override
+    public KeyValue next() {
+        if (!hasNext()) {
+            return null;
+        }
+        advanced = false;
+        return currentResult;
+    }
+
+    private void advanceIfNeeded() {
+        if (advanced) {
+            return;
+        }
+        advanced = true;
+
+        try {
+            while (true) {
+                currentResult = currentIterator.next();
+                if (currentResult != null) {
+                    break;
+                } else {
+                    currentIterator.releaseBatch();
+                    currentIterator = reader.readBatch();
+                    if (currentIterator == null) {
+                        break;
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (currentIterator != null) {
+            currentIterator.releaseBatch();
+        }
+        reader.close();
+    }
+}

Reply via email to