wombatu-kun commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3187023901


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -262,4 +257,307 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
       valueContainsNull = true)
     case other => other
   }
+
+  /**
+   * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased as Iterator[InternalRow].
+   * Used when enableVectorizedReader=true and no type casting is needed.
+   */
+  private def readBatch(file: PartitionedFile,
+                        allocator: BufferAllocator,
+                        lanceReader: LanceFileReader,
+                        arrowReader: ArrowReader,
+                        filePath: String,
+                        requestSchema: StructType,
+                        requiredSchema: StructType,
+                        partitionSchema: StructType): Iterator[InternalRow] = {
+
+    val batchIterator = new LanceBatchIterator(allocator, lanceReader, arrowReader, filePath)
+
+    // Build column mapping: for each column in requiredSchema, find its index in requestSchema (file columns)
+    // Returns -1 if the column is missing from the file (schema evolution: column addition)
+    val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+      requestSchema.fieldNames.indexOf(field.name)
+    }
+
+    // Create Arrow-backed null vectors for columns missing from the file.
+    // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is satisfied
+    // (FileSourceScanExec expects all data columns to be LanceArrowColumnVector).
+    val nullAllocator: Option[BufferAllocator] = if (columnMapping.contains(-1)) {
+      Some(HoodieArrowAllocator.newChildAllocator(
+        getClass.getSimpleName + "-null-" + filePath,
+        HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.defaultValue().toLong))
+    } else None
+
+    // Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount),
+    // so it is safe to call setValueCount with a count larger than DEFAULT_BATCH_SIZE.
+    val nullColumnVectors: Array[NullColumnEntry] =
+      nullAllocator.map { alloc =>
+        columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) =>
+          val field = LanceArrowUtils.toArrowField(
+            requiredSchema(idx).name, requiredSchema(idx).dataType, requiredSchema(idx).nullable, "UTC")
+          val arrowVector = field.createVector(alloc)
+          arrowVector.allocateNew()
+          arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+          NullColumnEntry(idx, new LanceArrowColumnVector(arrowVector), arrowVector)
+        }
+      }.getOrElse(Array.empty)
+
+    // Direct-indexed lookup so the per-batch hot loop is O(1) instead of scanning nullColumnVectors.
+    val nullColumnByIndex: Array[NullColumnEntry] = {

Review Comment:
   Done in fe8f7e1c2a56. `nullColumnByIndex` is now built directly inside the 
`nullAllocator.map { ... }` block as a sparse `Array[NullColumnEntry]` of size 
`requiredSchema.length`, populated only at indexes where the column is missing 
from the file. `nullColumnVectors` is gone; `NullColumnEntry.colIndex` is gone 
too. `close()` walks the sparse array and skips null slots; the per-batch hot 
loop is unchanged.
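
   For readers skimming the thread, here is a minimal, self-contained sketch of the sparse-array shape described above. It is not the code from the commit: `FakeVector`, `SparseNullColumns`, `build`, and `closeAll` are hypothetical stand-ins, and the real implementation holds Arrow vectors wrapped in `LanceArrowColumnVector` rather than this placeholder class.

```scala
// Hypothetical stand-in for an Arrow vector; only tracks whether close() ran.
final class FakeVector(val name: String) extends AutoCloseable {
  var closed: Boolean = false
  override def close(): Unit = closed = true
}

// Entry without a colIndex field: the array position is the column index.
final case class NullColumnEntry(vector: FakeVector)

object SparseNullColumns {
  // Builds an array of size requiredNames.length. Slot i is non-null only
  // when required column i is missing from the file's columns.
  def build(requiredNames: Array[String],
            fileNames: Array[String]): Array[NullColumnEntry] = {
    val byIndex = new Array[NullColumnEntry](requiredNames.length) // all slots start as null
    var i = 0
    while (i < requiredNames.length) {
      if (fileNames.indexOf(requiredNames(i)) < 0) {
        byIndex(i) = NullColumnEntry(new FakeVector(requiredNames(i)))
      }
      i += 1
    }
    byIndex
  }

  // Walks the sparse array and skips the null slots.
  def closeAll(byIndex: Array[NullColumnEntry]): Unit =
    byIndex.foreach(e => if (e != null) e.vector.close())
}
```

   With required columns `id, name, added_col` and file columns `id, name`, only slot 2 is populated, so a per-batch loop can test `byIndex(i) != null` in O(1) instead of searching a separate list.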



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.