This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ae68a752d [core] Refactor getParquetReader with options (#7080)
8ae68a752d is described below

commit 8ae68a752dfabff3bf29ae3d032f85d5bd41fbf0
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Jan 20 15:13:32 2026 +0800

    [core] Refactor getParquetReader with options (#7080)
---
 .../paimon/format/parquet/ParquetFileFormat.java   |  2 +-
 .../format/parquet/ParquetReaderFactory.java       | 29 ++--------------------
 .../parquet/ParquetSimpleStatsExtractor.java       |  7 ++++--
 .../apache/paimon/format/parquet/ParquetUtil.java  | 19 ++++++++++----
 .../format/parquet/ParquetFormatReadWriteTest.java |  3 ++-
 .../writer/InferVariantShreddingWriteTest.java     |  3 ++-
 6 files changed, 26 insertions(+), 37 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
index a8368da882..9ccf88bd17 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java
@@ -89,7 +89,7 @@ public class ParquetFileFormat extends FileFormat {
     @Override
     public Optional<SimpleStatsExtractor> createStatsExtractor(
             RowType type, SimpleColStatsCollector.Factory[] statsCollectors) {
-        return Optional.of(new ParquetSimpleStatsExtractor(type, 
statsCollectors));
+        return Optional.of(new ParquetSimpleStatsExtractor(options, type, 
statsCollectors));
     }
 
     private Options getParquetConfiguration(FormatContext context) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2d2102bbd8..29f22e1de6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -38,10 +38,8 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.conf.PlainParquetConfiguration;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.ConversionPatterns;
@@ -67,7 +65,6 @@ import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetLis
 import static 
org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
 import static 
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.buildFieldsList;
 import static 
org.apache.paimon.format.parquet.reader.ParquetReaderUtil.createWritableColumnVector;
-import static 
org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
 
 /**
  * Parquet {@link FormatReaderFactory} that reads data from the file to {@link
@@ -77,8 +74,6 @@ public class ParquetReaderFactory implements FormatReaderFactory {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ParquetReaderFactory.class);
 
-    private static final String ALLOCATION_SIZE = 
"parquet.read.allocation.size";
-
     private final Options conf;
     private final DataField[] readFields;
     private final int batchSize;
@@ -96,9 +91,9 @@ public class ParquetReaderFactory implements FormatReaderFactory {
     public FileRecordReader<InternalRow> 
createReader(FormatReaderFactory.Context context)
             throws IOException {
         ParquetReadOptions.Builder builder =
-                ParquetReadOptions.builder(new PlainParquetConfiguration())
+                ParquetUtil.getParquetReadOptionsBuilder(conf)
+                        .withRecordFilter(filter)
                         .withRange(0, context.fileSize());
-        setReadOptions(builder);
 
         ParquetFileReader reader =
                 new ParquetFileReader(
@@ -127,26 +122,6 @@ public class ParquetReaderFactory implements FormatReaderFactory {
                 context.filePath(), reader, fileSchema, fields, 
writableVectors, batchSize);
     }
 
-    private void setReadOptions(ParquetReadOptions.Builder builder) {
-        builder.useSignedStringMinMax(
-                conf.getBoolean("parquet.strings.signed-min-max.enabled", 
false));
-        builder.useDictionaryFilter(
-                
conf.getBoolean(ParquetInputFormat.DICTIONARY_FILTERING_ENABLED, true));
-        
builder.useStatsFilter(conf.getBoolean(ParquetInputFormat.STATS_FILTERING_ENABLED,
 true));
-        
builder.useRecordFilter(conf.getBoolean(ParquetInputFormat.RECORD_FILTERING_ENABLED,
 true));
-        builder.useColumnIndexFilter(
-                
conf.getBoolean(ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED, true));
-        builder.usePageChecksumVerification(
-                
conf.getBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false));
-        
builder.useBloomFilter(conf.getBoolean(ParquetInputFormat.BLOOM_FILTERING_ENABLED,
 true));
-        builder.withMaxAllocationInBytes(conf.getInteger(ALLOCATION_SIZE, 
8388608));
-        String badRecordThresh = conf.getString(BAD_RECORD_THRESHOLD_CONF_KEY, 
null);
-        if (badRecordThresh != null) {
-            builder.set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
-        }
-        builder.withRecordFilter(filter);
-    }
-
     /** Clips `parquetSchema` according to `fieldNames`. */
     private MessageType clipParquetSchema(GroupType parquetSchema) {
         Type[] types = new Type[readFields.length];
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
index f68e36fa11..a265b76d4c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java
@@ -25,6 +25,7 @@ import org.apache.paimon.format.SimpleColStats;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.statistics.SimpleColStatsCollector;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DecimalType;
@@ -56,14 +57,16 @@ public class ParquetSimpleStatsExtractor implements SimpleStatsExtractor {
 
     private final RowType rowType;
     private final SimpleColStatsCollector.Factory[] statsCollectors;
+    private final Options options;
 
     public ParquetSimpleStatsExtractor(
-            RowType rowType, SimpleColStatsCollector.Factory[] 
statsCollectors) {
+            Options options, RowType rowType, 
SimpleColStatsCollector.Factory[] statsCollectors) {
         this.rowType = rowType;
         this.statsCollectors = statsCollectors;
         Preconditions.checkArgument(
                 rowType.getFieldCount() == statsCollectors.length,
                 "The stats collector is not aligned to write schema.");
+        this.options = options;
     }
 
     @Override
@@ -75,7 +78,7 @@ public class ParquetSimpleStatsExtractor implements SimpleStatsExtractor {
     public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
             FileIO fileIO, Path path, long length) throws IOException {
         Pair<Map<String, Statistics<?>>, FileInfo> statsPair =
-                ParquetUtil.extractColumnStats(fileIO, path, length);
+                ParquetUtil.extractColumnStats(fileIO, path, length, options);
         SimpleColStatsCollector[] collectors = 
SimpleColStatsCollector.create(statsCollectors);
         return Pair.of(
                 IntStream.range(0, rowType.getFieldCount())
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
index 26b6a7b4e9..f3c358c375 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.parquet;
 import org.apache.paimon.format.SimpleStatsExtractor;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.Pair;
 
@@ -49,8 +50,9 @@ public class ParquetUtil {
      *     minimum value, maximum value)
      */
     public static Pair<Map<String, Statistics<?>>, 
SimpleStatsExtractor.FileInfo>
-            extractColumnStats(FileIO fileIO, Path path, long length) throws 
IOException {
-        try (ParquetFileReader reader = getParquetReader(fileIO, path, 
length)) {
+            extractColumnStats(FileIO fileIO, Path path, long length, Options 
options)
+                    throws IOException {
+        try (ParquetFileReader reader = getParquetReader(fileIO, path, length, 
options)) {
             ParquetMetadata parquetMetadata = reader.getFooter();
             List<BlockMetaData> blockMetaDataList = 
parquetMetadata.getBlocks();
             Map<String, Statistics<?>> resultStats = new HashMap<>();
@@ -78,16 +80,23 @@ public class ParquetUtil {
      *
      * @param path the path of parquet file to be read
      * @param length the length of parquet file to be read
+     * @param options the configuration
      * @return parquet reader, used for reading footer, status, etc.
      */
-    public static ParquetFileReader getParquetReader(FileIO fileIO, Path path, 
long length)
-            throws IOException {
+    public static ParquetFileReader getParquetReader(
+            FileIO fileIO, Path path, long length, Options options) throws 
IOException {
         return new ParquetFileReader(
                 ParquetInputFile.fromPath(fileIO, path, length),
-                ParquetReadOptions.builder(new 
PlainParquetConfiguration()).build(),
+                getParquetReadOptionsBuilder(options).build(),
                 null);
     }
 
+    public static ParquetReadOptions.Builder 
getParquetReadOptionsBuilder(Options options) {
+        PlainParquetConfiguration parquetConfiguration =
+                new PlainParquetConfiguration(options.toMap());
+        return ParquetReadOptions.builder(parquetConfiguration);
+    }
+
     static void assertStatsClass(
             DataField field, Statistics<?> stats, Class<? extends 
Statistics<?>> expectedClass) {
         if (!expectedClass.isInstance(stats)) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 024ea93b6e..0f71fbbcd7 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -76,7 +76,8 @@ public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
         out.close();
 
         try (ParquetFileReader reader =
-                ParquetUtil.getParquetReader(fileIO, file, 
fileIO.getFileSize(file))) {
+                ParquetUtil.getParquetReader(
+                        fileIO, file, fileIO.getFileSize(file), new 
Options())) {
             ParquetMetadata parquetMetadata = reader.getFooter();
             List<BlockMetaData> blockMetaDataList = 
parquetMetadata.getBlocks();
             for (BlockMetaData blockMetaData : blockMetaDataList) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
index fd42291652..f07c70784a 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
@@ -541,7 +541,8 @@ public class InferVariantShreddingWriteTest {
 
     protected void verifyShreddingSchema(RowType... expectShreddedTypes) 
throws IOException {
         try (ParquetFileReader reader =
-                ParquetUtil.getParquetReader(fileIO, file, 
fileIO.getFileSize(file))) {
+                ParquetUtil.getParquetReader(
+                        fileIO, file, fileIO.getFileSize(file), new 
Options())) {
             MessageType schema = 
reader.getFooter().getFileMetaData().getSchema();
             for (int i = 0; i < expectShreddedTypes.length; i++) {
                 assertThat(VariantUtils.variantFileType(schema.getType(i)))

Reply via email to