hudi-agent commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3189906874


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -137,69 +149,62 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
         // the option regardless.
         val blobMode = resolveBlobReadMode(storageConf)
         val readOpts = FileReadOptions.builder().blobReadMode(blobMode).build()
-        val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
+        arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE, readOpts)
 
-        // Compose the DESCRIPTOR-aware blob transform only when the user opted into that mode
-        // AND the request actually has BLOB columns (otherwise the rewrite has nothing to do).
+        // BLOB columns in the request - needed both for the DESCRIPTOR-mode row-path transform
+        // and for the path-selection gate below.
         val blobFieldNames: java.util.Set[String] =
           iteratorSchema.fields.collect { case f if isBlobField(f) => f.name }.toSet.asJava
-        val blobTransform = if (blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty) {
-          new BlobDescriptorTransform(blobFieldNames, filePath)
+
+        // Decide between batch mode and row mode.
+        // Fall back to row mode if:
+        //   - type casting is needed (batch-level type casting deferred to follow-up), OR
+        //   - the partition schema contains a type the batch-mode partition-vector populator
+        //     does not handle (Struct/Array/Map/Char/Varchar/interval, etc.). The row path
+        //     preserves these via JoinedRow, so falling back avoids silently nulling them out, OR
+        //   - DESCRIPTOR blob mode is requested with BLOB columns present. The descriptor
+        //     rewrite (data→null + synthesized/passthrough reference) lives in BlobDescriptorTransform,
+        //     which only the row path applies; the batch path would surface Lance's raw
+        //     {position, size} struct in `data`, breaking the DESCRIPTOR-mode contract.
+        val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+        val partitionTypesBatchSupported =
+          partitionSchema.forall(f => isPartitionTypeSupportedForBatch(f.dataType))
+        val descriptorBlobReadNeedsRowPath =
+          blobMode == BlobReadMode.DESCRIPTOR && !blobFieldNames.isEmpty
+        if (enableVectorizedReader && !hasTypeChanges && partitionTypesBatchSupported

Review Comment:
   🤖 Could this fall back to row mode after Spark already committed to columnar output? `supportBatch` only checks `!internalSchemaOpt.isPresent`, but `hasTypeChanges` can still be true at runtime when a file's schema differs in type from the required schema (e.g., older base files written before a widening reconciliation, or `spark.read.schema(wider)` on a table with narrower file types). In that case the planner expects `ColumnarBatch`, but `readRows` returns `InternalRow` → ClassCastException downstream. Worth confirming whether this can be reached via Hudi-managed Lance writes, and if so, gating batch mode more conservatively at plan time (or adding type-cast support to `readBatch`).
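   
   One possible shape for the conservative plan-time gate - a sketch only, not this PR's code. `mayRequireTypeCast` is a hypothetical probe for whether any scanned file's types could differ from the required schema (e.g., derived from committed file schemas at plan time); `internalSchemaOpt` and `enableVectorizedReader` follow the names used in this PR:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.types.StructType
   
   // Sketch: report batch support only when no runtime row-path fallback can
   // trigger, so the planner never expects ColumnarBatch from a reader that
   // will hand back InternalRow. The partition-type gate from this PR would
   // need a similar plan-time counterpart.
   override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean =
     enableVectorizedReader &&
       !internalSchemaOpt.isPresent &&   // existing gate in this PR
       !mayRequireTypeCast(schema)       // hypothetical: rules out runtime hasTypeChanges
   ```
   
   If widening reads need to stay batch-capable, the alternative is the other direction named above: teach `readBatch` to apply the cast per column vector and drop the runtime fallback entirely.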
   
   <sub><i>- AI-generated; verify before applying. React ๐Ÿ‘/๐Ÿ‘Ž to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
