This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b271ce650b [parquet] Bump parquet version to 1.16.0 (#7081)
b271ce650b is described below

commit b271ce650bfe6e4d8c47dfaed3903c86a6b17923
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 22 18:27:07 2026 +0800

    [parquet] Bump parquet version to 1.16.0 (#7081)
---
 .../org/apache/paimon/rest/RESTCatalogTest.java    |   4 +-
 .../table/system/AllPartitionsTableTest.java       |  22 ++-
 .../utils/PartitionStatisticsReporterTest.java     |  10 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   |   4 +-
 paimon-format/pom.xml                              |   6 +
 .../apache/parquet/hadoop/ParquetFileReader.java   |  85 ++++++++--
 .../org/apache/parquet/hadoop/ParquetWriter.java   | 181 ++++++++++++++++++++-
 paimon-format/src/main/resources/META-INF/NOTICE   |  16 +-
 paimon-hive/paimon-hive-connector-common/pom.xml   |   6 +
 paimon-iceberg/pom.xml                             |   4 +
 pom.xml                                            |   2 +-
 11 files changed, 294 insertions(+), 46 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index fe34113b22..81025f2405 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -2644,7 +2644,7 @@ public abstract class RESTCatalogTest extends CatalogTestBase {
                     assertThat(r.getLong(8)).isEqualTo(1);
                     assertThat(r.getString(9).toString()).isEqualTo("updated");
                     assertThat(r.getLong(10)).isEqualTo(2);
-                    assertThat(r.getLong(11)).isEqualTo(2584);
+                    assertThat(r.getLong(11)).isGreaterThan(0);
                     assertThat(r.getLong(12)).isEqualTo(2);
                 };
         tablesCheck.accept(row);
@@ -2670,7 +2670,7 @@ public abstract class RESTCatalogTest extends CatalogTestBase {
                     assertThat(r.getString(1).toString()).isEqualTo("all_tables");
                     assertThat(r.getString(2).toString()).isEqualTo("f1=2");
                     assertThat(r.getLong(3)).isEqualTo(1);
-                    assertThat(r.getLong(4)).isEqualTo(1292);
+                    assertThat(r.getLong(4)).isGreaterThan(0);
                     assertThat(r.getLong(5)).isEqualTo(1);
                     assertThat(r.getBoolean(7)).isEqualTo(false);
                 };
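
The test updates above replace assertions on exact on-disk byte counts (2584, 1292) with sign checks, since exact file sizes shift whenever the bundled Parquet writer changes its encoding. A minimal AssertJ sketch of the pattern (the row accessor matches the test; the surrounding setup is illustrative):

    import static org.assertj.core.api.Assertions.assertThat;

    // Exact sizes are an implementation detail of the format library;
    // assert only what survives a dependency bump.
    long fileSizeInBytes = row.getLong(11);
    assertThat(fileSizeInBytes).isGreaterThan(0);
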
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AllPartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AllPartitionsTableTest.java
index e5cca01f45..2b6b9a33a9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/AllPartitionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AllPartitionsTableTest.java
@@ -34,8 +34,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.table.system.AllPartitionsTable.ALL_PARTITIONS;
@@ -67,12 +65,20 @@ public class AllPartitionsTableTest extends TableTestBase {
 
     @Test
     public void testAllPartitionsTable() throws Exception {
-        List<String> result =
-                read(allPartitionsTable).stream()
-                        .map(Objects::toString)
-                        .collect(Collectors.toList());
-        result = result.stream().filter(r -> !r.contains("path")).collect(Collectors.toList());
-        assertThat(result.get(0).toString()).startsWith("+I(default,T,f1=1,1,680,1,");
+        List<InternalRow> rows = read(allPartitionsTable);
+
+        assertThat(rows.size()).isEqualTo(1);
+        InternalRow row = rows.get(0);
+        assertThat(row.getFieldCount()).isEqualTo(8);
+        assertThat(row.getString(0).toString()).isEqualTo("default"); // database_name
+        assertThat(row.getString(1).toString()).isEqualTo("T"); // table_name
+        assertThat(row.getString(2).toString()).isEqualTo("f1=1"); // partition_name
+        assertThat(row.getLong(3)).isEqualTo(1L); // record_count
+        assertThat(row.getLong(4)).isGreaterThan(0L); // file_size_in_bytes
+        assertThat(row.getLong(5)).isEqualTo(1L); // file_count
+        assertThat(row.getLong(6))
+                .isLessThanOrEqualTo(System.currentTimeMillis()); // last_file_creation_time
+        assertThat(row.getBoolean(7)).isFalse(); // done
     }
 
     @Test
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index 7517fac6f1..02e81bdfd0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -133,9 +133,13 @@ public class PartitionStatisticsReporterTest {
         long time = 1729598544974L;
         action.report("c1=a/", time);
         assertThat(partitionParams).containsKey("c1=a/");
-        assertThat(partitionParams.get("c1=a/").toString())
-                .isEqualTo(
-                        "{spec={c1=a}, recordCount=2, fileSizeInBytes=705, 
fileCount=1, lastFileCreationTime=1729598544974, totalBuckets=-1}");
+        PartitionStatistics stats = partitionParams.get("c1=a/");
+        assertThat(stats.spec()).containsEntry("c1", "a");
+        assertThat(stats.recordCount()).isEqualTo(2);
+        assertThat(stats.fileSizeInBytes()).isGreaterThan(0); // fileSizeInBytes
+        assertThat(stats.fileCount()).isEqualTo(1);
+        assertThat(stats.lastFileCreationTime()).isEqualTo(1729598544974L);
+        assertThat(stats.totalBuckets()).isEqualTo(-1);
         action.close();
         assertThat(closed).isTrue();
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 4c4bd4350e..c533e175fa 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -635,7 +635,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                         Committer.createContext("", metricGroup, true, false, null, 1, 1));
         committer.commit(Collections.singletonList(manifestCommittable));
         CommitterMetrics metrics = committer.getCommitterMetrics();
-        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(572);
+        assertThat(metrics.getNumBytesOutCounter().getCount()).isGreaterThan(0);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
         committer.close();
     }
@@ -664,7 +664,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                         Committer.createContext("", metricGroup, true, false, null, 1, 1));
         committer.filterAndCommit(Collections.singletonList(manifestCommittable), true, false);
         CommitterMetrics metrics = committer.getCommitterMetrics();
-        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(572);
+        assertThat(metrics.getNumBytesOutCounter().getCount()).isGreaterThan(0);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
         committer.close();
     }
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index a82d3e7107..4f7f11965a 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -351,6 +351,8 @@ under the License.
                                     <include>org.apache.parquet:parquet-format-structures</include>
                                     <include>org.apache.parquet:parquet-jackson</include>
                                     <include>commons-pool:commons-pool</include>
+                                    <include>commons-pool:commons-pool</include>
+                                    <include>org.locationtech.jts:jts-core</include>
 
                                     <!-- compress -->
                                     <include>com.github.luben:zstd-jni</include>
@@ -411,6 +413,10 @@ under the License.
                                     <pattern>shaded.parquet</pattern>
                                     <shadedPattern>org.apache.paimon.shade.parquet</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.locationtech.jts</pattern>
+                                    <shadedPattern>org.apache.paimon.shade.org.locationtech.jts</shadedPattern>
+                                </relocation>
 
                                 <!-- Relocate Common. -->
                                 <relocation>
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 59700c4dc6..6f3a20be59 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -133,8 +133,26 @@ public class ParquetFileReader implements Closeable {
 
     public static final ParquetMetadata readFooter(
             InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+        return readFooter(file, options, f, /*closeStreamOnFailure*/ false);
+    }
+
+    private static final ParquetMetadata readFooter(
+            InputFile file,
+            ParquetReadOptions options,
+            SeekableInputStream f,
+            boolean closeStreamOnFailure)
+            throws IOException {
        ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
-        return readFooter(file, options, f, converter);
+        try {
+            return readFooter(file, options, f, converter);
+        } catch (Exception e) {
+            // In case that readFooter throws an exception in the constructor, the new stream
+            // should be closed. Otherwise, there's no way to close this outside.
+            if (closeStreamOnFailure) {
+                f.close();
+            }
+            throw e;
+        }
     }
 
     private static final ParquetMetadata readFooter(
@@ -162,7 +180,13 @@ public class ParquetFileReader implements Closeable {
        long fileMetadataLengthIndex = fileLen - magic.length - footerLengthSize;
         LOG.debug("reading footer index at {}", fileMetadataLengthIndex);
         f.seek(fileMetadataLengthIndex);
-        int fileMetadataLength = readIntLittleEndian(f);
+        long readFileMetadataLength = readIntLittleEndian(f) & 0xFFFFFFFFL;
+        if (readFileMetadataLength > Integer.MAX_VALUE) {
+            throw new RuntimeException(
+                    "footer is too large: " + readFileMetadataLength + "to be 
read");
+        }
+        int fileMetadataLength = (int) readFileMetadataLength;
+
         f.readFully(magic);
 
         boolean encryptedFooterMode;
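
For context on the hunk above: readIntLittleEndian returns a signed Java int, so masking with 0xFFFFFFFFL reinterprets the four footer-length bytes as an unsigned value before the range check. A self-contained illustration of the same idiom (values are illustrative):

    int raw = 0xFFFFFFFF;              // -1 as a signed int
    long unsigned = raw & 0xFFFFFFFFL; // 4294967295L
    // A footer length above Integer.MAX_VALUE is rejected explicitly instead
    // of silently turning into a negative length further down.
    if (unsigned > Integer.MAX_VALUE) {
        throw new RuntimeException("footer is too large: " + unsigned + " to be read");
    }
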
@@ -230,7 +254,7 @@ public class ParquetFileReader implements Closeable {
         }
     }
 
-    protected final ParquetInputStream f;
+    protected ParquetInputStream f;
     private final ParquetInputFile file;
     private final ParquetReadOptions options;
     private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<>();
@@ -253,19 +277,31 @@ public class ParquetFileReader implements Closeable {
     public ParquetFileReader(
            InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection)
             throws IOException {
+        this(file, options, ((ParquetInputFile) file).newStream(), selection);
+    }
+
+    public ParquetFileReader(
+            InputFile file,
+            ParquetReadOptions options,
+            ParquetInputStream f,
+            @Nullable RoaringBitmap32 selection)
+            throws IOException {
+        this(file, readFooter(file, options, f, true), options, f, selection);
+    }
+
+    public ParquetFileReader(
+            InputFile file,
+            ParquetMetadata footer,
+            ParquetReadOptions options,
+            ParquetInputStream f,
+            @Nullable RoaringBitmap32 selection)
+            throws IOException {
         this.converter = new ParquetMetadataConverter(options);
         this.file = (ParquetInputFile) file;
-        this.f = this.file.newStream();
+        this.f = f;
         this.options = options;
         this.selection = selection;
-        try {
-            this.footer = readFooter(file, options, f, converter);
-        } catch (Exception e) {
-            // In case that reading footer throws an exception in the constructor, the new stream
-            // should be closed. Otherwise, there's no way to close this outside.
-            f.close();
-            throw e;
-        }
+        this.footer = footer;
         this.fileMetaData = footer.getFileMetaData();
         this.fileDecryptor =
                fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
@@ -407,13 +443,17 @@ public class ParquetFileReader implements Closeable {
         return blocks;
     }
 
-    public void setRequestedSchema(MessageType projection) {
+    public void setRequestedSchema(List<ColumnDescriptor> columns) {
         paths.clear();
-        for (ColumnDescriptor col : projection.getColumns()) {
+        for (ColumnDescriptor col : columns) {
             paths.put(ColumnPath.get(col.getPath()), col);
         }
     }
 
+    public void setRequestedSchema(MessageType projection) {
+        setRequestedSchema(projection.getColumns());
+    }
+
     public void appendTo(ParquetFileWriter writer) throws IOException {
         writer.appendRowGroups(f, blocks, true);
     }
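
The new setRequestedSchema overload above takes the column list directly; the MessageType variant now just delegates to it. Both calls below populate the reader's column-path map identically:

    reader.setRequestedSchema(projection);              // MessageType, as before
    reader.setRequestedSchema(projection.getColumns()); // List<ColumnDescriptor>, new overload
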
@@ -1086,7 +1126,14 @@ public class ParquetFileReader implements Closeable {
         byte[] bitset;
         if (null == bloomFilterDecryptor) {
             bitset = new byte[numBytes];
-            in.read(bitset);
+            // For negative bloomFilterLength (files from older versions), use readFully() instead
+            // of read(). readFully() guarantees reading exactly numBytes bytes, while read() may
+            // read fewer bytes in a single call. This ensures the entire bitset is properly loaded.
+            if (bloomFilterLength < 0) {
+                f.readFully(bitset);
+            } else {
+                in.read(bitset);
+            }
         } else {
             bitset = bloomFilterDecryptor.decrypt(in, bloomFilterBitsetAAD);
             if (bitset.length != numBytes) {
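
The branch above leans on the java.io contract: read(byte[]) may fill only part of the buffer in one call, while readFully loops until the whole buffer is filled. Roughly, readFully behaves like this generic sketch (plain java.io, not the Parquet stream classes):

    static void readFully(java.io.InputStream in, byte[] buf) throws java.io.IOException {
        int off = 0;
        while (off < buf.length) {
            int n = in.read(buf, off, buf.length - off); // may return fewer bytes than requested
            if (n < 0) {
                throw new java.io.EOFException("stream ended before the buffer was filled");
            }
            off += n;
        }
    }
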
@@ -1166,6 +1213,14 @@ public class ParquetFileReader implements Closeable {
                 Util.readOffsetIndex(f, offsetIndexDecryptor, offsetIndexAAD));
     }
 
+    /**
+     * Explicitly detach the input stream for the file to avoid being closed via {@link
+     * ParquetFileReader#close()}.
+     */
+    public void detachFileInputStream() {
+        f = null;
+    }
+
     @Override
     public void close() throws IOException {
         try {
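
detachFileInputStream supports handing stream ownership to another component: after detaching, close() releases the reader's remaining state but leaves the stream untouched. A hedged sketch, reusing the caller-owned stream from the constructor sketch above:

    reader.detachFileInputStream(); // reader.close() will no longer close 'in'
    reader.close();                 // frees the reader's other resources
    // ... the caller keeps using 'in' ...
    in.close();                     // and is now responsible for closing it
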
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 79a2a10695..3358e84fdc 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -19,8 +19,11 @@
 package org.apache.parquet.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.crypto.FileEncryptionProperties;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -56,22 +59,34 @@ public class ParquetWriter<T> implements Closeable {
     public static final int MAX_PADDING_SIZE_DEFAULT = 8 * 1024 * 1024; // 8MB
 
     private final InternalParquetRecordWriter<T> writer;
-    private final CodecFactory codecFactory;
+    private final CompressionCodecFactory codecFactory;
 
     ParquetWriter(
             OutputFile file,
             ParquetFileWriter.Mode mode,
             WriteSupport<T> writeSupport,
             CompressionCodecName compressionCodecName,
+            CompressionCodecFactory codecFactory,
             long rowGroupSize,
             boolean validating,
             Configuration conf,
             int maxPaddingSize,
-            ParquetProperties encodingProps)
+            ParquetProperties encodingProps,
+            FileEncryptionProperties encryptionProperties)
             throws IOException {
         WriteSupport.WriteContext writeContext = writeSupport.init(conf);
         MessageType schema = writeContext.getSchema();
 
+        // encryptionProperties could be built from the implementation of
+        // EncryptionPropertiesFactory when it is attached.
+        if (encryptionProperties == null) {
+            Path tempFilePath =
+                    file != null && file.getPath() != null ? new Path(file.getPath()) : null;
+            encryptionProperties =
+                    EncryptionPropertiesHelper.createEncryptionProperties(
+                            conf, tempFilePath, writeContext);
+        }
+
         ParquetFileWriter fileWriter =
                 new ParquetFileWriter(
                         file,
@@ -79,14 +94,13 @@ public class ParquetWriter<T> implements Closeable {
                         mode,
                         rowGroupSize,
                         maxPaddingSize,
-                        encodingProps.getColumnIndexTruncateLength(),
-                        encodingProps.getStatisticsTruncateLength(),
-                        encodingProps.getPageWriteChecksumEnabled(),
-                        (FileEncryptionProperties) null);
+                        encryptionProperties,
+                        encodingProps);
         fileWriter.start();
 
-        this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
-        CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
+        this.codecFactory = codecFactory;
+        CompressionCodecFactory.BytesInputCompressor compressor =
+                codecFactory.getCompressor(compressionCodecName);
 
         final Map<String, String> extraMetadata;
         if (encodingProps.getExtraMetaData() == null
@@ -169,8 +183,10 @@ public class ParquetWriter<T> implements Closeable {
 
         private final OutputFile file;
 
+        private FileEncryptionProperties encryptionProperties = null;
         private Configuration conf = new Configuration(false);
         private ParquetFileWriter.Mode mode;
+        private CompressionCodecFactory codecFactory = null;
        private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
         private long rowGroupSize = DEFAULT_BLOCK_SIZE;
         private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
@@ -224,6 +240,29 @@ public class ParquetWriter<T> implements Closeable {
             return self();
         }
 
+        /**
+         * Set the {@link CompressionCodecFactory codec factory} used by the constructed writer.
+         *
+         * @param codecFactory a {@link CompressionCodecFactory}
+         * @return this builder for method chaining.
+         */
+        public SELF withCodecFactory(CompressionCodecFactory codecFactory) {
+            this.codecFactory = codecFactory;
+            return self();
+        }
+
+        /**
+         * Set the {@link FileEncryptionProperties file encryption properties} used by the
+         * constructed writer.
+         *
+         * @param encryptionProperties a {@code FileEncryptionProperties}
+         * @return this builder for method chaining.
+         */
+        public SELF withEncryption(FileEncryptionProperties encryptionProperties) {
+            this.encryptionProperties = encryptionProperties;
+            return self();
+        }
+
         /**
         * Set the Parquet format row group size used by the constructed writer.
          *
@@ -258,6 +297,17 @@ public class ParquetWriter<T> implements Closeable {
             return self();
         }
 
+        /**
+         * Sets the Parquet format row group row count limit used by the constructed writer.
+         *
+         * @param rowCount limit for the number of rows stored in a row group
+         * @return this builder for method chaining
+         */
+        public SELF withRowGroupRowCountLimit(int rowCount) {
+            encodingPropsBuilder.withRowGroupRowCountLimit(rowCount);
+            return self();
+        }
+
         /**
         * Sets the Parquet format page row count limit used by the constructed writer.
          *
@@ -384,6 +434,17 @@ public class ParquetWriter<T> implements Closeable {
             return self();
         }
 
+        /**
+         * Set max Bloom filter bytes for related columns.
+         *
+         * @param maxBloomFilterBytes the max bytes of a Bloom filter bitset for a column.
+         * @return this builder for method chaining
+         */
+        public SELF withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+            encodingPropsBuilder.withMaxBloomFilterBytes(maxBloomFilterBytes);
+            return self();
+        }
+
         /**
          * Sets the NDV (number of distinct values) for the specified column.
          *
@@ -402,6 +463,29 @@ public class ParquetWriter<T> implements Closeable {
             return self();
         }
 
+        /**
+         * When NDV (number of distinct values) for a specified column is not set, whether to use
+         * `AdaptiveBloomFilter` to automatically adjust the BloomFilter size according to
+         * `parquet.bloom.filter.max.bytes`.
+         *
+         * @param enabled whether to write bloom filter for the column
+         */
+        public SELF withAdaptiveBloomFilterEnabled(boolean enabled) {
+            encodingPropsBuilder.withAdaptiveBloomFilterEnabled(enabled);
+            return self();
+        }
+
+        /**
+         * When `AdaptiveBloomFilter` is enabled, set how many bloom filter candidates to use.
+         *
+         * @param columnPath the path of the column (dot-string)
+         * @param number the number of candidates
+         */
+        public SELF withBloomFilterCandidateNumber(String columnPath, int number) {
+            encodingPropsBuilder.withBloomFilterCandidatesNumber(columnPath, number);
+            return self();
+        }
+
         /**
          * Sets the bloom filter enabled/disabled.
          *
@@ -471,6 +555,28 @@ public class ParquetWriter<T> implements Closeable {
             return self();
         }
 
+        /**
+         * Sets additional metadata entries to be included in the file footer.
+         *
+         * @param extraMetaData a Map of additional stringly-typed metadata entries
+         * @return this builder for method chaining
+         */
+        public SELF withExtraMetaData(Map<String, String> extraMetaData) {
+            encodingPropsBuilder.withExtraMetaData(extraMetaData);
+            return self();
+        }
+
+        /**
+         * Sets the ByteBuffer allocator instance to be used for allocating memory for writing.
+         *
+         * @param allocator the allocator instance
+         * @return this builder for method chaining
+         */
+        public SELF withAllocator(ByteBufferAllocator allocator) {
+            encodingPropsBuilder.withAllocator(allocator);
+            return self();
+        }
+
         /**
         * Set a property that will be available to the read path. For writers that use a Hadoop
         * configuration, this is the recommended way to add configuration values.
@@ -484,6 +590,56 @@ public class ParquetWriter<T> implements Closeable {
             return self();
         }
 
+        /**
+         * Sets the statistics enabled/disabled for the specified column. All column statistics are
+         * enabled by default.
+         *
+         * @param columnPath the path of the column (dot-string)
+         * @param enabled whether to calculate statistics for the column
+         * @return this builder for method chaining
+         */
+        public SELF withStatisticsEnabled(String columnPath, boolean enabled) {
+            encodingPropsBuilder.withStatisticsEnabled(columnPath, enabled);
+            return self();
+        }
+
+        /**
+         * Sets whether statistics are enabled globally. When disabled, statistics will not be
+         * collected for any column unless explicitly enabled for specific columns.
+         *
+         * @param enabled whether to collect statistics globally
+         * @return this builder for method chaining
+         */
+        public SELF withStatisticsEnabled(boolean enabled) {
+            encodingPropsBuilder.withStatisticsEnabled(enabled);
+            return self();
+        }
+
+        /**
+         * Sets the size statistics enabled/disabled for the specified column. All column size
+         * statistics are enabled by default.
+         *
+         * @param columnPath the path of the column (dot-string)
+         * @param enabled whether to collect size statistics for the column
+         * @return this builder for method chaining
+         */
+        public SELF withSizeStatisticsEnabled(String columnPath, boolean enabled) {
+            encodingPropsBuilder.withSizeStatisticsEnabled(columnPath, enabled);
+            return self();
+        }
+
+        /**
+         * Sets whether size statistics are enabled globally. When disabled, size statistics will
+         * not be collected for any column unless explicitly enabled for specific columns.
+         *
+         * @param enabled whether to collect size statistics globally
+         * @return this builder for method chaining
+         */
+        public SELF withSizeStatisticsEnabled(boolean enabled) {
+            encodingPropsBuilder.withSizeStatisticsEnabled(enabled);
+            return self();
+        }
+
         /**
          * Build a {@link ParquetWriter} with the accumulated configuration.
          *
@@ -491,16 +647,23 @@ public class ParquetWriter<T> implements Closeable {
          * @throws IOException if there is an error while creating the writer
          */
         public ParquetWriter<T> build() throws IOException {
+            ParquetProperties encodingProps = encodingPropsBuilder.build();
+            if (codecFactory == null) {
+                codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
+            }
+
             return new ParquetWriter<>(
                     file,
                     mode,
                     getWriteSupport(conf),
                     codecName,
+                    codecFactory,
                     rowGroupSize,
                     enableValidation,
                     conf,
                     maxPaddingSize,
-                    encodingPropsBuilder.build());
+                    encodingProps,
+                    encryptionProperties);
         }
     }
 }
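
Taken together, the Builder additions surface the Parquet 1.16 writer knobs directly. A hedged usage sketch; `builder` stands for any concrete ParquetWriter.Builder subclass, and the column names and values are illustrative:

    ParquetWriter<T> writer =
            builder.withCodecFactory(codecFactory)            // inject a shared CompressionCodecFactory
                    .withEncryption(encryptionProperties)     // or null to fall back to the factory lookup
                    .withRowGroupRowCountLimit(1_000_000)     // cap rows per row group
                    .withAdaptiveBloomFilterEnabled(true)     // size bloom filters automatically
                    .withBloomFilterCandidateNumber("f0", 5)  // candidate count for column "f0"
                    .withStatisticsEnabled("f1", false)       // skip statistics for column "f1"
                    .withSizeStatisticsEnabled(false)         // disable size statistics globally
                    .withExtraMetaData(java.util.Collections.singletonMap("writer", "paimon"))
                    .build();
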
diff --git a/paimon-format/src/main/resources/META-INF/NOTICE b/paimon-format/src/main/resources/META-INF/NOTICE
index 5f2c582bdd..91510b5093 100644
--- a/paimon-format/src/main/resources/META-INF/NOTICE
+++ b/paimon-format/src/main/resources/META-INF/NOTICE
@@ -19,12 +19,12 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.fasterxml.jackson.core:jackson-annotations:2.14.2
 - org.apache.commons:commons-compress:1.22
 
-- org.apache.parquet:parquet-hadoop:1.15.2
-- org.apache.parquet:parquet-column:1.15.2
-- org.apache.parquet:parquet-common:1.15.2
-- org.apache.parquet:parquet-encoding:1.15.2
-- org.apache.parquet:parquet-format-structures:1.15.2
-- org.apache.parquet:parquet-jackson:1.15.2
+- org.apache.parquet:parquet-hadoop:1.16.0
+- org.apache.parquet:parquet-column:1.16.0
+- org.apache.parquet:parquet-common:1.16.0
+- org.apache.parquet:parquet-encoding:1.16.0
+- org.apache.parquet:parquet-format-structures:1.16.0
+- org.apache.parquet:parquet-jackson:1.16.0
 - commons-pool:commons-pool:1.6
 
 This project bundles the following dependencies under the BSD license.
@@ -34,3 +34,7 @@ and licenses/LICENSE.threeten-extra
 - com.google.protobuf:protobuf-java:3.19.6
 - com.github.luben:zstd-jni:1.5.5-11
 - org.threeten:threeten-extra:1.7.1
+
+This project bundles the following dependencies under the BSD 3-clause license. (http://www.opensource.org/licenses/BSD-3-Clause)
+
+- org.locationtech.jts:jts-core:1.20.0
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 22c06a97e3..9ae8f7c065 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -44,6 +44,12 @@ under the License.
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-bundle</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
diff --git a/paimon-iceberg/pom.xml b/paimon-iceberg/pom.xml
index a2a0450500..c1ae683350 100644
--- a/paimon-iceberg/pom.xml
+++ b/paimon-iceberg/pom.xml
@@ -53,6 +53,10 @@ under the License.
                     <groupId>org.apache.orc</groupId>
                     <artifactId>orc-core</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/pom.xml b/pom.xml
index c9025eee8c..3100a4df6b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@ under the License.
         <testcontainers.version>1.19.1</testcontainers.version>
         <iceberg.version>1.6.1</iceberg.version>
         <hudi.version>0.15.0</hudi.version>
-        <parquet.version>1.15.2</parquet.version>
+        <parquet.version>1.16.0</parquet.version>
         <orc.version>1.9.2</orc.version>
         <protobuf-java.version>3.19.6</protobuf-java.version>
         <roaringbitmap.version>1.2.1</roaringbitmap.version>
