This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3290fcc31a [parquet] Introduce LongIterator to Parquet RowIndexGenerator (#4991)
3290fcc31a is described below
commit 3290fcc31a1a140f75948252de31f88e80d3829c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 24 14:01:12 2025 +0800
[parquet] Introduce LongIterator to Parquet RowIndexGenerator (#4991)
---
.../paimon/data/columnar/ColumnarRowIterator.java | 17 +++---
.../data/columnar/VectorizedRowIterator.java | 3 +-
.../java/org/apache/paimon/utils/LongIterator.java | 69 ++++++++++++++++++++++
.../org/apache/paimon/utils/LongIteratorTest.java | 50 ++++++++++++++++
.../format/parquet/newreader/ColumnarBatch.java | 3 +-
.../parquet/newreader/RowIndexGenerator.java | 28 ++++-----
.../newreader/VectorizedParquetRecordReader.java | 2 +-
7 files changed, 144 insertions(+), 28 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 1cd6e3199c..980c15c3a0 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.LongIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;
@@ -51,17 +52,15 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
}
public void reset(long nextFilePos) {
- long[] positions = new long[row.batch().getNumRows()];
- for (int i = 0; i < row.batch().getNumRows(); i++) {
- positions[i] = nextFilePos++;
- }
- reset(positions);
+ // Size the range from the current batch: the `positions` field still describes the previous
+ // batch (or may not have been assigned yet), so its length is not this batch's row count.
+ reset(LongIterator.fromRange(nextFilePos, nextFilePos + row.batch().getNumRows()));
}
- public void reset(long[] positions) {
- assert positions.length == row.batch().getNumRows();
- this.positions = positions;
+ /**
+ * Rewinds this iterator to the first row of the current batch and records one file position
+ * per row, taken in order from {@code positions}. The supplied iterator must yield at least as
+ * many values as the batch has rows; its values are copied, so the iterator is not retained.
+ */
+ public void reset(LongIterator positions) {
this.num = row.batch().getNumRows();
+ this.positions = new long[num];
+ for (int i = 0; i < num; i++) {
+ this.positions[i] = positions.next();
+ }
this.nextPos = 0;
}
@@ -92,7 +91,7 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
protected ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
- newIterator.reset(positions);
+ newIterator.reset(LongIterator.fromArray(positions)); // reset() copies into a fresh array, so the copy shares no positions array with this iterator
return newIterator;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
index 592a7845fc..fa9b1ded84 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.data.columnar;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.utils.LongIterator;
import javax.annotation.Nullable;
@@ -39,7 +40,7 @@ public class VectorizedRowIterator extends
ColumnarRowIterator implements Vector
protected VectorizedRowIterator copy(ColumnVector[] vectors) {
VectorizedRowIterator newIterator =
new VectorizedRowIterator(filePath, row.copy(vectors),
recycler);
- newIterator.reset(positions);
+ newIterator.reset(LongIterator.fromArray(positions)); // reset() copies into a fresh array, so the copy shares no positions array with this iterator
return newIterator;
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java
b/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java
new file mode 100644
index 0000000000..a63358f8e9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator over primitive {@code long} values. Unlike {@code Iterator<Long>}, {@link #next()}
+ * returns a primitive, so iterating involves no boxing.
+ */
+public interface LongIterator {
+ /** Returns {@code true} if {@link #next()} has another value to return. */
+ boolean hasNext();
+ /**
+ * Returns the next value and advances the iterator.
+ *
+ * <p>The iterators created by {@link #fromRange} and {@link #fromArray} throw {@link
+ * NoSuchElementException} once they are exhausted.
+ */
+ long next();
+ /**
+ * Returns an iterator over {@code [startInclusive, endExclusive)} in ascending order. The
+ * iterator is empty when {@code startInclusive >= endExclusive}.
+ */
+ static LongIterator fromRange(final long startInclusive, final long
endExclusive) {
+ return new LongIterator() {
+
+ private long i = startInclusive;
+
+ @Override
+ public boolean hasNext() {
+ return i < endExclusive;
+ }
+
+ @Override
+ public long next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return i++;
+ }
+ };
+ }
+ /**
+ * Returns an iterator over the elements of {@code longs} in index order. The array is not
+ * copied, so writes to it are visible for elements that have not been returned yet.
+ */
+ static LongIterator fromArray(final long[] longs) {
+ return new LongIterator() {
+
+ private int i = 0;
+
+ @Override
+ public boolean hasNext() {
+ return i < longs.length;
+ }
+
+ @Override
+ public long next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return longs[i++];
+ }
+ };
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java
new file mode 100644
index 0000000000..dd90ef680b
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link LongIterator}; iteration order is part of the contract and is asserted. */
+class LongIteratorTest {
+
+ @Test
+ public void testRange() {
+ assertThat(toList(LongIterator.fromRange(5, 10))).containsExactly(5L, 6L, 7L, 8L, 9L);
+ }
+
+ @Test
+ public void testEmptyRange() {
+ assertThat(LongIterator.fromRange(5, 5).hasNext()).isFalse();
+ assertThat(LongIterator.fromRange(5, 3).hasNext()).isFalse();
+ }
+
+ @Test
+ public void testFromArray() {
+ long[] array = new long[] {5L, 6L, 7L, 8L, 9L};
+ assertThat(toList(LongIterator.fromArray(array))).containsExactly(5L, 6L, 7L, 8L, 9L);
+ }
+
+ @Test
+ public void testNextAfterExhaustionThrows() {
+ LongIterator range = LongIterator.fromRange(0, 1);
+ assertThat(range.next()).isEqualTo(0L);
+ assertThatThrownBy(range::next).isInstanceOf(NoSuchElementException.class);
+
+ LongIterator empty = LongIterator.fromArray(new long[0]);
+ assertThatThrownBy(empty::next).isInstanceOf(NoSuchElementException.class);
+ }
+
+ /** Drains {@code iterator} into a list, preserving iteration order. */
+ private static List<Long> toList(LongIterator iterator) {
+ List<Long> list = new ArrayList<>();
+ while (iterator.hasNext()) {
+ list.add(iterator.next());
+ }
+ return list;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
index 6bf00c106c..70695b3984 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.LongIterator;
import java.util.Arrays;
@@ -55,7 +56,7 @@ public class ColumnarBatch {
}
- /** Reset next record position and return self. */
- public void resetPositions(long[] positions) {
+ /**
+ * Resets the row positions of this batch's row iterator, taking one value from {@code
+ * positions} for each row currently in the batch.
+ */
+ public void resetPositions(LongIterator positions) {
vectorizedRowIterator.reset(positions);
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
index 7e302a43f3..5de08910cd 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
@@ -18,10 +18,11 @@
package org.apache.paimon.format.parquet.newreader;
+import org.apache.paimon.utils.LongIterator;
+
import org.apache.parquet.column.page.PageReadStore;
-import java.util.Iterator;
-import java.util.stream.Stream;
+import java.util.PrimitiveIterator;
/* This file is based on source code from the Spark Project
(http://spark.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -30,37 +31,32 @@ import java.util.stream.Stream;
/** Generate row index for columnar batch. */
public class RowIndexGenerator {
- Iterator<Long> rowIndexIterator;
+ private LongIterator rowIndexIterator; // row indexes for the current row group; advanced as batches are populated
public void initFromPageReadStore(PageReadStore pageReadStore) {
long startingRowIdx = pageReadStore.getRowIndexOffset().orElse(0L);
-
- if (pageReadStore.getRowIndexes().isPresent()) {
- final Iterator<Long> rowIndexes =
pageReadStore.getRowIndexes().get();
+ PrimitiveIterator.OfLong rowIndexes =
pageReadStore.getRowIndexes().orElse(null);
+ if (rowIndexes != null) { // the page store lists row indexes explicitly; each is shifted by startingRowIdx
rowIndexIterator =
- new Iterator<Long>() {
+ new LongIterator() {
@Override
public boolean hasNext() {
return rowIndexes.hasNext();
}
@Override
- public Long next() {
- return rowIndexes.next() + startingRowIdx;
+ public long next() {
+ return rowIndexes.nextLong() + startingRowIdx; // primitive nextLong() avoids boxing
}
};
} else {
long numRowsInRowGroup = pageReadStore.getRowCount();
rowIndexIterator =
- Stream.iterate(startingRowIdx, i -> i +
1).limit(numRowsInRowGroup).iterator();
+ LongIterator.fromRange(startingRowIdx, startingRowIdx +
numRowsInRowGroup); // no explicit indexes: every row of the group, consecutively
}
}
- public void populateRowIndex(ColumnarBatch columnarBatch, int numRows) {
- long[] rowIndexes = new long[numRows];
- for (int i = 0; i < numRows; i++) {
- rowIndexes[i] = rowIndexIterator.next();
- }
- columnarBatch.resetPositions(rowIndexes);
+ public void populateRowIndex(ColumnarBatch columnarBatch) { // the batch pulls one index per row it holds, so set its row count first
+ columnarBatch.resetPositions(rowIndexIterator);
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
index e54fcd493f..90123113da 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
@@ -286,7 +286,7 @@ public class VectorizedParquetRecordReader implements
FileRecordReader<InternalR
}
rowsReturned += num;
columnarBatch.setNumRows(num);
- rowIndexGenerator.populateRowIndex(columnarBatch, num);
+ rowIndexGenerator.populateRowIndex(columnarBatch);
return true;
}