This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 317a5345 fix: Use the number of rows from underlying arrays instead of 
logical row count from RecordBatch (#972)
317a5345 is described below

commit 317a5345eb1bbe8483a0dcc61a4c01b5ad7ece71
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Sep 27 00:21:10 2024 -0700

    fix: Use the number of rows from underlying arrays instead of logical row 
count from RecordBatch (#972)
---
 .../main/scala/org/apache/comet/vector/NativeUtil.scala   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala 
b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
index 33af8662..72472a54 100644
--- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
+++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -92,11 +92,15 @@ class NativeUtil {
       arrayAddrs: Array[Long],
       schemaAddrs: Array[Long],
       batch: ColumnarBatch): Int = {
+    val numRows = mutable.ArrayBuffer.empty[Int]
+
     (0 until batch.numCols()).foreach { index =>
       batch.column(index) match {
         case a: CometVector =>
           val valueVector = a.getValueVector
 
+          numRows += valueVector.getValueCount
+
           val provider = if (valueVector.getField.getDictionary != null) {
             a.getDictionaryProvider
           } else {
@@ -120,7 +124,16 @@ class NativeUtil {
       }
     }
 
-    batch.numRows()
+    if (numRows.distinct.length > 1) {
+      throw new SparkException(
+        s"Number of rows in each column should be the same, but got 
[${numRows.distinct}]")
+    }
+
+    // `ColumnarBatch.numRows` might return a different number than the actual 
number of rows in
+    // the Arrow arrays. For example, Iceberg column reader will skip deleted 
rows internally in
+    // its `CometVector` implementation. The `ColumnarBatch` returned by the 
reader will report
+    // logical number of rows which is less than actual number of rows due to 
row deletion.
+    numRows.headOption.getOrElse(batch.numRows())
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to