This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.0
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.0 by this push:
new 73c3097c84 [core] Refactory ColumnarRowIterator using LongIterator. (#4992)
73c3097c84 is described below
commit 73c3097c84044c34a253434b2c55f4d88a5f495c
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jan 24 15:47:41 2025 +0800
[core] Refactory ColumnarRowIterator using LongIterator. (#4992)
---
.../converter/ArrowVectorizedBatchConverter.java | 37 +++++++-----
.../paimon/data/columnar/ColumnarRowIterator.java | 36 +++++++-----
.../data/columnar/VectorizedRowIterator.java | 6 +-
.../data/columnar/ColumnarRowIteratorTest.java | 65 ++++++++++++++++++++++
.../heap/RowColumnVectorTest.java | 5 +-
5 files changed, 115 insertions(+), 34 deletions(-)
diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java
index 8e12d56dcd..cc7c6dc301 100644
--- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java
+++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java
@@ -32,6 +32,8 @@ import org.apache.arrow.vector.VectorSchemaRoot;
import javax.annotation.Nullable;
+import java.io.IOException;
+
/** To convert {@link VectorizedColumnBatch} to Arrow format. */
public class ArrowVectorizedBatchConverter extends ArrowBatchConverter {
@@ -65,22 +67,27 @@ public class ArrowVectorizedBatchConverter extends ArrowBatchConverter {
FileRecordIterator<InternalRow> innerIterator = iterator.iterator();
this.batch = ((VectorizedRecordIterator) innerIterator).batch();
- long firstReturnedPosition = innerIterator.returnedPosition() + 1;
- DeletionVector deletionVector = iterator.deletionVector();
- int originNumRows = this.batch.getNumRows();
- IntArrayList picked = new IntArrayList(originNumRows);
- for (int i = 0; i < originNumRows; i++) {
- long returnedPosition = firstReturnedPosition + i;
- if (!deletionVector.isDeleted(returnedPosition)) {
- picked.add(i);
+ try {
+ DeletionVector deletionVector = iterator.deletionVector();
+ int originNumRows = this.batch.getNumRows();
+ IntArrayList picked = new IntArrayList(originNumRows);
+ for (int i = 0; i < originNumRows; i++) {
+ innerIterator.next();
+ long returnedPosition = innerIterator.returnedPosition();
+ if (!deletionVector.isDeleted(returnedPosition)) {
+ picked.add(i);
+ }
}
- }
- if (picked.size() == originNumRows) {
- this.pickedInColumn = null;
- this.totalNumRows = originNumRows;
- } else {
- this.pickedInColumn = picked.toArray();
- this.totalNumRows = this.pickedInColumn.length;
+
+ if (picked.size() == originNumRows) {
+ this.pickedInColumn = null;
+ this.totalNumRows = originNumRows;
+ } else {
+ this.pickedInColumn = picked.toArray();
+ this.totalNumRows = this.pickedInColumn.length;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to apply deletion vector.", e);
}
this.startIndex = 0;
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index c1ed028acd..02bfe5912d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -29,6 +29,8 @@ import org.apache.paimon.utils.VectorMappingUtils;
import javax.annotation.Nullable;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/**
* A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s.
The next row is set by
* {@link ColumnarRow#setRowId}.
@@ -41,8 +43,10 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
protected final Runnable recycler;
protected int num;
- protected int nextPos;
- protected long[] positions;
+ protected int index;
+ protected int returnedPositionIndex;
+ protected long returnedPosition;
+ protected LongIterator positionIterator;
public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable
Runnable recycler) {
super(recycler);
@@ -56,19 +60,18 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
}
public void reset(LongIterator positions) {
+ this.positionIterator = positions;
this.num = row.batch().getNumRows();
- this.positions = new long[num];
- for (int i = 0; i < num; i++) {
- this.positions[i] = positions.next();
- }
- this.nextPos = 0;
+ this.index = 0;
+ this.returnedPositionIndex = 0;
+ this.returnedPosition = -1;
}
@Nullable
@Override
public InternalRow next() {
- if (nextPos < num) {
- row.setRowId(nextPos++);
+ if (index < num) {
+ row.setRowId(index++);
return row;
} else {
return null;
@@ -77,10 +80,15 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
@Override
public long returnedPosition() {
- if (nextPos == 0) {
- return positions[0] - 1;
+ for (int i = 0; i < index - returnedPositionIndex; i++) {
+ returnedPosition = positionIterator.next();
}
- return positions[nextPos - 1];
+ returnedPositionIndex = index;
+ if (returnedPosition == -1) {
+ throw new IllegalStateException("returnedPosition() is called
before next()");
+ }
+
+ return returnedPosition;
}
@Override
@@ -89,9 +97,11 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
}
protected ColumnarRowIterator copy(ColumnVector[] vectors) {
+ // We should call copy only when the iterator is at the beginning of
the file.
+ checkArgument(returnedPositionIndex == 0, "copy() should not be called
after next()");
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
- newIterator.reset(LongIterator.fromArray(positions));
+ newIterator.reset(positionIterator);
return newIterator;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
index fa9b1ded84..dfec00a39b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
@@ -20,10 +20,11 @@ package org.apache.paimon.data.columnar;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.VectorizedRecordIterator;
-import org.apache.paimon.utils.LongIterator;
import javax.annotation.Nullable;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
public class VectorizedRowIterator extends ColumnarRowIterator implements
VectorizedRecordIterator {
@@ -38,9 +39,10 @@ public class VectorizedRowIterator extends ColumnarRowIterator implements Vector
@Override
protected VectorizedRowIterator copy(ColumnVector[] vectors) {
+ checkArgument(returnedPositionIndex == 0, "copy() should not be called
after next()");
VectorizedRowIterator newIterator =
new VectorizedRowIterator(filePath, row.copy(vectors),
recycler);
- newIterator.reset(LongIterator.fromArray(positions));
+ newIterator.reset(positionIterator);
return newIterator;
}
}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java
new file mode 100644
index 0000000000..8ab926a193
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.utils.LongIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** Test for {@link ColumnarRowIterator}. */
+public class ColumnarRowIteratorTest {
+
+ @Test
+ public void testRowIterator() {
+ Random random = new Random();
+ HeapIntVector heapIntVector = new HeapIntVector(100);
+ for (int i = 0; i < 100; i++) {
+ heapIntVector.setInt(i, random.nextInt());
+ }
+ long[] positions = new long[100];
+ positions[0] = random.nextInt(10);
+ for (int i = 1; i < 100; i++) {
+ positions[i] = positions[i - 1] + random.nextInt(100);
+ }
+
+ VectorizedColumnBatch vectorizedColumnBatch =
+ new VectorizedColumnBatch(new ColumnVector[] {heapIntVector});
+ vectorizedColumnBatch.setNumRows(100);
+ ColumnarRowIterator rowIterator =
+ new ColumnarRowIterator(
+ new Path("test"), new
ColumnarRow(vectorizedColumnBatch), null);
+ rowIterator.reset(LongIterator.fromArray(positions));
+ assertThatCode(rowIterator::returnedPosition)
+ .hasMessage("returnedPosition() is called before next()");
+ rowIterator.next();
+ for (int i = 0; i < random.nextInt(10); i++) {
+ for (int j = 0; j < random.nextInt(9); j++) {
+ rowIterator.next();
+ }
+
assertThat(rowIterator.returnedPosition()).isEqualTo(positions[rowIterator.index
- 1]);
+ }
+ }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java
similarity index 90%
rename from paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java
rename to paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java
index bc7c127a63..97b00f590b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java
@@ -16,13 +16,10 @@
* limitations under the License.
*/
-package org.apache.paimon.data.calumnar.heap;
+package org.apache.paimon.data.columnar.heap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
-import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
-import org.apache.paimon.data.columnar.heap.HeapIntVector;
-import org.apache.paimon.data.columnar.heap.HeapRowVector;
import org.junit.jupiter.api.Test;