hudi-agent commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3183992489


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -262,4 +257,307 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
       valueContainsNull = true)
     case other => other
   }
+
+  /**
+   * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased as Iterator[InternalRow].
+   * Used when enableVectorizedReader=true and no type casting is needed.
+   */
+  private def readBatch(file: PartitionedFile,
+                        allocator: BufferAllocator,
+                        lanceReader: LanceFileReader,
+                        arrowReader: ArrowReader,
+                        filePath: String,
+                        requestSchema: StructType,
+                        requiredSchema: StructType,
+                        partitionSchema: StructType): Iterator[InternalRow] = {
+
+    val batchIterator = new LanceBatchIterator(allocator, lanceReader, arrowReader, filePath)
+
+    // Build column mapping: for each column in requiredSchema, find its index in requestSchema (file columns)
+    // Returns -1 if the column is missing from the file (schema evolution: column addition)
+    val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+      requestSchema.fieldNames.indexOf(field.name)
+    }
+
+    // Create Arrow-backed null vectors for columns missing from the file.
+    // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is satisfied
+    // (FileSourceScanExec expects all data columns to be LanceArrowColumnVector).
+    val nullAllocator: Option[BufferAllocator] = if (columnMapping.contains(-1)) {
+      Some(HoodieArrowAllocator.newChildAllocator(
+        getClass.getSimpleName + "-null-" + filePath,
+        HoodieStorageConfig.LANCE_READ_ALLOCATOR_SIZE_BYTES.defaultValue().toLong))
+    } else None
+
+    // Arrow vectors auto-reallocate on setValueCount (see BaseFixedWidthVector.setValueCount),
+    // so it is safe to call setValueCount with a count larger than DEFAULT_BATCH_SIZE.
+    val nullColumnVectors: Array[NullColumnEntry] =
+      nullAllocator.map { alloc =>
+        columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) =>
+          val field = LanceArrowUtils.toArrowField(
+            requiredSchema(idx).name, requiredSchema(idx).dataType, requiredSchema(idx).nullable, "UTC")
+          val arrowVector = field.createVector(alloc)
+          arrowVector.allocateNew()
+          arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+          NullColumnEntry(idx, new LanceArrowColumnVector(arrowVector), arrowVector)
+        }
+      }.getOrElse(Array.empty)
+
+    // Direct-indexed lookup so the per-batch hot loop is O(1) instead of scanning nullColumnVectors.
+    val nullColumnByIndex: Array[NullColumnEntry] = {

Review Comment:
   🤖 nit: `nullColumnVectors` is built and then immediately re-bucketed into `nullColumnByIndex`. Could you build the sparse `nullColumnByIndex` array directly in the `nullAllocator.map { ... }` block and iterate it (filtering nulls) in `close()`? That would drop the duplicate array and make the `NullColumnEntry.colIndex` field unnecessary.
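   
   A minimal sketch of the suggested shape, using hypothetical stand-ins rather than the PR's real types: `Entry` plays the role of `NullColumnEntry` without `colIndex`, a `String` name stands in for the Arrow vector, and `build` / `closeAll` are illustrative names only.

   ```scala
   object SparseNullColumns {
     // Hypothetical stand-in for NullColumnEntry with the colIndex field dropped.
     final case class Entry(name: String) {
       var closed = false
       def close(): Unit = closed = true
     }

     // columnMapping(i) < 0 means required column i is missing from the file.
     // The result is indexed by required-column position; present columns stay null.
     def build(columnMapping: Array[Int], names: Array[String]): Array[Entry] =
       columnMapping.zipWithIndex.map { case (fileIdx, idx) =>
         if (fileIdx < 0) Entry(names(idx)) else null
       }

     // Cleanup walks the same sparse array and skips the empty slots.
     def closeAll(byIndex: Array[Entry]): Unit =
       byIndex.iterator.filter(_ != null).foreach(_.close())
   }
   ```

   The array position carries the column index, so the per-batch lookup stays O(1) and no second array is needed.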
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -137,69 +149,52 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
         // the option regardless.
         val blobMode = resolveBlobReadMode(storageConf)
         val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build()
-        val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
-
-        // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
-        // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
-        val blobFieldNames: java.util.Set[String] =
-          iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava
-        val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) {
-          new BlobDescriptorTransform(blobFieldNames, filePath)
+        arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
+
+        // Decide between batch mode and row mode.
+        // Fall back to row mode if:
+        //   - type casting is needed (batch-level type casting deferred to follow-up), OR
+        //   - the partition schema contains a type the batch-mode partition-vector populator
+        //     does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path
+        //     preserves these via JoinedRow, so falling back avoids silently nulling them out.
+        val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+        val partitionTypesBatchSupported =
+          partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType))
+        if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported) {

Review Comment:
   🤖 @yihua could you double-check this fallback's interaction with Spark's columnar codegen? After this PR, `supportBatch` returns `true` for COW Lance, so Spark wraps the scan and expects every element to be a `ColumnarBatch` (`ColumnarToRowExec` calls `batch.rowIterator()` on each element). When `hasTypeChanges=true` (e.g., an old base file with FLOAT read by a query that expects DOUBLE after schema evolution), this branch returns a genuine `Iterator[InternalRow]`. In a `FileScanRDD` partition that contains both a fallback file and a batch-mode file, the consumer would hit a `ClassCastException` on the row elements. The existing unit test (`testTypeChangeFallsBackToRowPath`) exercises `SparkLanceReaderBase.read` directly, so it wouldn't catch this. The same concern applies if `partitionTypesBatchSupported=false` for the query.
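   
   To illustrate the suspected failure mode, here is a self-contained sketch that does not use Spark at all. `Row` and `Batch` are hypothetical stand-ins for `InternalRow` and `ColumnarBatch`, and `consume` only approximates a consumer that assumes every element is a batch; whether the real plan behaves this way is exactly what needs verifying.

   ```scala
   object ErasedIteratorDemo {
     // Hypothetical stand-ins for InternalRow and ColumnarBatch.
     final class Row(val value: Int)
     final class Batch(val rows: Seq[Row]) {
       def rowIterator(): Iterator[Row] = rows.iterator
     }

     // Batch path: batches type-erased as rows, mirroring what readBatch returns.
     def batchFile(): Iterator[Row] =
       Iterator(new Batch(Seq(new Row(1), new Row(2)))).asInstanceOf[Iterator[Row]]

     // Fallback path: the iterator really does contain rows.
     def rowFile(): Iterator[Row] = Iterator(new Row(3))

     // Consumer that treats every element as a batch.
     def consume(files: Iterator[Row]): List[Int] =
       files.asInstanceOf[Iterator[Batch]].flatMap(_.rowIterator()).map(_.value).toList
   }
   ```

   In this sketch, consuming `batchFile()` alone works, while consuming `batchFile() ++ rowFile()` fails with a `ClassCastException` once the genuine row is reached, because the erased cast is only checked when an element is actually used as a `Batch`.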
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.