This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d2ddaa9bf [core] Introduce file-reader-async-threshold to speed up
merging (#2118)
d2ddaa9bf is described below
commit d2ddaa9bfb865a93b12c7a0294cf936071bbc08b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 13 17:49:03 2023 +0800
[core] Introduce file-reader-async-threshold to speed up merging (#2118)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 10 ++
.../apache/paimon/format/FormatReaderFactory.java | 3 +
.../src/main/java/org/apache/paimon/fs/FileIO.java | 3 +
.../java/org/apache/paimon/KeyValueFileStore.java | 3 +-
.../org/apache/paimon/casting/CastExecutor.java | 3 +
.../apache/paimon/format/FileFormatDiscover.java | 7 +-
.../paimon/io/KeyValueDataFileRecordReader.java | 7 +-
.../paimon/io/KeyValueFileReaderFactory.java | 69 ++++++---
.../apache/paimon/mergetree/MergeTreeReaders.java | 5 +-
.../paimon/operation/KeyValueFileStoreRead.java | 8 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 7 +-
.../paimon/schema/KeyValueFieldsExtractor.java | 4 +
.../org/apache/paimon/schema/SchemaManager.java | 2 +
.../table/ChangelogValueCountFileStoreTable.java | 1 +
.../paimon/table/ChangelogWithKeyTableUtils.java | 1 +
.../org/apache/paimon/utils/AsyncRecordReader.java | 122 ++++++++++++++++
.../org/apache/paimon/utils/BulkFormatMapping.java | 4 +-
.../java/org/apache/paimon/utils/FileUtils.java | 7 +-
.../IOExceptionSupplier.java} | 23 ++-
.../paimon/io/KeyValueFileReadWriteTest.java | 10 +-
.../paimon/mergetree/ContainsLevelsTest.java | 8 +-
.../apache/paimon/mergetree/LookupLevelsTest.java | 8 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 3 +-
.../table/ChangelogWithKeyFileStoreTableTest.java | 38 +++++
.../apache/paimon/utils/AsyncRecordReaderTest.java | 161 +++++++++++++++++++++
.../flink/source/TestChangelogDataReadWrite.java | 3 +-
.../apache/paimon/format/avro/AvroBulkFormat.java | 6 +
.../apache/paimon/format/orc/OrcFileFormat.java | 2 +
.../apache/paimon/format/orc/OrcReaderFactory.java | 10 +-
.../format/parquet/ParquetReaderFactory.java | 6 +
31 files changed, 499 insertions(+), 51 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index ee89b81ee..5e3094e89 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@ under the License.
<td>Boolean</td>
<td>Whether only overwrite dynamic partition when overwriting a
partitioned table with dynamic partition columns. Works only when the table has
partition keys.</td>
</tr>
+ <tr>
+ <td><h5>file-reader-async-threshold</h5></td>
+ <td style="word-wrap: break-word;">10 mb</td>
+ <td>MemorySize</td>
+ <td>The threshold for read file async.</td>
+ </tr>
<tr>
<td><h5>file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 1935e9dd7..bd67a5fa2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -904,6 +904,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"The bytes of types (CHAR, VARCHAR, BINARY,
VARBINARY) devote to the zorder sort.");
+ public static final ConfigOption<MemorySize> FILE_READER_ASYNC_THRESHOLD =
+ key("file-reader-async-threshold")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(10))
+ .withDescription("The threshold for read file async.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -992,6 +998,10 @@ public class CoreOptions implements Serializable {
return options.get(FILE_COMPRESSION);
}
+ public MemorySize fileReaderAsyncThreshold() {
+ return options.get(FILE_READER_ASYNC_THRESHOLD);
+ }
+
public int snapshotNumRetainMin() {
return options.get(SNAPSHOT_NUM_RETAINED_MIN);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index 7e7855016..b2b179159 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -30,4 +30,7 @@ import java.io.Serializable;
public interface FormatReaderFactory extends Serializable {
RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws
IOException;
+
+ RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int
poolSize)
+ throws IOException;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 535791246..819d70223 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.concurrent.ThreadSafe;
+
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -53,6 +55,7 @@ import static org.apache.paimon.fs.FileIOUtils.checkAccess;
* @since 0.4.0
*/
@Public
+@ThreadSafe
public interface FileIO extends Serializable {
Logger LOG = LoggerFactory.getLogger(FileIO.class);
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a6b22e7c3..ef122ee78 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -116,7 +116,8 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
mfFactory,
FileFormatDiscover.of(options),
pathFactory(),
- keyValueFieldsExtractor);
+ keyValueFieldsExtractor,
+ options);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
index 10b831da3..8f6499dd2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
@@ -18,12 +18,15 @@
package org.apache.paimon.casting;
+import javax.annotation.concurrent.ThreadSafe;
+
/**
* Interface to model a function that performs the casting of a value from one
type to another.
*
* @param <IN> Input internal type
* @param <OUT> Output internal type
*/
+@ThreadSafe
public interface CastExecutor<IN, OUT> {
/** Cast the input value. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
index d0765efec..f983e2690 100644
--- a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
+++ b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
@@ -20,14 +20,17 @@ package org.apache.paimon.format;
import org.apache.paimon.CoreOptions;
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/** A class to discover {@link FileFormat}. */
+@ThreadSafe
public interface FileFormatDiscover {
static FileFormatDiscover of(CoreOptions options) {
- Map<String, FileFormat> formats = new HashMap<>();
+ Map<String, FileFormat> formats = new ConcurrentHashMap<>();
return new FileFormatDiscover() {
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 8690e3217..ca6fdc89f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -49,10 +49,15 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
RowType keyType,
RowType valueType,
int level,
+ @Nullable Integer poolSize,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping)
throws IOException {
- this.reader = FileUtils.createFormatReader(fileIO, readerFactory,
path);
+ FileUtils.checkExists(fileIO, path);
+ this.reader =
+ poolSize == null
+ ? readerFactory.createReader(fileIO, path)
+ : readerFactory.createReader(fileIO, path, poolSize);
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.indexMapping = indexMapping;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 71fbbec9a..a2eea014d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -18,6 +18,7 @@
package org.apache.paimon.io;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.format.FileFormatDiscover;
@@ -27,8 +28,8 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.KeyValueFieldsExtractor;
import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.AsyncRecordReader;
import org.apache.paimon.utils.BulkFormatMapping;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Projection;
@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Supplier;
/** Factory to create {@link RecordReader}s for reading {@link KeyValue}
files. */
public class KeyValueFileReaderFactory {
@@ -51,8 +53,10 @@ public class KeyValueFileReaderFactory {
private final RowType valueType;
private final BulkFormatMapping.BulkFormatMappingBuilder
bulkFormatMappingBuilder;
- private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private final DataFilePathFactory pathFactory;
+ private final long asyncThreshold;
+
+ private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
private KeyValueFileReaderFactory(
FileIO fileIO,
@@ -61,7 +65,8 @@ public class KeyValueFileReaderFactory {
RowType keyType,
RowType valueType,
BulkFormatMapping.BulkFormatMappingBuilder
bulkFormatMappingBuilder,
- DataFilePathFactory pathFactory) {
+ DataFilePathFactory pathFactory,
+ long asyncThreshold) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
@@ -69,21 +74,41 @@ public class KeyValueFileReaderFactory {
this.valueType = valueType;
this.bulkFormatMappingBuilder = bulkFormatMappingBuilder;
this.pathFactory = pathFactory;
+ this.asyncThreshold = asyncThreshold;
this.bulkFormatMappings = new HashMap<>();
}
- public RecordReader<KeyValue> createRecordReader(long schemaId, String
fileName, int level)
+ public RecordReader<KeyValue> createRecordReader(
+ long schemaId, String fileName, long fileSize, int level) throws
IOException {
+ if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
+ return new AsyncRecordReader<>(
+ () -> createRecordReader(schemaId, fileName, level, false,
2));
+ }
+ return createRecordReader(schemaId, fileName, level, true, null);
+ }
+
+ private RecordReader<KeyValue> createRecordReader(
+ long schemaId,
+ String fileName,
+ int level,
+ boolean reuseFormat,
+ @Nullable Integer poolSize)
throws IOException {
String formatIdentifier =
DataFilePathFactory.formatIdentifier(fileName);
+
+ Supplier<BulkFormatMapping> formatSupplier =
+ () ->
+ bulkFormatMappingBuilder.build(
+ formatIdentifier,
+ schemaManager.schema(this.schemaId),
+ schemaManager.schema(schemaId));
+
BulkFormatMapping bulkFormatMapping =
- bulkFormatMappings.computeIfAbsent(
- new FormatKey(schemaId, formatIdentifier),
- key -> {
- TableSchema tableSchema =
schemaManager.schema(this.schemaId);
- TableSchema dataSchema =
schemaManager.schema(key.schemaId);
- return bulkFormatMappingBuilder.build(
- formatIdentifier, tableSchema, dataSchema);
- });
+ reuseFormat
+ ? bulkFormatMappings.computeIfAbsent(
+ new FormatKey(schemaId, formatIdentifier),
+ key -> formatSupplier.get())
+ : formatSupplier.get();
return new KeyValueDataFileRecordReader(
fileIO,
bulkFormatMapping.getReaderFactory(),
@@ -91,6 +116,7 @@ public class KeyValueFileReaderFactory {
keyType,
valueType,
level,
+ poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping());
}
@@ -103,7 +129,8 @@ public class KeyValueFileReaderFactory {
RowType valueType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
- KeyValueFieldsExtractor extractor) {
+ KeyValueFieldsExtractor extractor,
+ CoreOptions options) {
return new Builder(
fileIO,
schemaManager,
@@ -112,7 +139,8 @@ public class KeyValueFileReaderFactory {
valueType,
formatDiscover,
pathFactory,
- extractor);
+ extractor,
+ options);
}
/** Builder for {@link KeyValueFileReaderFactory}. */
@@ -126,8 +154,9 @@ public class KeyValueFileReaderFactory {
private final FileFormatDiscover formatDiscover;
private final FileStorePathFactory pathFactory;
private final KeyValueFieldsExtractor extractor;
-
private final int[][] fullKeyProjection;
+ private final CoreOptions options;
+
private int[][] keyProjection;
private int[][] valueProjection;
private RowType projectedKeyType;
@@ -141,7 +170,8 @@ public class KeyValueFileReaderFactory {
RowType valueType,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
- KeyValueFieldsExtractor extractor) {
+ KeyValueFieldsExtractor extractor,
+ CoreOptions options) {
this.fileIO = fileIO;
this.schemaManager = schemaManager;
this.schemaId = schemaId;
@@ -152,6 +182,7 @@ public class KeyValueFileReaderFactory {
this.extractor = extractor;
this.fullKeyProjection = Projection.range(0,
keyType.getFieldCount()).toNestedIndexes();
+ this.options = options;
this.keyProjection = fullKeyProjection;
this.valueProjection = Projection.range(0,
valueType.getFieldCount()).toNestedIndexes();
applyProjection();
@@ -166,7 +197,8 @@ public class KeyValueFileReaderFactory {
valueType,
formatDiscover,
pathFactory,
- extractor);
+ extractor,
+ options);
}
public Builder withKeyProjection(int[][] projection) {
@@ -205,7 +237,8 @@ public class KeyValueFileReaderFactory {
projectedValueType,
BulkFormatMapping.newBuilder(
formatDiscover, extractor, keyProjection,
valueProjection, filters),
- pathFactory.createDataFilePathFactory(partition, bucket));
+ pathFactory.createDataFilePathFactory(partition, bucket),
+ options.fileReaderAsyncThreshold().getBytes());
}
private void applyProjection() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index aa2bb4a30..3fe28c073 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -86,7 +86,10 @@ public class MergeTreeReaders {
readers.add(
() ->
readerFactory.createRecordReader(
- file.schemaId(), file.fileName(),
file.level()));
+ file.schemaId(),
+ file.fileName(),
+ file.fileSize(),
+ file.level()));
}
return ConcatRecordReader.create(readers);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 0a2a31dcf..d3a3615bc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -95,7 +95,8 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
MergeFunctionFactory<KeyValue> mfFactory,
FileFormatDiscover formatDiscover,
FileStorePathFactory pathFactory,
- KeyValueFieldsExtractor extractor) {
+ KeyValueFieldsExtractor extractor,
+ CoreOptions options) {
this.tableSchema = schemaManager.schema(schemaId);
this.readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
@@ -106,7 +107,8 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
valueType,
formatDiscover,
pathFactory,
- extractor);
+ extractor,
+ options);
this.keyComparator = keyComparator;
this.mfFactory = mfFactory;
this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
@@ -261,7 +263,7 @@ public class KeyValueFileStoreRead implements
FileStoreRead<KeyValue> {
// See comments on DataFileMeta#extraFiles.
String fileName =
changelogFile(file).orElse(file.fileName());
return readerFactory.createRecordReader(
- file.schemaId(), fileName, file.level());
+ file.schemaId(), fileName, file.fileSize(),
file.level());
});
}
return ConcatRecordReader.create(suppliers);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6a27d67d2..6f4735be3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -115,7 +115,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
valueType,
FileFormatDiscover.of(options),
pathFactory,
- extractor);
+ extractor,
+ options);
this.writerFactoryBuilder =
KeyValueFileWriterFactory.builder(
fileIO,
@@ -266,7 +267,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
valueType,
file ->
readerFactory.createRecordReader(
- file.schemaId(), file.fileName(),
file.level()),
+ file.schemaId(), file.fileName(),
file.fileSize(), file.level()),
() -> ioManager.createChannel().getPathFile(),
new HashLookupStoreFactory(
cacheManager,
@@ -287,7 +288,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
keyType,
file ->
readerFactory.createRecordReader(
- file.schemaId(), file.fileName(),
file.level()),
+ file.schemaId(), file.fileName(),
file.fileSize(), file.level()),
() -> ioManager.createChannel().getPathFile(),
new HashLookupStoreFactory(
cacheManager,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
index 245c4b3a2..1c1d8d7a7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
@@ -20,11 +20,15 @@ package org.apache.paimon.schema;
import org.apache.paimon.types.DataField;
+import javax.annotation.concurrent.ThreadSafe;
+
import java.io.Serializable;
import java.util.List;
/** Extractor of schema for different tables. */
+@ThreadSafe
public interface KeyValueFieldsExtractor extends Serializable {
+
/**
* Extract key fields from table schema.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 15c4e4b04..42f136c1a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,6 +45,7 @@ import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.io.Serializable;
@@ -66,6 +67,7 @@ import static
org.apache.paimon.utils.FileUtils.listVersionedFiles;
import static org.apache.paimon.utils.Preconditions.checkState;
/** Schema Manager to manage schema versions. */
+@ThreadSafe
public class SchemaManager implements Serializable {
private static final String SCHEMA_PREFIX = "schema-";
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index bb7a9a9b4..f7a457f1e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -191,6 +191,7 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
* {@link KeyValueFieldsExtractor} implementation for {@link
ChangelogValueCountFileStoreTable}.
*/
static class ValueCountTableKeyValueFieldsExtractor implements
KeyValueFieldsExtractor {
+
private static final long serialVersionUID = 1L;
static final ValueCountTableKeyValueFieldsExtractor EXTRACTOR =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
index 36770365f..0ae8cc3cd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
@@ -83,6 +83,7 @@ public class ChangelogWithKeyTableUtils {
}
static class ChangelogWithKeyKeyValueFieldsExtractor implements
KeyValueFieldsExtractor {
+
private static final long serialVersionUID = 1L;
static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
new file mode 100644
index 000000000..123b02453
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.reader.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/** A {@link RecordReader} to use ASYNC_EXECUTOR to read records async. */
+public class AsyncRecordReader<T> implements RecordReader<T> {
+
+    private static final ExecutorService ASYNC_EXECUTOR =
+            Executors.newCachedThreadPool(new ExecutorThreadFactory("paimon-reader-async-thread"));
+
+    private final BlockingQueue<Element<T>> queue;
+    private final Future<Void> future;
+    private final ClassLoader classLoader;
+
+    private boolean isEnd = false;
+
+    public AsyncRecordReader(IOExceptionSupplier<RecordReader<T>> supplier) {
+        this.queue = new LinkedBlockingQueue<>();
+        // assign before submitting: the async task reads this field and could otherwise see null
+        this.classLoader = Thread.currentThread().getContextClassLoader();
+        this.future = ASYNC_EXECUTOR.submit(() -> asyncRead(supplier));
+    }
+
+    private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier) throws IOException {
+        // use the creator's classloader, the pooled thread's own one may belong to an exited
+        // creator; restore it afterwards since pooled threads are reused by other tasks.
+        ClassLoader previous = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(classLoader);
+        try (RecordReader<T> reader = supplier.get()) {
+            while (true) {
+                RecordIterator<T> batch = reader.readBatch();
+                if (batch == null) {
+                    queue.add(new Element<>(true, null));
+                    return null;
+                }
+                queue.add(new Element<>(false, batch));
+            }
+        } finally {
+            Thread.currentThread().setContextClassLoader(previous);
+        }
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        if (isEnd) {
+            return null;
+        }
+
+        try {
+            Element<T> element;
+            do {
+                element = queue.poll(2, TimeUnit.SECONDS);
+                checkException();
+            } while (element == null);
+            if (element.isEnd) {
+                isEnd = true;
+                return null;
+            }
+
+            return element.batch;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    private void checkException() throws IOException, InterruptedException {
+        if (future.isDone()) {
+            try {
+                future.get();
+            } catch (ExecutionException e) {
+                throw new IOException(e.getCause());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        future.cancel(true);
+    }
+
+    private static class Element<E> {
+
+        private final boolean isEnd;
+        private final RecordIterator<E> batch;
+
+        private Element(boolean isEnd, RecordIterator<E> batch) {
+            this.isEnd = isEnd;
+            this.batch = batch;
+        }
+    }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
index 3b6488c6c..e1691381a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
@@ -36,6 +36,7 @@ import java.util.List;
/** Class with index mapping and bulk format. */
public class BulkFormatMapping {
+
@Nullable private final int[] indexMapping;
@Nullable private final CastFieldGetter[] castMapping;
private final FormatReaderFactory bulkFormat;
@@ -75,6 +76,7 @@ public class BulkFormatMapping {
/** Builder to build {@link BulkFormatMapping}. */
public static class BulkFormatMappingBuilder {
+
private final FileFormatDiscover formatDiscover;
private final KeyValueFieldsExtractor extractor;
private final int[][] keyProjection;
@@ -117,7 +119,7 @@ public class BulkFormatMapping {
int[][] dataProjection =
KeyValue.project(dataKeyProjection, dataValueProjection,
dataKeyFields.size());
- /**
+ /*
* We need to create index mapping on projection instead of key
and value separately
* here, for example
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index e41ba0241..faa27afa4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -112,8 +112,7 @@ public class FileUtils {
.filter(status ->
status.getPath().getName().startsWith(prefix));
}
- public static RecordReader<InternalRow> createFormatReader(
- FileIO fileIO, FormatReaderFactory format, Path file) throws
IOException {
+ public static void checkExists(FileIO fileIO, Path file) throws
IOException {
if (!fileIO.exists(file)) {
throw new FileNotFoundException(
String.format(
@@ -124,7 +123,11 @@ public class FileUtils {
+ " (For example, increasing
parallelism).",
file));
}
+ }
+ public static RecordReader<InternalRow> createFormatReader(
+ FileIO fileIO, FormatReaderFactory format, Path file) throws
IOException {
+ checkExists(fileIO, file);
return format.createReader(fileIO, file);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
b/paimon-core/src/main/java/org/apache/paimon/utils/IOExceptionSupplier.java
similarity index 64%
copy from paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
copy to
paimon-core/src/main/java/org/apache/paimon/utils/IOExceptionSupplier.java
index 10b831da3..347a8991a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/IOExceptionSupplier.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,16 +16,23 @@
* limitations under the License.
*/
-package org.apache.paimon.casting;
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+import java.util.function.Supplier;
/**
- * Interface to model a function that performs the casting of a value from one
type to another.
+ * A {@link Supplier} throws {@link IOException}.
*
- * @param <IN> Input internal type
- * @param <OUT> Output internal type
+ * @param <T> the type of results supplied by this supplier
*/
-public interface CastExecutor<IN, OUT> {
+@FunctionalInterface
+public interface IOExceptionSupplier<T> {
- /** Cast the input value. */
- OUT cast(IN value);
+ /**
+ * Gets a result.
+ *
+ * @return a result
+ */
+ T get() throws IOException;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index dbfcbcd0b..aab4a5f98 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -75,7 +75,7 @@ public class KeyValueFileReadWriteTest {
public void testReadNonExistentFile() {
KeyValueFileReaderFactory readerFactory =
createReaderFactory(tempDir.toString(), "avro", null, null);
- assertThatThrownBy(() -> readerFactory.createRecordReader(0,
"dummy_file.avro", 0))
+ assertThatThrownBy(() -> readerFactory.createRecordReader(0,
"dummy_file.avro", 1, 0))
.hasMessageContaining(
"you can configure 'snapshot.time-retained' option
with a larger value.");
}
@@ -285,7 +285,8 @@ public class KeyValueFileReadWriteTest {
DEFAULT_ROW_TYPE,
ignore -> new FlushingFileFormat(format),
pathFactory,
- new
TestKeyValueGenerator.TestKeyValueFieldsExtractor());
+ new
TestKeyValueGenerator.TestKeyValueFieldsExtractor(),
+ new CoreOptions(new HashMap<>()));
if (keyProjection != null) {
builder.withKeyProjection(keyProjection);
}
@@ -310,7 +311,10 @@ public class KeyValueFileReadWriteTest {
CloseableIterator<KeyValue> actualKvsIterator =
new RecordReaderIterator<>(
readerFactory.createRecordReader(
- meta.schemaId(), meta.fileName(),
meta.level()));
+ meta.schemaId(),
+ meta.fileName(),
+ meta.fileSize(),
+ meta.level()));
while (actualKvsIterator.hasNext()) {
assertThat(expectedIterator.hasNext()).isTrue();
KeyValue actualKv = actualKvsIterator.next();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 424fe9315..28c86b754 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -180,7 +180,10 @@ public class ContainsLevelsTest {
levels,
comparator,
keyType,
- file -> createReaderFactory().createRecordReader(0,
file.fileName(), file.level()),
+ file ->
+ createReaderFactory()
+ .createRecordReader(
+ 0, file.fileName(), file.fileSize(),
file.level()),
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(new CacheManager(2048,
MemorySize.ofMebiBytes(1)), 0.75),
Duration.ofHours(1),
@@ -239,7 +242,8 @@ public class ContainsLevelsTest {
public List<DataField> valueFields(TableSchema
schema) {
return schema.fields();
}
- });
+ },
+ new CoreOptions(new HashMap<>()));
return builder.build(BinaryRow.EMPTY_ROW, 0);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index ef77b75e0..35b847bcf 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -204,7 +204,10 @@ public class LookupLevelsTest {
comparator,
keyType,
rowType,
- file -> createReaderFactory().createRecordReader(0,
file.fileName(), file.level()),
+ file ->
+ createReaderFactory()
+ .createRecordReader(
+ 0, file.fileName(), file.fileSize(),
file.level()),
() -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
new HashLookupStoreFactory(new CacheManager(2048,
MemorySize.ofMebiBytes(1)), 0.75),
Duration.ofHours(1),
@@ -263,7 +266,8 @@ public class LookupLevelsTest {
public List<DataField> valueFields(TableSchema
schema) {
return schema.fields();
}
- });
+ },
+ new CoreOptions(new HashMap<>()));
return builder.build(BinaryRow.EMPTY_ROW, 0);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 50743892b..718d8714a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -168,7 +168,8 @@ public abstract class MergeTreeTestBase {
"v",
new
org.apache.paimon.types.IntType(false)));
}
- });
+ },
+ new CoreOptions(new HashMap<>()));
readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW,
0);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index ae5e6c7ef..02a8ef6f3 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -66,6 +66,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,6 +111,43 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
+ " "
+ COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
+ @Test
+ public void testAsyncReader() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+ table =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.FILE_READER_ASYNC_THRESHOLD.key(),
"1 b"));
+
+ Map<Integer, GenericRow> rows = new HashMap<>();
+ for (int i = 0; i < 20; i++) {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ for (int j = 0; j < 1000; j++) {
+ GenericRow row = rowData(1, i * j, 100L * i * j);
+ rows.put(row.getInt(1), row);
+ write.write(row);
+ }
+ commit.commit(write.prepareCommit());
+ write.close();
+ commit.close();
+ }
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = readBuilder.newRead();
+
+ Function<InternalRow, String> toString =
+ r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+ String[] expected =
+ rows.values().stream()
+ .sorted(Comparator.comparingInt(o -> o.getInt(1)))
+ .map(toString)
+ .toArray(String[]::new);
+ assertThat(getResult(read, splits,
toString)).containsExactly(expected);
+ }
+
@Test
public void testBatchWriteBuilder() throws Exception {
FileStoreTable table = createFileStoreTable();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/AsyncRecordReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/AsyncRecordReaderTest.java
new file mode 100644
index 000000000..41039aabb
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/AsyncRecordReaderTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.reader.RecordReader;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link AsyncRecordReader}. */
+public class AsyncRecordReaderTest {
+
+ @Test
+ public void testNormal() throws IOException {
+ Queue<List<Integer>> queue = new LinkedList<>();
+ queue.add(Arrays.asList(1, 5, 6));
+ queue.add(Arrays.asList(4, 6, 8));
+ queue.add(Arrays.asList(9, 1));
+ AtomicInteger released = new AtomicInteger(0);
+ AtomicBoolean closed = new AtomicBoolean(false);
+ RecordReader<Integer> reader =
+ new RecordReader<Integer>() {
+ @Nullable
+ @Override
+ public RecordIterator<Integer> readBatch() {
+ List<Integer> values = queue.poll();
+ if (values == null) {
+ return null;
+ }
+ Queue<Integer> vQueue = new LinkedList<>(values);
+ return new RecordIterator<Integer>() {
+ @Nullable
+ @Override
+ public Integer next() {
+ return vQueue.poll();
+ }
+
+ @Override
+ public void releaseBatch() {
+ released.incrementAndGet();
+ }
+ };
+ }
+
+ @Override
+ public void close() {
+ closed.set(true);
+ }
+ };
+
+ AsyncRecordReader<Integer> asyncReader = new AsyncRecordReader<>(() ->
reader);
+ List<Integer> results = new ArrayList<>();
+ asyncReader.forEachRemaining(results::add);
+ assertThat(results).containsExactly(1, 5, 6, 4, 6, 8, 9, 1);
+ assertThat(released.get()).isEqualTo(3);
+ assertThat(closed.get()).isTrue();
+ }
+
+ @Test
+ public void testNonBlockingWhenException() {
+ String message = "Test Exception";
+ RecordReader<Integer> reader =
+ new RecordReader<Integer>() {
+ @Nullable
+ @Override
+ public RecordIterator<Integer> readBatch() {
+ throw new RuntimeException(message);
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ AsyncRecordReader<Integer> asyncReader = new AsyncRecordReader<>(() ->
reader);
+ assertThatThrownBy(() -> asyncReader.forEachRemaining(v -> {}))
+ .hasMessageContaining(message);
+ }
+
+ @Test
+ public void testClassLoader() throws IOException {
+ ClassLoader goodClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ ClassLoader badClassLoader =
+ new ClassLoader() {
+ @Override
+ public Class<?> loadClass(String name) {
+ throw new RuntimeException();
+ }
+ };
+ Thread.currentThread().setContextClassLoader(badClassLoader);
+
+ RecordReader<Integer> reader1 =
+ new RecordReader<Integer>() {
+ @Nullable
+ @Override
+ public RecordIterator<Integer> readBatch() {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ AsyncRecordReader<Integer> asyncReader = new
AsyncRecordReader<>(() -> reader1);
+ asyncReader.forEachRemaining(v -> {});
+
+ Thread.currentThread().setContextClassLoader(goodClassLoader);
+ RecordReader<Integer> reader2 =
+ new RecordReader<Integer>() {
+ @Nullable
+ @Override
+ public RecordIterator<Integer> readBatch() {
+ try {
+ Thread.currentThread()
+ .getContextClassLoader()
+
.loadClass(AsyncRecordReaderTest.class.getName());
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ asyncReader = new AsyncRecordReader<>(() -> reader2);
+ asyncReader.forEachRemaining(v -> {});
+ } finally {
+ Thread.currentThread().setContextClassLoader(goodClassLoader);
+ }
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index a8741b450..16e1942cd 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -134,7 +134,8 @@ public class TestChangelogDataReadWrite {
DeduplicateMergeFunction.factory(),
ignore -> avro,
pathFactory,
- EXTRACTOR);
+ EXTRACTOR,
+ new CoreOptions(new HashMap<>()));
return new KeyValueTableRead(read) {
@Override
public KeyValueTableRead withFilter(Predicate predicate) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 0f5867e64..594988ab6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -55,6 +55,12 @@ public class AvroBulkFormat implements FormatReaderFactory {
return new AvroReader(fileIO, file);
}
+ @Override
+ public RecordReader<InternalRow> createReader(FileIO fileIO, Path file,
int poolSize)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
private class AvroReader implements RecordReader<InternalRow> {
private final FileIO fileIO;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index e09a72999..62957f24a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -47,6 +47,7 @@ import org.apache.paimon.utils.Projection;
import org.apache.orc.TypeDescription;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
import java.util.List;
@@ -57,6 +58,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
/** Orc {@link FileFormat}. */
+@ThreadSafe
public class OrcFileFormat extends FileFormat {
public static final String IDENTIFIER = "orc";
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 3b794f039..02bb65802 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -94,7 +94,13 @@ public class OrcReaderFactory implements FormatReaderFactory
{
@Override
public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws
IOException {
- Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(1);
+ return createReader(fileIO, file, 1);
+ }
+
+ @Override
+ public OrcVectorizedReader createReader(FileIO fileIO, Path file, int
poolSize)
+ throws IOException {
+ Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);
RecordReader orcReader =
createRecordReader(
hadoopConfigWrapper.getHadoopConfig(),
@@ -135,7 +141,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
final Pool<OrcReaderBatch> pool = new Pool<>(numBatches);
for (int i = 0; i < numBatches; i++) {
- final VectorizedRowBatch orcBatch = createBatchWrapper(schema,
batchSize);
+ final VectorizedRowBatch orcBatch = createBatchWrapper(schema,
batchSize / numBatches);
final OrcReaderBatch batch = createReaderBatch(orcBatch,
pool.recycler());
pool.add(batch);
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 22c023167..995770601 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -108,6 +108,12 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
return new ParquetReader(reader, requestedSchema,
reader.getRecordCount(), poolOfBatches);
}
+ @Override
+ public RecordReader<InternalRow> createReader(FileIO fileIO, Path file,
int poolSize)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
private void setReadOptions(ParquetReadOptions.Builder builder) {
builder.useSignedStringMinMax(
conf.getBoolean("parquet.strings.signed-min-max.enabled",
false));