This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3290fcc31a [parquet] Introduce LongIterator to Parquet RowIndexGenerator (#4991)
3290fcc31a is described below

commit 3290fcc31a1a140f75948252de31f88e80d3829c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 24 14:01:12 2025 +0800

    [parquet] Introduce LongIterator to Parquet RowIndexGenerator (#4991)
---
 .../paimon/data/columnar/ColumnarRowIterator.java  | 17 +++---
 .../data/columnar/VectorizedRowIterator.java       |  3 +-
 .../java/org/apache/paimon/utils/LongIterator.java | 69 ++++++++++++++++++++++
 .../org/apache/paimon/utils/LongIteratorTest.java  | 50 ++++++++++++++++
 .../format/parquet/newreader/ColumnarBatch.java    |  3 +-
 .../parquet/newreader/RowIndexGenerator.java       | 28 ++++-----
 .../newreader/VectorizedParquetRecordReader.java   |  2 +-
 7 files changed, 144 insertions(+), 28 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 1cd6e3199c..980c15c3a0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.LongIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
@@ -51,17 +52,15 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     }
 
     public void reset(long nextFilePos) {
-        long[] positions = new long[row.batch().getNumRows()];
-        for (int i = 0; i < row.batch().getNumRows(); i++) {
-            positions[i] = nextFilePos++;
-        }
-        reset(positions);
+        reset(LongIterator.fromRange(nextFilePos, nextFilePos + row.batch().getNumRows()));
     }
 
-    public void reset(long[] positions) {
-        assert positions.length == row.batch().getNumRows();
-        this.positions = positions;
+    public void reset(LongIterator positions) {
         this.num = row.batch().getNumRows();
+        this.positions = new long[num];
+        for (int i = 0; i < num; i++) {
+            this.positions[i] = positions.next();
+        }
         this.nextPos = 0;
     }
 
@@ -92,7 +91,7 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     protected ColumnarRowIterator copy(ColumnVector[] vectors) {
         ColumnarRowIterator newIterator =
                 new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
-        newIterator.reset(positions);
+        newIterator.reset(LongIterator.fromArray(positions)); // copies this batch's row positions
         return newIterator;
     }
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
index 592a7845fc..fa9b1ded84 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
@@ -20,6 +20,7 @@ package org.apache.paimon.data.columnar;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.utils.LongIterator;
 
 import javax.annotation.Nullable;
 
@@ -39,7 +40,7 @@ public class VectorizedRowIterator extends 
ColumnarRowIterator implements Vector
     protected VectorizedRowIterator copy(ColumnVector[] vectors) {
         VectorizedRowIterator newIterator =
                 new VectorizedRowIterator(filePath, row.copy(vectors), recycler);
-        newIterator.reset(positions);
+        newIterator.reset(LongIterator.fromArray(positions)); // copies this batch's row positions
         return newIterator;
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java
new file mode 100644
index 0000000000..a63358f8e9
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/LongIterator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.NoSuchElementException;
+
+/** Iterator over primitive {@code long} values, avoiding boxing to {@link Long}. */
+public interface LongIterator {
+
+    boolean hasNext();
+
+    long next();
+
+    static LongIterator fromRange(final long startInclusive, final long endExclusive) {
+        return new LongIterator() {
+
+            private long i = startInclusive;
+
+            @Override
+            public boolean hasNext() {
+                return i < endExclusive;
+            }
+
+            @Override
+            public long next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return i++;
+            }
+        };
+    }
+
+    static LongIterator fromArray(final long[] longs) {
+        return new LongIterator() {
+
+            private int i = 0;
+
+            @Override
+            public boolean hasNext() {
+                return i < longs.length;
+            }
+
+            @Override
+            public long next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return longs[i++];
+            }
+        };
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java 
b/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java
new file mode 100644
index 0000000000..dd90ef680b
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/LongIteratorTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LongIteratorTest {
+
+    @Test
+    public void testRange() {
+        LongIterator iterator = LongIterator.fromRange(5, 10);
+        List<Long> list = new ArrayList<>();
+        while (iterator.hasNext()) {
+            list.add(iterator.next());
+        }
+        assertThat(list).containsExactly(5L, 6L, 7L, 8L, 9L);
+    }
+
+    @Test
+    public void testFromArray() {
+        long[] array = new long[] {7L, 5L, 9L, 6L, 8L};
+        LongIterator iterator = LongIterator.fromArray(array);
+        List<Long> list = new ArrayList<>();
+        while (iterator.hasNext()) {
+            list.add(iterator.next());
+        }
+        assertThat(list).containsExactly(7L, 5L, 9L, 6L, 8L);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
index 6bf00c106c..70695b3984 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ColumnarBatch.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.data.columnar.VectorizedRowIterator;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.LongIterator;
 
 import java.util.Arrays;
 
@@ -55,7 +56,7 @@ public class ColumnarBatch {
     }
 
     /** Reset next record position and return self. */
-    public void resetPositions(long[] positions) {
+    public void resetPositions(LongIterator positions) {
         vectorizedRowIterator.reset(positions);
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
index 7e302a43f3..5de08910cd 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/RowIndexGenerator.java
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.format.parquet.newreader;
 
+import org.apache.paimon.utils.LongIterator;
+
 import org.apache.parquet.column.page.PageReadStore;
 
-import java.util.Iterator;
-import java.util.stream.Stream;
+import java.util.PrimitiveIterator;
 
 /* This file is based on source code from the Spark Project 
(http://spark.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -30,37 +31,32 @@ import java.util.stream.Stream;
 /** Generate row index for columnar batch. */
 public class RowIndexGenerator {
 
-    Iterator<Long> rowIndexIterator;
+    private LongIterator rowIndexIterator;
 
     public void initFromPageReadStore(PageReadStore pageReadStore) {
         long startingRowIdx = pageReadStore.getRowIndexOffset().orElse(0L);
-
-        if (pageReadStore.getRowIndexes().isPresent()) {
-            final Iterator<Long> rowIndexes = pageReadStore.getRowIndexes().get();
+        PrimitiveIterator.OfLong rowIndexes = pageReadStore.getRowIndexes().orElse(null);
+        if (rowIndexes != null) {
             rowIndexIterator =
-                    new Iterator<Long>() {
+                    new LongIterator() {
                         @Override
                         public boolean hasNext() {
                             return rowIndexes.hasNext();
                         }
 
                         @Override
-                        public Long next() {
-                            return rowIndexes.next() + startingRowIdx;
+                        public long next() {
+                            return rowIndexes.nextLong() + startingRowIdx;
                         }
                     };
         } else {
             long numRowsInRowGroup = pageReadStore.getRowCount();
             rowIndexIterator =
-                    Stream.iterate(startingRowIdx, i -> i + 1).limit(numRowsInRowGroup).iterator();
+                    LongIterator.fromRange(startingRowIdx, startingRowIdx + numRowsInRowGroup);
         }
     }
 
-    public void populateRowIndex(ColumnarBatch columnarBatch, int numRows) {
-        long[] rowIndexes = new long[numRows];
-        for (int i = 0; i < numRows; i++) {
-            rowIndexes[i] = rowIndexIterator.next();
-        }
-        columnarBatch.resetPositions(rowIndexes);
+    public void populateRowIndex(ColumnarBatch columnarBatch) {
+        columnarBatch.resetPositions(rowIndexIterator);
     }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
index e54fcd493f..90123113da 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedParquetRecordReader.java
@@ -286,7 +286,7 @@ public class VectorizedParquetRecordReader implements 
FileRecordReader<InternalR
         }
         rowsReturned += num;
         columnarBatch.setNumRows(num);
-        rowIndexGenerator.populateRowIndex(columnarBatch, num);
+        rowIndexGenerator.populateRowIndex(columnarBatch);
         return true;
     }
 

Reply via email to