This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 317a5345 fix: Use the number of rows from underlying arrays instead of
logical row count from RecordBatch (#972)
317a5345 is described below
commit 317a5345eb1bbe8483a0dcc61a4c01b5ad7ece71
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Sep 27 00:21:10 2024 -0700
fix: Use the number of rows from underlying arrays instead of logical row
count from RecordBatch (#972)
---
.../main/scala/org/apache/comet/vector/NativeUtil.scala | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
index 33af8662..72472a54 100644
--- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
+++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -92,11 +92,15 @@ class NativeUtil {
arrayAddrs: Array[Long],
schemaAddrs: Array[Long],
batch: ColumnarBatch): Int = {
+ val numRows = mutable.ArrayBuffer.empty[Int]
+
(0 until batch.numCols()).foreach { index =>
batch.column(index) match {
case a: CometVector =>
val valueVector = a.getValueVector
+ numRows += valueVector.getValueCount
+
val provider = if (valueVector.getField.getDictionary != null) {
a.getDictionaryProvider
} else {
@@ -120,7 +124,16 @@ class NativeUtil {
}
}
- batch.numRows()
+ if (numRows.distinct.length > 1) {
+ throw new SparkException(
+ s"Number of rows in each column should be the same, but got [${numRows.distinct}]")
+ }
+
+ // `ColumnarBatch.numRows` might return a different number than the actual number of rows in
+ // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in
+ // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report
+ // logical number of rows which is less than actual number of rows due to row deletion.
+ numRows.headOption.getOrElse(batch.numRows())
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]