This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ba3839068 [format] Bump parquet from 1.13.1 to 1.15.1 (#5421)
2ba3839068 is described below

commit 2ba38390687dbcc489d4557e484b41301880269c
Author: yangjf2019 <[email protected]>
AuthorDate: Mon Apr 21 14:16:04 2025 +0800

    [format] Bump parquet from 1.13.1 to 1.15.1 (#5421)
---
 paimon-arrow/pom.xml                               |  37 ++++
 paimon-core/pom.xml                                |   6 +
 .../utils/PartitionStatisticsReporterTest.java     |  11 +-
 .../postgres/PostgresSyncTableActionITCase.java    |   2 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |   2 +-
 .../flink/action/CompactDatabaseActionITCase.java  |   6 +-
 ...nlySingleTableCompactionWorkerOperatorTest.java |  23 +--
 .../paimon/flink/sink/CommitterOperatorTest.java   |  17 +-
 paimon-format/pom.xml                              |   8 +
 .../apache/parquet/hadoop/ParquetFileReader.java   | 222 ++++++++++++++-------
 paimon-format/src/main/resources/META-INF/NOTICE   |  14 +-
 pom.xml                                            |   6 +-
 12 files changed, 246 insertions(+), 108 deletions(-)

diff --git a/paimon-arrow/pom.xml b/paimon-arrow/pom.xml
index 48a30f0b92..4823f8d47b 100644
--- a/paimon-arrow/pom.xml
+++ b/paimon-arrow/pom.xml
@@ -120,5 +120,42 @@ under the License.
             <version>${hadoop.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-test</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 </project>
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 9209b35122..e06183c7da 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -210,6 +210,12 @@ under the License.
             <artifactId>iceberg-data</artifactId>
             <version>${iceberg.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>parquet-hadoop</artifactId>
+                    <groupId>org.apache.parquet</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
index ecc20dc686..ea1b504df4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/PartitionStatisticsReporterTest.java
@@ -37,7 +37,6 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
 
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -46,6 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link PartitionStatisticsReporter}. */
 public class PartitionStatisticsReporterTest {
 
@@ -125,11 +126,11 @@ public class PartitionStatisticsReporterTest {
                 new PartitionStatisticsReporter(table, partitionHandler);
         long time = 1729598544974L;
         action.report("c1=a/", time);
-        Assertions.assertThat(partitionParams).containsKey("c1=a/");
-        Assertions.assertThat(partitionParams.get("c1=a/").toString())
+        assertThat(partitionParams).containsKey("c1=a/");
+        assertThat(partitionParams.get("c1=a/").toString())
                 .isEqualTo(
-                        "{spec={c1=a}, recordCount=1, fileSizeInBytes=591, fileCount=1, lastFileCreationTime=1729598544974}");
+                        "{spec={c1=a}, recordCount=1, fileSizeInBytes=662, fileCount=1, lastFileCreationTime=1729598544974}");
         action.close();
-        Assertions.assertThat(closed).isTrue();
+        assertThat(closed).isTrue();
     }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
index 55403a90a9..8e0efd110b 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncTableActionITCase.java
@@ -532,7 +532,7 @@ public class PostgresSyncTableActionITCase extends PostgresActionITCaseBase {
     }
 
     @Test
-    @Timeout(60)
+    @Timeout(180)
     public void testComputedColumn() throws Exception {
         // the first round checks for table creation
         // the second round checks for running the action on an existing table
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index d1ed5dbc84..117afd1213 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -66,7 +66,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 /** Tests for changelog table with primary keys. */
 public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
-    private static final int TIMEOUT = 180;
+    private static final int TIMEOUT = 480;
     private static final Logger LOG = LoggerFactory.getLogger(PrimaryKeyFileStoreTableITCase.class);
 
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 8d7be925ef..dd5b4c03ea 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -1052,7 +1052,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
 
         waitUtil(
                 () -> snapshotManager.latestSnapshotId() == 11L,
-                Duration.ofSeconds(60),
+                Duration.ofSeconds(240),
                 Duration.ofMillis(500));
         jobClient.cancel();
 
@@ -1061,8 +1061,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
 
         waitUtil(
                 () -> snapshotManager.earliestSnapshotId() == 9L,
-                Duration.ofSeconds(60),
-                Duration.ofMillis(200),
+                Duration.ofSeconds(240),
+                Duration.ofMillis(500),
                 "Failed to wait snapshot expiration success");
 
         List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
index 2f907320a8..c30bfdc03a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
@@ -39,8 +39,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
 import org.apache.flink.streaming.util.MockOutput;
 import org.apache.flink.streaming.util.MockStreamConfig;
-import org.assertj.core.api.Assertions;
-import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
@@ -49,10 +47,13 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
 /** Tests for {@link AppendOnlySingleTableCompactionWorkerOperator}. */
 public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTestBase {
 
-    @RepeatedTest(10)
+    @Test
     public void testAsyncCompactionWorks() throws Exception {
         createTableDefault();
         AppendOnlySingleTableCompactionWorkerOperator workerOperator =
@@ -73,7 +74,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
         List<UnawareAppendCompactionTask> tasks = packTask(commitMessages, 5);
         List<StreamRecord<UnawareAppendCompactionTask>> records =
                 tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
-        Assertions.assertThat(tasks.size()).isEqualTo(4);
+        assertThat(tasks.size()).isEqualTo(4);
 
         workerOperator.open();
 
@@ -85,7 +86,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
         Long timeStart = System.currentTimeMillis();
         long timeout = 60_000L;
 
-        Assertions.assertThatCode(
+        assertThatCode(
                         () -> {
                             while (committables.size() != 4) {
                                 committables.addAll(
@@ -105,7 +106,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
                 .doesNotThrowAnyException();
         committables.forEach(
                 a ->
-                        Assertions.assertThat(
+                        assertThat(
                                         ((CommitMessageImpl) a.wrappedCommittable())
                                                         .compactIncrement()
                                                         .compactAfter()
@@ -140,7 +141,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
         List<UnawareAppendCompactionTask> tasks = packTask(commitMessages, 5);
         List<StreamRecord<UnawareAppendCompactionTask>> records =
                 tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
-        Assertions.assertThat(tasks.size()).isEqualTo(8);
+        assertThat(tasks.size()).isEqualTo(8);
 
         workerOperator.open();
 
@@ -149,7 +150,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
         }
 
         // wait compaction
-        Thread.sleep(500);
+        Thread.sleep(5000);
 
         LocalFileIO localFileIO = LocalFileIO.create();
         DataFilePathFactory dataFilePathFactory =
@@ -166,8 +167,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
             List<DataFileMeta> fileMetas =
                     ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter();
             for (DataFileMeta fileMeta : fileMetas) {
-                Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
-                        .isTrue();
+                assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))).isTrue();
             }
             if (i++ > 2) {
                 break;
@@ -193,8 +193,7 @@ public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTest
                 List<DataFileMeta> fileMetas =
                         ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter();
                 for (DataFileMeta fileMeta : fileMetas) {
-                    Assertions.assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta)))
-                            .isFalse();
+                    assertThat(localFileIO.exists(dataFilePathFactory.toPath(fileMeta))).isFalse();
                 }
             } catch (Exception e) {
                 // do nothing
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 41ccfbf79e..b5ce23ef50 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -58,7 +58,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Preconditions;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -69,6 +68,7 @@ import java.util.UUID;
 
 import static org.apache.paimon.SnapshotTest.newSnapshotManager;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for {@link CommitterOperator}. */
@@ -282,9 +282,8 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         testHarness1.initializeState(snapshot);
         testHarness1.close();
 
-        Assertions.assertThat(actual.size()).isEqualTo(1);
-
-        Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
+        assertThat(actual.size()).isEqualTo(1);
+        assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
     }
 
     @Test
@@ -325,7 +324,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
                 createTestHarness(operatorFactory);
         testHarness.open();
-        Assertions.assertThatCode(
+        assertThatCode(
                         () -> {
                             long time = System.currentTimeMillis();
                             long cp = 0L;
@@ -387,7 +386,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                 .doesNotThrowAnyException();
 
         if (operatorFactory instanceof CommitterOperator) {
-            Assertions.assertThat(
+            assertThat(
                             ((ManifestCommittable)
                                             ((CommitterOperator) operatorFactory)
                                                     .committablesPerCheckpoint.get(Long.MAX_VALUE))
@@ -396,7 +395,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                     .isEqualTo(3);
         }
 
-        Assertions.assertThatCode(
+        assertThatCode(
                         () -> {
                             long time = System.currentTimeMillis();
                             long cp = 0L;
@@ -608,7 +607,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                         Committer.createContext("", metricGroup, true, false, null, 1, 1));
         committer.commit(Collections.singletonList(manifestCommittable));
         CommitterMetrics metrics = committer.getCommitterMetrics();
-        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);
+        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(572);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
         committer.close();
     }
@@ -705,7 +704,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
                         table, commitUser, new NoopCommittableStateManager());
         try (OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
                 createTestHarness(operatorFactory, 10, 10, 3)) {
-            Assertions.assertThatCode(testHarness::open)
+            assertThatCode(testHarness::open)
                     .hasMessage("Committer Operator parallelism in paimon MUST be one.");
         }
     }
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index e141eac2aa..15a894938d 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -36,6 +36,7 @@ under the License.
         <commons.pool.version>1.6</commons.pool.version>
         <commons.lang3.version>3.12.0</commons.lang3.version>
         <storage-api.version>2.8.1</storage-api.version>
+        <commons.io.version>2.16.1</commons.io.version>
     </properties>
 
     <dependencies>
@@ -168,6 +169,12 @@ under the License.
             <version>${joda-time.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons.io.version}</version>
+        </dependency>
+
         <!-- Avro End -->
 
         <!-- Parquet Start -->
@@ -333,6 +340,7 @@ under the License.
                                     <include>com.fasterxml.jackson.core:jackson-databind</include>
                                     <include>com.fasterxml.jackson.core:jackson-annotations</include>
                                     <include>org.apache.commons:commons-compress</include>
+                                    <include>commons-io:commons-io</include>
 
                                     <!-- Parquet -->
                                     <include>org.apache.parquet:parquet-hadoop</include>
diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e480e11222..bebdba7670 100644
--- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -20,14 +20,17 @@ package org.apache.parquet.hadoop;
 
 import org.apache.paimon.format.parquet.ParquetInputFile;
 import org.apache.paimon.format.parquet.ParquetInputStream;
-import org.apache.paimon.fs.FileRange;
 import org.apache.paimon.fs.VectoredReadable;
 import org.apache.paimon.utils.RoaringBitmap32;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.ReusingByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
@@ -63,6 +66,7 @@ import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.wrapped.io.FutureIO;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter;
@@ -71,10 +75,10 @@ import org.apache.parquet.internal.filter2.columnindex.RowRanges;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.ParquetFileRange;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
-import org.apache.yetus.audience.InterfaceAudience.Private;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,11 +98,11 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.BLOOMFILTER;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
@@ -118,11 +122,20 @@ public class ParquetFileReader implements Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ParquetFileReader.class);
 
+    public static final long HADOOP_VECTORED_READ_TIMEOUT_SECONDS = 300;
+
     private final ParquetMetadataConverter converter;
 
     private final CRC32 crc;
+    private final ReusingByteBufferAllocator crcAllocator;
+
+    public static final ParquetMetadata readFooter(
+            InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
+        ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
+        return readFooter(file, options, f, converter);
+    }
 
-    private static ParquetMetadata readFooter(
+    private static final ParquetMetadata readFooter(
             InputFile file,
             ParquetReadOptions options,
             SeekableInputStream f,
@@ -180,35 +193,39 @@ public class ParquetFileReader implements Closeable {
 
         // Read all the footer bytes in one time to avoid multiple read operations,
         // since it can be pretty time consuming for a single read operation in HDFS.
-        ByteBuffer footerBytesBuffer = ByteBuffer.allocate(fileMetadataLength);
-        f.readFully(footerBytesBuffer);
-        LOG.debug("Finished to read all footer bytes.");
-        footerBytesBuffer.flip();
-        InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
-
-        // Regular file, or encrypted file with plaintext footer
-        if (!encryptedFooterMode) {
+        ByteBuffer footerBytesBuffer = options.getAllocator().allocate(fileMetadataLength);
+        try {
+            f.readFully(footerBytesBuffer);
+            LOG.debug("Finished to read all footer bytes.");
+            footerBytesBuffer.flip();
+            InputStream footerBytesStream = ByteBufferInputStream.wrap(footerBytesBuffer);
+
+            // Regular file, or encrypted file with plaintext footer
+            if (!encryptedFooterMode) {
+                return converter.readParquetMetadata(
+                        footerBytesStream,
+                        options.getMetadataFilter(),
+                        fileDecryptor,
+                        false,
+                        fileMetadataLength);
+            }
+
+            // Encrypted file with encrypted footer
+            if (null == fileDecryptor) {
+                throw new ParquetCryptoRuntimeException(
+                        "Trying to read file with encrypted footer. No keys available");
+            }
+            FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
+            fileDecryptor.setFileCryptoMetaData(
+                    fileCryptoMetaData.getEncryption_algorithm(),
+                    true,
+                    fileCryptoMetaData.getKey_metadata());
+            // footer length is required only for signed plaintext footers
             return converter.readParquetMetadata(
-                    footerBytesStream,
-                    options.getMetadataFilter(),
-                    fileDecryptor,
-                    false,
-                    fileMetadataLength);
-        }
-
-        // Encrypted file with encrypted footer
-        if (null == fileDecryptor) {
-            throw new ParquetCryptoRuntimeException(
-                    "Trying to read file with encrypted footer. No keys available");
-        }
-        FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
-        fileDecryptor.setFileCryptoMetaData(
-                fileCryptoMetaData.getEncryption_algorithm(),
-                true,
-                fileCryptoMetaData.getKey_metadata());
-        // footer length is required only for signed plaintext footers
-        return converter.readParquetMetadata(
-                footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+                    footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+        } finally {
+            options.getAllocator().release(footerBytesBuffer);
+        }
     }
 
     protected final ParquetInputStream f;
@@ -268,7 +285,13 @@ public class ParquetFileReader implements Closeable {
         for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
             paths.put(ColumnPath.get(col.getPath()), col);
         }
-        this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+        if (options.usePageChecksumVerification()) {
+            this.crc = new CRC32();
+            this.crcAllocator = ReusingByteBufferAllocator.strict(options.getAllocator());
+        } else {
+            this.crc = null;
+            this.crcAllocator = null;
+        }
     }
 
     private static <T> List<T> listWithNulls(int size) {
@@ -446,7 +469,7 @@ public class ParquetFileReader implements Closeable {
         ColumnChunkPageReadStore rowGroup =
                 new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
         // prepare the list of consecutive parts to read them in one scan
-        List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
+        List<ConsecutivePartList> allParts = new ArrayList<>();
         ConsecutivePartList currentParts = null;
         for (ColumnChunkMetaData mc : block.getColumns()) {
             ColumnPath pathKey = mc.getPath();
@@ -466,6 +489,7 @@ public class ParquetFileReader implements Closeable {
         // actually read all the chunks
         ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
         readAllPartsVectoredOrNormal(allParts, builder);
+        rowGroup.setReleaser(builder.releaser);
         for (Chunk chunk : builder.build()) {
             readChunkPages(chunk, block, rowGroup);
         }
@@ -600,24 +624,25 @@ public class ParquetFileReader implements Closeable {
     @SuppressWarnings("checkstyle:JavadocParagraph")
     private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
             throws IOException {
-        List<FileRange> ranges = new ArrayList<>(allParts.size());
+
+        List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
         long totalSize = 0;
         for (ConsecutivePartList consecutiveChunks : allParts) {
             final long len = consecutiveChunks.length;
-            checkArgument(
+            Preconditions.checkArgument(
                     len < Integer.MAX_VALUE,
                     "Invalid length %s for vectored read operation. It must be less than max integer value.",
                     len);
-            ranges.add(FileRange.createFileRange(consecutiveChunks.offset, (int) len));
+            ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) len));
             totalSize += len;
         }
         LOG.debug(
                 "Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
         // Request a vectored read;
-        ((VectoredReadable) f.in()).readVectored(ranges);
+        f.readVectored(ranges, options.getAllocator());
         int k = 0;
         for (ConsecutivePartList consecutivePart : allParts) {
-            FileRange currRange = ranges.get(k++);
+            ParquetFileRange currRange = ranges.get(k++);
             consecutivePart.readFromVectoredRange(currRange, builder);
         }
     }
@@ -707,6 +732,7 @@ public class ParquetFileReader implements Closeable {
         }
         // actually read all the chunks
         readAllPartsVectoredOrNormal(allParts, builder);
+        rowGroup.setReleaser(builder.releaser);
         for (Chunk chunk : builder.build()) {
             readChunkPages(chunk, block, rowGroup);
         }
@@ -798,11 +824,11 @@ public class ParquetFileReader implements Closeable {
         if (blockIndex < 0 || blockIndex >= blocks.size()) {
             return null;
         }
-        return new DictionaryPageReader(this, blocks.get(blockIndex));
+        return new DictionaryPageReader(this, blocks.get(blockIndex), options.getAllocator());
     }
 
     public DictionaryPageReader getDictionaryReader(BlockMetaData block) {
-        return new DictionaryPageReader(this, block);
+        return new DictionaryPageReader(this, block, options.getAllocator());
     }
 
     /**
@@ -888,10 +914,7 @@ public class ParquetFileReader implements Closeable {
         int uncompressedPageSize = pageHeader.getUncompressed_page_size();
         int compressedPageSize = pageHeader.getCompressed_page_size();
 
-        byte[] dictPageBytes = new byte[compressedPageSize];
-        fin.readFully(dictPageBytes);
-
-        BytesInput bin = BytesInput.from(dictPageBytes);
+        BytesInput bin = BytesInput.from(fin, compressedPageSize);
 
         if (null != pageDecryptor) {
             bin = BytesInput.from(pageDecryptor.decrypt(bin.toByteArray(), dictionaryPageAAD));
@@ -1090,6 +1113,7 @@ public class ParquetFileReader implements Closeable {
         private ChunkDescriptor lastDescriptor;
         private final long rowCount;
         private SeekableInputStream f;
+        private final ByteBufferReleaser releaser = new ByteBufferReleaser(options.getAllocator());
 
         public ChunkListBuilder(long rowCount) {
             this.rowCount = rowCount;
@@ -1101,6 +1125,10 @@ public class ParquetFileReader implements Closeable {
             this.f = f;
         }
 
+        void addBuffersToRelease(List<ByteBuffer> toRelease) {
+            toRelease.forEach(releaser::releaseLater);
+        }
+
         void setOffsetIndex(ChunkDescriptor descriptor, OffsetIndex offsetIndex) {
             map.computeIfAbsent(descriptor, d -> new ChunkData()).offsetIndex = offsetIndex;
         }
@@ -1161,16 +1189,18 @@ public class ParquetFileReader implements Closeable {
          * Calculate checksum of input bytes, throw decoding exception if it does not match the
          * provided reference crc.
          */
-        private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) {
+        private void verifyCrc(int referenceCrc, BytesInput bytes, String exceptionMsg) {
             crc.reset();
-            crc.update(bytes);
+            try (ByteBufferReleaser releaser = crcAllocator.getReleaser()) {
+                crc.update(bytes.toByteBuffer(releaser));
+            }
             if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) {
                 throw new ParquetDecodingException(exceptionMsg);
             }
         }
 
         /**
-         * Read all of the pages in a given column chunk.
+         * Read all the pages in a given column chunk.
          *
          * @return the list of pages
          */
@@ -1237,7 +1267,7 @@ public class ParquetFileReader implements Closeable {
                         if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
                             verifyCrc(
                                     pageHeader.getCrc(),
-                                    pageBytes.toByteArray(),
+                                    pageBytes,
                                     "could not verify dictionary page integrity, CRC checksum verification failed");
                         }
                         DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
@@ -1258,7 +1288,7 @@ public class ParquetFileReader implements Closeable {
                         if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
                             verifyCrc(
                                     pageHeader.getCrc(),
-                                    pageBytes.toByteArray(),
+                                    pageBytes,
                                     "could not verify page integrity, CRC checksum verification failed");
                         }
                         DataPageV1 dataPageV1 =
@@ -1289,23 +1319,41 @@ public class ParquetFileReader implements Closeable {
                                 compressedPageSize
                                         - dataHeaderV2.getRepetition_levels_byte_length()
                                         - dataHeaderV2.getDefinition_levels_byte_length();
-                        pagesInChunk.add(
+                        final BytesInput repetitionLevels =
+                                this.readAsBytesInput(
+                                        dataHeaderV2.getRepetition_levels_byte_length());
+                        final BytesInput definitionLevels =
+                                this.readAsBytesInput(
+                                        dataHeaderV2.getDefinition_levels_byte_length());
+                        final BytesInput values = this.readAsBytesInput(dataSize);
+                        if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+                            pageBytes =
+                                    BytesInput.concat(repetitionLevels, definitionLevels, values);
+                            verifyCrc(
+                                    pageHeader.getCrc(),
+                                    pageBytes,
+                                    "could not verify page integrity, CRC checksum verification failed");
+                        }
+                        DataPageV2 dataPageV2 =
                                 new DataPageV2(
                                         dataHeaderV2.getNum_rows(),
                                         dataHeaderV2.getNum_nulls(),
                                         dataHeaderV2.getNum_values(),
-                                        this.readAsBytesInput(
-                                                dataHeaderV2.getRepetition_levels_byte_length()),
-                                        this.readAsBytesInput(
-                                                dataHeaderV2.getDefinition_levels_byte_length()),
+                                        repetitionLevels,
+                                        definitionLevels,
                                         converter.getEncoding(dataHeaderV2.getEncoding()),
-                                        this.readAsBytesInput(dataSize),
+                                        values,
                                         uncompressedPageSize,
                                         converter.fromParquetStatistics(
                                                 getFileMetaData().getCreatedBy(),
                                                 dataHeaderV2.getStatistics(),
                                                 type),
-                                        dataHeaderV2.isIs_compressed()));
+                                        dataHeaderV2.isIs_compressed());
+                        // Copy crc to new page, used for testing
+                        if (pageHeader.isSetCrc()) {
+                            dataPageV2.setCrc(pageHeader.getCrc());
+                        }
+                        pagesInChunk.add(dataPageV2);
                         valuesCountReadSoFar += dataHeaderV2.getNum_values();
                         ++dataPageCountReadSoFar;
                         break;
@@ -1346,7 +1394,8 @@ public class ParquetFileReader implements Closeable {
                     pageBlockDecryptor,
                     aadPrefix,
                     rowGroupOrdinal,
-                    columnOrdinal);
+                    columnOrdinal,
+                    options);
         }
 
         private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) {
@@ -1528,11 +1577,14 @@ public class ParquetFileReader implements Closeable {
             if (lastAllocationSize > 0) {
                 buffers.add(options.getAllocator().allocate(lastAllocationSize));
             }
+            builder.addBuffersToRelease(buffers);
 
+            long readStart = System.nanoTime();
             for (ByteBuffer buffer : buffers) {
                 f.readFully(buffer);
                 buffer.flip();
             }
+            setReadMetrics(readStart, length);
 
             // report in a counter the data we just scanned
             BenchmarkCounter.incrementBytesRead(length);
@@ -1542,24 +1594,60 @@ public class ParquetFileReader implements Closeable {
             }
         }
 
+        private void setReadMetrics(long startNs, long len) {
+            ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+            if (metricsCallback != null) {
+                long totalFileReadTimeNs = Math.max(System.nanoTime() - startNs, 0);
+                double sizeInMb = ((double) len) / (1024 * 1024);
+                double timeInSec = ((double) totalFileReadTimeNs) / 1000_0000_0000L;
+                double throughput = sizeInMb / timeInSec;
+                LOG.debug(
+                        "Parquet: File Read stats:  Length: {} MB, Time: {} secs, throughput: {} MB/sec ",
+                        sizeInMb,
+                        timeInSec,
+                        throughput);
+                metricsCallback.setDuration(
+                        ParquetFileReaderMetrics.ReadTime.name(), totalFileReadTimeNs);
+                metricsCallback.setValueLong(ParquetFileReaderMetrics.ReadSize.name(), length);
+                metricsCallback.setValueDouble(
+                        ParquetFileReaderMetrics.ReadThroughput.name(), throughput);
+            }
+        }
+
         /**
-         * Populate data in a parquet file range from a vectored range.
+         * Populate data in a parquet file range from a vectored range; will block for up to {@link
+         * #HADOOP_VECTORED_READ_TIMEOUT_SECONDS} seconds.
          *
          * @param currRange range to populated.
          * @param builder used to build chunk list to read the pages for the different columns.
          * @throws IOException if there is an error while reading from the stream, including a
          *     timeout.
          */
-        public void readFromVectoredRange(FileRange currRange, ChunkListBuilder builder)
+        public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder)
                 throws IOException {
-            byte[] buffer;
+            ByteBuffer buffer;
+            final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
+            long readStart = System.nanoTime();
             try {
-                buffer = currRange.getData().get();
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException(e);
+                LOG.debug(
+                        "Waiting for vectored read to finish for range {} with timeout {} seconds",
+                        currRange,
+                        timeoutSeconds);
+                buffer =
+                        FutureIO.awaitFuture(
+                                currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS);
+                setReadMetrics(readStart, currRange.getLength());
+                // report in a counter the data we just scanned
+                BenchmarkCounter.incrementBytesRead(currRange.getLength());
+            } catch (TimeoutException e) {
+                String error =
+                        String.format(
+                                "Timeout while fetching result for %s with time limit %d seconds",
+                                currRange, timeoutSeconds);
+                LOG.error(error, e);
+                throw new IOException(error, e);
             }
-
-            ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer));
+            ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
             for (ChunkDescriptor descriptor : chunks) {
                 builder.add(descriptor, stream.sliceBuffers(descriptor.size), f);
             }
diff --git a/paimon-format/src/main/resources/META-INF/NOTICE b/paimon-format/src/main/resources/META-INF/NOTICE
index 44e3ca97a9..9c8cbf4171 100644
--- a/paimon-format/src/main/resources/META-INF/NOTICE
+++ b/paimon-format/src/main/resources/META-INF/NOTICE
@@ -13,18 +13,18 @@ This project bundles the following dependencies under the Apache Software Licens
 - commons-lang:commons-lang:2.6
 - org.apache.commons:commons-lang3:3.12.0
 
-- org.apache.avro:avro:1.11.3
+- org.apache.avro:avro:1.11.4
 - com.fasterxml.jackson.core:jackson-core:2.14.2
 - com.fasterxml.jackson.core:jackson-databind:2.14.2
 - com.fasterxml.jackson.core:jackson-annotations:2.14.2
 - org.apache.commons:commons-compress:1.22
 
-- org.apache.parquet:parquet-hadoop:1.13.1
-- org.apache.parquet:parquet-column:1.13.1
-- org.apache.parquet:parquet-common:1.13.1
-- org.apache.parquet:parquet-encoding:1.13.1
-- org.apache.parquet:parquet-format-structures:1.13.1
-- org.apache.parquet:parquet-jackson:1.13.1
+- org.apache.parquet:parquet-hadoop:1.15.1
+- org.apache.parquet:parquet-column:1.15.1
+- org.apache.parquet:parquet-common:1.15.1
+- org.apache.parquet:parquet-encoding:1.15.1
+- org.apache.parquet:parquet-format-structures:1.15.1
+- org.apache.parquet:parquet-jackson:1.15.1
 - commons-pool:commons-pool:1.6
 
 This project bundles the following dependencies under the BSD license.
diff --git a/pom.xml b/pom.xml
index a65d95b9f7..87eb328824 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,7 @@ under the License.
         <flink.reuseForks>true</flink.reuseForks>
         <testcontainers.version>1.19.1</testcontainers.version>
         <iceberg.version>1.6.1</iceberg.version>
-        <parquet.version>1.13.1</parquet.version>
+        <parquet.version>1.15.1</parquet.version>
         <orc.version>1.9.2</orc.version>
         <protobuf-java.version>3.19.6</protobuf-java.version>
         <roaringbitmap.version>1.2.1</roaringbitmap.version>
@@ -133,7 +133,7 @@ under the License.
         <jaxb.api.version>2.3.1</jaxb.api.version>
         <findbugs.version>1.3.9</findbugs.version>
         <json-smart.version>2.5.2</json-smart.version>
-        <avro.version>1.11.3</avro.version>
+        <avro.version>1.11.4</avro.version>
         <kafka.version>3.2.3</kafka.version>
         <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
         <scalatest-maven-plugin.version>2.1.0</scalatest-maven-plugin.version>
@@ -979,7 +979,7 @@ under the License.
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-shade-plugin</artifactId>
-                    <version>3.4.1</version>
+                    <version>3.5.3</version>
                 </plugin>
 
                 <!-- configure scala style -->

