This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bbba01714 [core] Make FormatReaderFactory return FileRecordReader to
reduce cast (#4512)
bbba01714 is described below
commit bbba0171423983714c106e4e62a050f757ecdc12
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 13 15:43:16 2024 +0800
[core] Make FormatReaderFactory return FileRecordReader to reduce cast
(#4512)
---
.../bitmap/ApplyBitmapIndexRecordReader.java | 20 ++++-----
.../apache/paimon/format/FormatReaderFactory.java | 3 +-
.../EmptyFileRecordReader.java} | 31 +++++---------
.../apache/paimon/reader/FileRecordIterator.java | 4 +-
.../FileRecordReader.java} | 28 ++++---------
.../deletionvectors/ApplyDeletionVectorReader.java | 20 ++++-----
...RecordReader.java => DataFileRecordReader.java} | 47 +++++++++++-----------
.../paimon/io/KeyValueDataFileRecordReader.java | 12 +++---
.../paimon/io/KeyValueFileReaderFactory.java | 7 ++--
.../apache/paimon/operation/RawFileSplitRead.java | 13 +++---
.../CompactedChangelogFormatReaderFactory.java | 4 +-
.../apache/paimon/format/avro/AvroBulkFormat.java | 8 ++--
.../apache/paimon/format/orc/OrcReaderFactory.java | 8 ++--
.../format/parquet/ParquetReaderFactory.java | 9 ++---
14 files changed, 91 insertions(+), 123 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
index d5d15095f..3b1207c8b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
@@ -20,41 +20,35 @@ package org.apache.paimon.fileindex.bitmap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** A {@link RecordReader} which apply {@link BitmapIndexResult} to filter
record. */
-public class ApplyBitmapIndexRecordReader implements RecordReader<InternalRow>
{
+public class ApplyBitmapIndexRecordReader implements
FileRecordReader<InternalRow> {
- private final RecordReader<InternalRow> reader;
+ private final FileRecordReader<InternalRow> reader;
private final BitmapIndexResult fileIndexResult;
public ApplyBitmapIndexRecordReader(
- RecordReader<InternalRow> reader, BitmapIndexResult
fileIndexResult) {
+ FileRecordReader<InternalRow> reader, BitmapIndexResult
fileIndexResult) {
this.reader = reader;
this.fileIndexResult = fileIndexResult;
}
@Nullable
@Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
- RecordIterator<InternalRow> batch = reader.readBatch();
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ FileRecordIterator<InternalRow> batch = reader.readBatch();
if (batch == null) {
return null;
}
- checkArgument(
- batch instanceof FileRecordIterator,
- "There is a bug, RecordIterator in
ApplyBitmapIndexRecordReader must be FileRecordIterator");
-
- return new ApplyBitmapIndexFileRecordIterator(
- (FileRecordIterator<InternalRow>) batch, fileIndexResult);
+ return new ApplyBitmapIndexFileRecordIterator(batch, fileIndexResult);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index 420d44e0f..d8af3e2fe 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import java.io.IOException;
@@ -29,7 +30,7 @@ import java.io.IOException;
/** A factory to create {@link RecordReader} for file. */
public interface FormatReaderFactory {
- RecordReader<InternalRow> createReader(Context context) throws IOException;
+ FileRecordReader<InternalRow> createReader(Context context) throws
IOException;
/** Context for creating reader. */
interface Context {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyFileRecordReader.java
similarity index 58%
copy from
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
copy to
paimon-common/src/main/java/org/apache/paimon/reader/EmptyFileRecordReader.java
index 420d44e0f..3fa25dce5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyFileRecordReader.java
@@ -16,30 +16,21 @@
* limitations under the License.
*/
-package org.apache.paimon.format;
+package org.apache.paimon.reader;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fileindex.FileIndexResult;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.RecordReader;
+import javax.annotation.Nullable;
import java.io.IOException;
-/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory {
+/** An empty {@link FileRecordReader}. */
+public class EmptyFileRecordReader<T> implements FileRecordReader<T> {
- RecordReader<InternalRow> createReader(Context context) throws IOException;
-
- /** Context for creating reader. */
- interface Context {
-
- FileIO fileIO();
-
- Path filePath();
-
- long fileSize();
-
- FileIndexResult fileIndex();
+ @Nullable
+ @Override
+ public FileRecordIterator<T> readBatch() throws IOException {
+ return null;
}
+
+ @Override
+ public void close() throws IOException {}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
index d22b27053..2d3c85f19 100644
---
a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
@@ -27,10 +27,8 @@ import java.io.IOException;
import java.util.function.Function;
/**
- * Wrap {@link RecordReader.RecordIterator} to support returning the record's
row position and file
+ * A {@link RecordReader.RecordIterator} to support returning the record's row
position and file
* Path.
- *
- * @param <T> The type of the record.
*/
public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java
similarity index 58%
copy from
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
copy to
paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java
index 420d44e0f..4d5356edf 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java
@@ -16,30 +16,16 @@
* limitations under the License.
*/
-package org.apache.paimon.format;
+package org.apache.paimon.reader;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fileindex.FileIndexResult;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.RecordReader;
+import javax.annotation.Nullable;
import java.io.IOException;
-/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory {
+/** A {@link RecordReader} to support returning {@link FileRecordIterator}. */
+public interface FileRecordReader<T> extends RecordReader<T> {
- RecordReader<InternalRow> createReader(Context context) throws IOException;
-
- /** Context for creating reader. */
- interface Context {
-
- FileIO fileIO();
-
- Path filePath();
-
- long fileSize();
-
- FileIndexResult fileIndex();
- }
+ @Override
+ @Nullable
+ FileRecordIterator<T> readBatch() throws IOException;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index c1dc16a78..2fc292e54 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -20,23 +20,22 @@ package org.apache.paimon.deletionvectors;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
import java.io.IOException;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
/** A {@link RecordReader} which apply {@link DeletionVector} to filter
record. */
-public class ApplyDeletionVectorReader implements RecordReader<InternalRow> {
+public class ApplyDeletionVectorReader implements
FileRecordReader<InternalRow> {
- private final RecordReader<InternalRow> reader;
+ private final FileRecordReader<InternalRow> reader;
private final DeletionVector deletionVector;
public ApplyDeletionVectorReader(
- RecordReader<InternalRow> reader, DeletionVector deletionVector) {
+ FileRecordReader<InternalRow> reader, DeletionVector
deletionVector) {
this.reader = reader;
this.deletionVector = deletionVector;
}
@@ -51,19 +50,14 @@ public class ApplyDeletionVectorReader implements
RecordReader<InternalRow> {
@Nullable
@Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
- RecordIterator<InternalRow> batch = reader.readBatch();
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ FileRecordIterator<InternalRow> batch = reader.readBatch();
if (batch == null) {
return null;
}
- checkArgument(
- batch instanceof FileRecordIterator,
- "There is a bug, RecordIterator in ApplyDeletionVectorReader
must be FileRecordIterator");
-
- return new ApplyDeletionFileRecordIterator(
- (FileRecordIterator<InternalRow>) batch, deletionVector);
+ return new ApplyDeletionFileRecordIterator(batch, deletionVector);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
similarity index 88%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
rename to
paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 1e12025ba..d2559fe62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -25,7 +25,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ProjectedRow;
@@ -34,17 +35,35 @@ import javax.annotation.Nullable;
import java.io.IOException;
/** Reads {@link InternalRow} from data files. */
-public class FileRecordReader implements RecordReader<InternalRow> {
+public class DataFileRecordReader implements FileRecordReader<InternalRow> {
- private final RecordReader<InternalRow> reader;
+ private final FileRecordReader<InternalRow> reader;
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
+ public DataFileRecordReader(
+ FormatReaderFactory readerFactory,
+ FormatReaderFactory.Context context,
+ @Nullable int[] indexMapping,
+ @Nullable CastFieldGetter[] castMapping,
+ @Nullable PartitionInfo partitionInfo)
+ throws IOException {
+ try {
+ this.reader = readerFactory.createReader(context);
+ } catch (Exception e) {
+ FileUtils.checkExists(context.fileIO(), context.filePath());
+ throw e;
+ }
+ this.indexMapping = indexMapping;
+ this.partitionInfo = partitionInfo;
+ this.castMapping = castMapping;
+ }
+
@Nullable
@Override
- public RecordReader.RecordIterator<InternalRow> readBatch() throws
IOException {
- RecordIterator<InternalRow> iterator = reader.readBatch();
+ public FileRecordIterator<InternalRow> readBatch() throws IOException {
+ FileRecordIterator<InternalRow> iterator = reader.readBatch();
if (iterator == null) {
return null;
}
@@ -71,24 +90,6 @@ public class FileRecordReader implements
RecordReader<InternalRow> {
return iterator;
}
- public FileRecordReader(
- FormatReaderFactory readerFactory,
- FormatReaderFactory.Context context,
- @Nullable int[] indexMapping,
- @Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
- throws IOException {
- try {
- this.reader = readerFactory.createReader(context);
- } catch (Exception e) {
- FileUtils.checkExists(context.fileIO(), context.filePath());
- throw e;
- }
- this.indexMapping = indexMapping;
- this.partitionInfo = partitionInfo;
- this.castMapping = castMapping;
- }
-
@Override
public void close() throws IOException {
reader.close();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index e44ad79ff..6cf087697 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -21,6 +21,8 @@ package org.apache.paimon.io;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowType;
@@ -29,14 +31,14 @@ import javax.annotation.Nullable;
import java.io.IOException;
/** {@link RecordReader} for reading {@link KeyValue} data files. */
-public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
+public class KeyValueDataFileRecordReader implements
FileRecordReader<KeyValue> {
- private final RecordReader<InternalRow> reader;
+ private final FileRecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
public KeyValueDataFileRecordReader(
- RecordReader<InternalRow> reader, RowType keyType, RowType
valueType, int level) {
+ FileRecordReader<InternalRow> reader, RowType keyType, RowType
valueType, int level) {
this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
@@ -44,8 +46,8 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
@Nullable
@Override
- public RecordIterator<KeyValue> readBatch() throws IOException {
- RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
+ public FileRecordIterator<KeyValue> readBatch() throws IOException {
+ FileRecordIterator<InternalRow> iterator = reader.readBatch();
if (iterator == null) {
return null;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index fdbb727e5..7d3acd729 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -32,6 +32,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
@@ -109,7 +110,7 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
return createRecordReader(schemaId, fileName, level, true, null,
fileSize);
}
- private RecordReader<KeyValue> createRecordReader(
+ private FileRecordReader<KeyValue> createRecordReader(
long schemaId,
String fileName,
int level,
@@ -134,8 +135,8 @@ public class KeyValueFileReaderFactory implements
FileReaderFactory<KeyValue> {
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName);
- RecordReader<InternalRow> fileRecordReader =
- new FileRecordReader(
+ FileRecordReader<InternalRow> fileRecordReader =
+ new DataFileRecordReader(
bulkFormatMapping.getReaderFactory(),
orcPoolSize == null
? new FormatReaderContext(fileIO, filePath,
fileSize)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 4a6fa5b3d..46977457c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -32,12 +32,13 @@ import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataFileRecordReader;
import org.apache.paimon.io.FileIndexEvaluator;
-import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.EmptyRecordReader;
+import org.apache.paimon.reader.EmptyFileRecordReader;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.ReaderSupplier;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
@@ -187,7 +188,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
return ConcatRecordReader.create(suppliers);
}
- private RecordReader<InternalRow> createFileReader(
+ private FileRecordReader<InternalRow> createFileReader(
BinaryRow partition,
DataFileMeta file,
DataFilePathFactory dataFilePathFactory,
@@ -204,7 +205,7 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
dataFilePathFactory,
file);
if (!fileIndexResult.remain()) {
- return new EmptyRecordReader<>();
+ return new EmptyFileRecordReader<>();
}
}
@@ -214,8 +215,8 @@ public class RawFileSplitRead implements
SplitRead<InternalRow> {
dataFilePathFactory.toPath(file.fileName()),
file.fileSize(),
fileIndexResult);
- RecordReader<InternalRow> fileRecordReader =
- new FileRecordReader(
+ FileRecordReader<InternalRow> fileRecordReader =
+ new DataFileRecordReader(
bulkFormatMapping.getReaderFactory(),
formatReaderContext,
bulkFormatMapping.getIndexMapping(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index e17566f30..e0aed448d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -27,7 +27,7 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.FileRecordReader;
import java.io.EOFException;
import java.io.IOException;
@@ -60,7 +60,7 @@ public class CompactedChangelogFormatReaderFactory implements
FormatReaderFactor
}
@Override
- public RecordReader<InternalRow> createReader(Context context) throws
IOException {
+ public FileRecordReader<InternalRow> createReader(Context context) throws
IOException {
OffsetReadOnlyFileIO fileIO = new
OffsetReadOnlyFileIO(context.fileIO());
long length = decodePath(context.filePath()).length;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 7f3e27518..a06ca9948 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.IteratorResultIterator;
@@ -49,12 +49,12 @@ public class AvroBulkFormat implements FormatReaderFactory {
}
@Override
- public RecordReader<InternalRow> createReader(FormatReaderFactory.Context
context)
+ public FileRecordReader<InternalRow>
createReader(FormatReaderFactory.Context context)
throws IOException {
return new AvroReader(context.fileIO(), context.filePath(),
context.fileSize());
}
- private class AvroReader implements RecordReader<InternalRow> {
+ private class AvroReader implements FileRecordReader<InternalRow> {
private final FileIO fileIO;
private final DataFileReader<InternalRow> reader;
@@ -90,7 +90,7 @@ public class AvroBulkFormat implements FormatReaderFactory {
@Nullable
@Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
+ public IteratorResultIterator readBatch() throws IOException {
Object ticket;
try {
ticket = pool.pollEntry();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index dbc5de265..05f3dd785 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
@@ -184,7 +185,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
return orcVectorizedRowBatch;
}
- private RecordIterator<InternalRow> convertAndGetIterator(
+ private ColumnarRowIterator convertAndGetIterator(
VectorizedRowBatch orcBatch, long rowNumber) {
// no copying from the ORC column vectors to the Paimon columns
vectors necessary,
// because they point to the same data arrays internally design
@@ -209,8 +210,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
* batch is addressed by the starting row number of the batch, plus the
number of records to be
* skipped before.
*/
- private static final class OrcVectorizedReader
- implements org.apache.paimon.reader.RecordReader<InternalRow> {
+ private static final class OrcVectorizedReader implements
FileRecordReader<InternalRow> {
private final RecordReader orcReader;
private final Pool<OrcReaderBatch> pool;
@@ -222,7 +222,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
@Nullable
@Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
+ public ColumnarRowIterator readBatch() throws IOException {
final OrcReaderBatch batch = getCachedEntry();
final VectorizedRowBatch orcVectorBatch =
batch.orcVectorizedRowBatch();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2a62c0bc8..2e792d153 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -31,8 +31,7 @@ import
org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -307,7 +306,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
return new VectorizedColumnBatch(vectors);
}
- private class ParquetReader implements RecordReader<InternalRow> {
+ private class ParquetReader implements FileRecordReader<InternalRow> {
private ParquetFileReader reader;
@@ -360,7 +359,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
@Nullable
@Override
- public RecordIterator<InternalRow> readBatch() throws IOException {
+ public ColumnarRowIterator readBatch() throws IOException {
final ParquetReaderBatch batch = getCachedEntry();
if (!nextBatch(batch)) {
@@ -488,7 +487,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
recycler.recycle(this);
}
- public RecordIterator<InternalRow> convertAndGetIterator(long
rowNumber) {
+ public ColumnarRowIterator convertAndGetIterator(long rowNumber) {
result.reset(rowNumber);
return result;
}