This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.0 by this push:
     new 73c3097c84 [core] Refactor ColumnarRowIterator using LongIterator. (#4992)
73c3097c84 is described below

commit 73c3097c84044c34a253434b2c55f4d88a5f495c
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jan 24 15:47:41 2025 +0800

    [core] Refactor ColumnarRowIterator using LongIterator. (#4992)
---
 .../converter/ArrowVectorizedBatchConverter.java   | 37 +++++++-----
 .../paimon/data/columnar/ColumnarRowIterator.java  | 36 +++++++-----
 .../data/columnar/VectorizedRowIterator.java       |  6 +-
 .../data/columnar/ColumnarRowIteratorTest.java     | 65 ++++++++++++++++++++++
 .../heap/RowColumnVectorTest.java                  |  5 +-
 5 files changed, 115 insertions(+), 34 deletions(-)

diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java
index 8e12d56dcd..cc7c6dc301 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java
@@ -32,6 +32,8 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+
 /** To convert {@link VectorizedColumnBatch} to Arrow format. */
 public class ArrowVectorizedBatchConverter extends ArrowBatchConverter {
 
@@ -65,22 +67,27 @@ public class ArrowVectorizedBatchConverter extends 
ArrowBatchConverter {
         FileRecordIterator<InternalRow> innerIterator = iterator.iterator();
         this.batch = ((VectorizedRecordIterator) innerIterator).batch();
 
-        long firstReturnedPosition = innerIterator.returnedPosition() + 1;
-        DeletionVector deletionVector = iterator.deletionVector();
-        int originNumRows = this.batch.getNumRows();
-        IntArrayList picked = new IntArrayList(originNumRows);
-        for (int i = 0; i < originNumRows; i++) {
-            long returnedPosition = firstReturnedPosition + i;
-            if (!deletionVector.isDeleted(returnedPosition)) {
-                picked.add(i);
+        try {
+            DeletionVector deletionVector = iterator.deletionVector();
+            int originNumRows = this.batch.getNumRows();
+            IntArrayList picked = new IntArrayList(originNumRows);
+            for (int i = 0; i < originNumRows; i++) {
+                innerIterator.next();
+                long returnedPosition = innerIterator.returnedPosition();
+                if (!deletionVector.isDeleted(returnedPosition)) {
+                    picked.add(i);
+                }
             }
-        }
-        if (picked.size() == originNumRows) {
-            this.pickedInColumn = null;
-            this.totalNumRows = originNumRows;
-        } else {
-            this.pickedInColumn = picked.toArray();
-            this.totalNumRows = this.pickedInColumn.length;
+
+            if (picked.size() == originNumRows) {
+                this.pickedInColumn = null;
+                this.totalNumRows = originNumRows;
+            } else {
+                this.pickedInColumn = picked.toArray();
+                this.totalNumRows = this.pickedInColumn.length;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to apply deletion vector.", e);
         }
 
         this.startIndex = 0;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index c1ed028acd..02bfe5912d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -29,6 +29,8 @@ import org.apache.paimon.utils.VectorMappingUtils;
 
 import javax.annotation.Nullable;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /**
  * A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. 
The next row is set by
  * {@link ColumnarRow#setRowId}.
@@ -41,8 +43,10 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     protected final Runnable recycler;
 
     protected int num;
-    protected int nextPos;
-    protected long[] positions;
+    protected int index;
+    protected int returnedPositionIndex;
+    protected long returnedPosition;
+    protected LongIterator positionIterator;
 
     public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable 
Runnable recycler) {
         super(recycler);
@@ -56,19 +60,18 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     }
 
     public void reset(LongIterator positions) {
+        this.positionIterator = positions;
         this.num = row.batch().getNumRows();
-        this.positions = new long[num];
-        for (int i = 0; i < num; i++) {
-            this.positions[i] = positions.next();
-        }
-        this.nextPos = 0;
+        this.index = 0;
+        this.returnedPositionIndex = 0;
+        this.returnedPosition = -1;
     }
 
     @Nullable
     @Override
     public InternalRow next() {
-        if (nextPos < num) {
-            row.setRowId(nextPos++);
+        if (index < num) {
+            row.setRowId(index++);
             return row;
         } else {
             return null;
@@ -77,10 +80,15 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
 
     @Override
     public long returnedPosition() {
-        if (nextPos == 0) {
-            return positions[0] - 1;
+        for (int i = 0; i < index - returnedPositionIndex; i++) {
+            returnedPosition = positionIterator.next();
         }
-        return positions[nextPos - 1];
+        returnedPositionIndex = index;
+        if (returnedPosition == -1) {
+            throw new IllegalStateException("returnedPosition() is called 
before next()");
+        }
+
+        return returnedPosition;
     }
 
     @Override
@@ -89,9 +97,11 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
     }
 
     protected ColumnarRowIterator copy(ColumnVector[] vectors) {
+        // We should call copy only when the iterator is at the beginning of 
the file.
+        checkArgument(returnedPositionIndex == 0, "copy() should not be called 
after next()");
         ColumnarRowIterator newIterator =
                 new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
-        newIterator.reset(LongIterator.fromArray(positions));
+        newIterator.reset(positionIterator);
         return newIterator;
     }
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
index fa9b1ded84..dfec00a39b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
@@ -20,10 +20,11 @@ package org.apache.paimon.data.columnar;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.VectorizedRecordIterator;
-import org.apache.paimon.utils.LongIterator;
 
 import javax.annotation.Nullable;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
 public class VectorizedRowIterator extends ColumnarRowIterator implements 
VectorizedRecordIterator {
 
@@ -38,9 +39,10 @@ public class VectorizedRowIterator extends 
ColumnarRowIterator implements Vector
 
     @Override
     protected VectorizedRowIterator copy(ColumnVector[] vectors) {
+        checkArgument(returnedPositionIndex == 0, "copy() should not be called 
after next()");
         VectorizedRowIterator newIterator =
                 new VectorizedRowIterator(filePath, row.copy(vectors), 
recycler);
-        newIterator.reset(LongIterator.fromArray(positions));
+        newIterator.reset(positionIterator);
         return newIterator;
     }
 }
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java
new file mode 100644
index 0000000000..8ab926a193
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.LongIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Test for {@link ColumnarRowIterator}. */
+public class ColumnarRowIteratorTest {
+
+    @Test
+    public void testRowIterator() {
+        Random random = new Random();
+        HeapIntVector heapIntVector = new HeapIntVector(100);
+        for (int i = 0; i < 100; i++) {
+            heapIntVector.setInt(i, random.nextInt());
+        }
+        long[] positions = new long[100];
+        positions[0] = random.nextInt(10);
+        for (int i = 1; i < 100; i++) {
+            positions[i] = positions[i - 1] + random.nextInt(100);
+        }
+
+        VectorizedColumnBatch vectorizedColumnBatch =
+                new VectorizedColumnBatch(new ColumnVector[] {heapIntVector});
+        vectorizedColumnBatch.setNumRows(100);
+        ColumnarRowIterator rowIterator =
+                new ColumnarRowIterator(
+                        new Path("test"), new 
ColumnarRow(vectorizedColumnBatch), null);
+        rowIterator.reset(LongIterator.fromArray(positions));
+        assertThatCode(rowIterator::returnedPosition)
+                .hasMessage("returnedPosition() is called before next()");
+        rowIterator.next();
+        for (int i = 0; i < random.nextInt(10); i++) {
+            for (int j = 0; j < random.nextInt(9); j++) {
+                rowIterator.next();
+            }
+            
assertThat(rowIterator.returnedPosition()).isEqualTo(positions[rowIterator.index
 - 1]);
+        }
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java
similarity index 90%
rename from paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java
rename to paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java
index bc7c127a63..97b00f590b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java
@@ -16,13 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.data.calumnar.heap;
+package org.apache.paimon.data.columnar.heap;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.columnar.ColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
-import org.apache.paimon.data.columnar.heap.HeapIntVector;
-import org.apache.paimon.data.columnar.heap.HeapRowVector;
 
 import org.junit.jupiter.api.Test;
 

Reply via email to