This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f71a26ac8d [core] Remove redundant reset by HeapRowVector (#5226)
f71a26ac8d is described below
commit f71a26ac8d82c57d71dabbe34aa14a59c000c4e6
Author: YeJunHao <[email protected]>
AuthorDate: Mon Mar 10 11:24:05 2025 +0800
[core] Remove redundant reset by HeapRowVector (#5226)
---
.../paimon/data/columnar/heap/HeapRowVector.java | 10 ----
.../newreader/VectorizedParquetRecordReader.java | 70 +++++++++++++---------
2 files changed, 43 insertions(+), 37 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
index 6653afaea7..6f1bec9ef9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
@@ -47,16 +47,6 @@ public class HeapRowVector extends AbstractStructVector
return vectorizedColumnBatch;
}
- @Override
- public void reset() {
- super.reset();
- for (ColumnVector field : children) {
- if (field instanceof WritableColumnVector) {
- ((WritableColumnVector) field).reset();
- }
- }
- }
-
@Override
void reserveForHeapVector(int newCapacity) {
// Nothing to store.
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
index 90123113da..2aae1d54f7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
@@ -58,6 +58,8 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
+import static java.lang.String.format;
+
/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
 * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
 * additional information regarding copyright ownership. */
@@ -258,36 +260,50 @@ public class VectorizedParquetRecordReader implements FileRecordReader<InternalR
}
public boolean nextBatch() throws IOException {
- // Primary key table will use the last record, so we can't reset first
- // TODO: remove usage of the last record by primary key table after batch reset
- if (rowsReturned >= totalRowCount) {
- return false;
- }
- for (ParquetColumnVector vector : columnVectors) {
- vector.reset();
- }
- columnarBatch.setNumRows(0);
- checkEndOfRowGroup();
-
- int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
- for (ParquetColumnVector cv : columnVectors) {
- for (ParquetColumnVector leafCv : cv.getLeaves()) {
- VectorizedColumnReader columnReader = leafCv.getColumnReader();
- if (columnReader != null) {
- columnReader.readBatch(
- num,
- leafCv.getColumn().getType(),
- leafCv.getValueVector(),
- leafCv.getRepetitionLevelVector(),
- leafCv.getDefinitionLevelVector());
+ try {
+ // Primary key table will use the last record, so we can't reset first
+ // TODO: remove usage of the last record by primary key table after batch reset
+ if (rowsReturned >= totalRowCount) {
+ return false;
+ }
+ for (ParquetColumnVector vector : columnVectors) {
+ vector.reset();
+ }
+ columnarBatch.setNumRows(0);
+ checkEndOfRowGroup();
+
+ int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
+ for (ParquetColumnVector cv : columnVectors) {
+ for (ParquetColumnVector leafCv : cv.getLeaves()) {
+ VectorizedColumnReader columnReader = leafCv.getColumnReader();
+ if (columnReader != null) {
+ columnReader.readBatch(
+ num,
+ leafCv.getColumn().getType(),
+ leafCv.getValueVector(),
+ leafCv.getRepetitionLevelVector(),
+ leafCv.getDefinitionLevelVector());
+ }
}
+ cv.assemble();
}
- cv.assemble();
+ rowsReturned += num;
+ columnarBatch.setNumRows(num);
+ rowIndexGenerator.populateRowIndex(columnarBatch);
+ return true;
+ } catch (IOException e) {
+ throw new IOException(
+ format(
+ "Exception in nextBatch, filePath: %s fileSchema: %s",
+ filePath, fileSchema),
+ e);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ format(
+ "Exception in nextBatch, filePath: %s fileSchema: %s",
+ filePath, fileSchema),
+ e);
}
- rowsReturned += num;
- columnarBatch.setNumRows(num);
- rowIndexGenerator.populateRowIndex(columnarBatch);
- return true;
}
private void checkEndOfRowGroup() throws IOException {