This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ae68a752d [core] Refactor getParquetReader with options (#7080)
8ae68a752d is described below
commit 8ae68a752dfabff3bf29ae3d032f85d5bd41fbf0
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 20 15:13:32 2026 +0800
[core] Refactor getParquetReader with options (#7080)
---
.../paimon/format/parquet/ParquetFileFormat.java | 2 +-
.../format/parquet/ParquetReaderFactory.java | 29 ++--------------------
.../parquet/ParquetSimpleStatsExtractor.java | 7 ++++--
.../apache/paimon/format/parquet/ParquetUtil.java | 19 ++++++++++----
.../format/parquet/ParquetFormatReadWriteTest.java | 3 ++-
.../writer/InferVariantShreddingWriteTest.java | 3 ++-
6 files changed, 26 insertions(+), 37 deletions(-)
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index a8368da882..9ccf88bd17 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -89,7 +89,7 @@ public class ParquetFileFormat extends FileFormat {
@Override
public Optional<SimpleStatsExtractor> createStatsExtractor(
RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
- return Optional.of(new ParquetSimpleStatsExtractor(type,
statsCollectors));
+ return Optional.of(new ParquetSimpleStatsExtractor(options, type,
statsCollectors));
}
private Options getParquetConfiguration(FormatContext context) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2d2102bbd8..29f22e1de6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -38,10 +38,8 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.ConversionPatterns;
@@ -67,7 +65,6 @@ import static
org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetLis
import static
org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
import static
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.buildFieldsList;
import static
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createWritableColumnVector;
-import static
org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
/**
* Parquet {@link FormatReaderFactory} that reads data from the file to {@link
@@ -77,8 +74,6 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
private static final Logger LOG =
LoggerFactory.getLogger(ParquetReaderFactory.class);
- private static final String ALLOCATION_SIZE =
"parquet.read.allocation.size";
-
private final Options conf;
private final DataField[] readFields;
private final int batchSize;
@@ -96,9 +91,9 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
public FileRecordReader<InternalRow>
createReader(FormatReaderFactory.Context context)
throws IOException {
ParquetReadOptions.Builder builder =
- ParquetReadOptions.builder(new PlainParquetConfiguration())
+ ParquetUtil.getParquetReadOptionsBuilder(conf)
+ .withRecordFilter(filter)
.withRange(0, context.fileSize());
- setReadOptions(builder);
ParquetFileReader reader =
new ParquetFileReader(
@@ -127,26 +122,6 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
context.filePath(), reader, fileSchema, fields,
writableVectors, batchSize);
}
- private void setReadOptions(ParquetReadOptions.Builder builder) {
- builder.useSignedStringMinMax(
- conf.getBoolean("parquet.strings.signed-min-max.enabled",
false));
- builder.useDictionaryFilter(
-
conf.getBoolean(ParquetInputFormat.DICTIONARY_FILTERING_ENABLED, true));
-
builder.useStatsFilter(conf.getBoolean(ParquetInputFormat.STATS_FILTERING_ENABLED,
true));
-
builder.useRecordFilter(conf.getBoolean(ParquetInputFormat.RECORD_FILTERING_ENABLED,
true));
- builder.useColumnIndexFilter(
-
conf.getBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, true));
- builder.usePageChecksumVerification(
-
conf.getBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false));
-
builder.useBloomFilter(conf.getBoolean(ParquetInputFormat.BLOOM_FILTERING_ENABLED,
true));
- builder.withMaxAllocationInBytes(conf.getInteger(ALLOCATION_SIZE,
8388608));
- String badRecordThresh = conf.getString(BAD_RECORD_THRESHOLD_CONF_KEY,
null);
- if (badRecordThresh != null) {
- builder.set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
- }
- builder.withRecordFilter(filter);
- }
-
/** Clips `parquetSchema` according to `fieldNames`. */
private MessageType clipParquetSchema(GroupType parquetSchema) {
Type[] types = new Type[readFields.length];
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
index f68e36fa11..a265b76d4c 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
@@ -25,6 +25,7 @@ import org.apache.paimon.format.SimpleColStats;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DecimalType;
@@ -56,14 +57,16 @@ public class ParquetSimpleStatsExtractor implements
SimpleStatsExtractor {
private final RowType rowType;
private final SimpleColStatsCollector.Factory[] statsCollectors;
+ private final Options options;
public ParquetSimpleStatsExtractor(
- RowType rowType, SimpleColStatsCollector.Factory[]
statsCollectors) {
+ Options options, RowType rowType,
SimpleColStatsCollector.Factory[] statsCollectors) {
this.rowType = rowType;
this.statsCollectors = statsCollectors;
Preconditions.checkArgument(
rowType.getFieldCount() == statsCollectors.length,
"The stats collector is not aligned to write schema.");
+ this.options = options;
}
@Override
@@ -75,7 +78,7 @@ public class ParquetSimpleStatsExtractor implements
SimpleStatsExtractor {
public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
FileIO fileIO, Path path, long length) throws IOException {
Pair<Map<String, Statistics<?>>, FileInfo> statsPair =
- ParquetUtil.extractColumnStats(fileIO, path, length);
+ ParquetUtil.extractColumnStats(fileIO, path, length, options);
SimpleColStatsCollector[] collectors =
SimpleColStatsCollector.create(statsCollectors);
return Pair.of(
IntStream.range(0, rowType.getFieldCount())
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
index 26b6a7b4e9..f3c358c375 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.parquet;
import org.apache.paimon.format.SimpleStatsExtractor;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.Pair;
@@ -49,8 +50,9 @@ public class ParquetUtil {
* minimum value, maximum value)
*/
public static Pair<Map<String, Statistics<?>>,
SimpleStatsExtractor.FileInfo>
- extractColumnStats(FileIO fileIO, Path path, long length) throws
IOException {
- try (ParquetFileReader reader = getParquetReader(fileIO, path,
length)) {
+ extractColumnStats(FileIO fileIO, Path path, long length, Options
options)
+ throws IOException {
+ try (ParquetFileReader reader = getParquetReader(fileIO, path, length,
options)) {
ParquetMetadata parquetMetadata = reader.getFooter();
List<BlockMetaData> blockMetaDataList =
parquetMetadata.getBlocks();
Map<String, Statistics<?>> resultStats = new HashMap<>();
@@ -78,16 +80,23 @@ public class ParquetUtil {
*
* @param path the path of parquet file to be read
* @param length the length of parquet file to be read
+ * @param options the configuration
* @return parquet reader, used for reading footer, status, etc.
*/
- public static ParquetFileReader getParquetReader(FileIO fileIO, Path path,
long length)
- throws IOException {
+ public static ParquetFileReader getParquetReader(
+ FileIO fileIO, Path path, long length, Options options) throws
IOException {
return new ParquetFileReader(
ParquetInputFile.fromPath(fileIO, path, length),
- ParquetReadOptions.builder(new
PlainParquetConfiguration()).build(),
+ getParquetReadOptionsBuilder(options).build(),
null);
}
+ public static ParquetReadOptions.Builder
getParquetReadOptionsBuilder(Options options) {
+ PlainParquetConfiguration parquetConfiguration =
+ new PlainParquetConfiguration(options.toMap());
+ return ParquetReadOptions.builder(parquetConfiguration);
+ }
+
static void assertStatsClass(
DataField field, Statistics<?> stats, Class<? extends
Statistics<?>> expectedClass) {
if (!expectedClass.isInstance(stats)) {
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 024ea93b6e..0f71fbbcd7 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -76,7 +76,8 @@ public class ParquetFormatReadWriteTest extends
FormatReadWriteTest {
out.close();
try (ParquetFileReader reader =
- ParquetUtil.getParquetReader(fileIO, file,
fileIO.getFileSize(file))) {
+ ParquetUtil.getParquetReader(
+ fileIO, file, fileIO.getFileSize(file), new
Options())) {
ParquetMetadata parquetMetadata = reader.getFooter();
List<BlockMetaData> blockMetaDataList =
parquetMetadata.getBlocks();
for (BlockMetaData blockMetaData : blockMetaDataList) {
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
index fd42291652..f07c70784a 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
@@ -541,7 +541,8 @@ public class InferVariantShreddingWriteTest {
protected void verifyShreddingSchema(RowType... expectShreddedTypes)
throws IOException {
try (ParquetFileReader reader =
- ParquetUtil.getParquetReader(fileIO, file,
fileIO.getFileSize(file))) {
+ ParquetUtil.getParquetReader(
+ fileIO, file, fileIO.getFileSize(file), new
Options())) {
MessageType schema =
reader.getFooter().getFileMetaData().getSchema();
for (int i = 0; i < expectShreddedTypes.length; i++) {
assertThat(VariantUtils.variantFileType(schema.getType(i)))