hudi-agent commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3180280150


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.lance.file.LanceFileReader;
+import org.lance.spark.vectorized.LanceArrowColumnVector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Iterator that returns {@link ColumnarBatch} directly from Lance files without
+ * decomposing to individual rows. Used for vectorized/columnar batch reading
+ * in Spark's COW base-file-only read path.
+ *
+ * <p>Unlike {@link LanceRecordIterator} which extracts rows one by one,
+ * this iterator preserves the columnar format for zero-copy batch processing.
+ *
+ * <p>Manages the lifecycle of:
+ * <ul>
+ *   <li>BufferAllocator - Arrow memory management</li>
+ *   <li>LanceFileReader - Lance file handle</li>
+ *   <li>ArrowReader - Arrow batch reader</li>
+ * </ul>
+ */
+public class LanceBatchIterator implements Iterator<ColumnarBatch>, Closeable {
+  private final BufferAllocator allocator;
+  private final LanceFileReader lanceReader;
+  private final ArrowReader arrowReader;
+  private final String path;
+
+  private ColumnVector[] columnVectors;
+  private ColumnarBatch currentBatch;
+  private boolean nextBatchLoaded = false;
+  private boolean finished = false;
+  private boolean closed = false;
+
+  /**
+   * Creates a new Lance batch iterator.
+   *
+   * @param allocator   Arrow buffer allocator for memory management
+   * @param lanceReader Lance file reader
+   * @param arrowReader Arrow reader for batch reading
+   * @param path        File path (for error messages)
+   */
+  public LanceBatchIterator(BufferAllocator allocator,
+                            LanceFileReader lanceReader,
+                            ArrowReader arrowReader,
+                            String path) {
+    this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
+    this.lanceReader = Objects.requireNonNull(lanceReader, "lanceReader must not be null");
+    this.arrowReader = Objects.requireNonNull(arrowReader, "arrowReader must not be null");
+    this.path = path;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (finished) {
+      return false;
+    }
+    if (nextBatchLoaded) {
+      return true;
+    }
+
+    try {
+      if (arrowReader.loadNextBatch()) {
+        VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+        // Create column vector wrappers once and reuse across batches
+        // (ArrowReader reuses the same VectorSchemaRoot)
+        if (columnVectors == null) {
+          columnVectors = root.getFieldVectors().stream()
+              .map(LanceArrowColumnVector::new)
+              .toArray(ColumnVector[]::new);
+        }
+
+        currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
+        nextBatchLoaded = true;
+        return true;
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to read next batch from Lance file: " + path, e);
+    }
+
+    finished = true;
+    return false;
+  }
+
+  @Override
+  public ColumnarBatch next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more batches available");
+    }
+    nextBatchLoaded = false;
+    return currentBatch;
+  }
+
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+
+    IOException arrowException = null;
+    Exception lanceException = null;
+    Exception allocatorException = null;
+
+    // Don't close currentBatch here: ColumnarBatch.close() would close the
+    // underlying Arrow FieldVectors through LanceArrowColumnVector, but they
+    // are owned by the ArrowReader (via VectorSchemaRoot) and will be closed
+    // when arrowReader.close() is called below.
+    currentBatch = null;
+
+    try {
+      arrowReader.close();
+    } catch (IOException e) {

Review Comment:
   🤖 Catch here is `IOException`, while the `lanceReader` and `allocator` blocks below catch the broader `Exception`. If `arrowReader.close()` throws a `RuntimeException` (e.g., from Arrow allocator state such as unclosed children), it propagates out of `close()` and we skip both `lanceReader.close()` and `allocator.close()`, leaking them. Was the narrower catch intentional, or is it worth widening to `Exception` for symmetry?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
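   A minimal sketch of the symmetric shape the question points at, assuming the rest of the PR's `close()` keeps closing `lanceReader` and `allocator` as described above; the exception wrapping and message below are illustrative, not the PR's actual code:

```java
  @Override
  public void close() {
    if (closed) {
      return;
    }
    closed = true;
    // Intentionally not closing currentBatch: its vectors are owned by the ArrowReader.
    currentBatch = null;

    Exception primary = null;
    try {
      arrowReader.close();        // widened from IOException to Exception
    } catch (Exception e) {
      primary = e;
    }
    try {
      lanceReader.close();        // still attempted even if arrowReader.close() failed
    } catch (Exception e) {
      if (primary == null) {
        primary = e;
      } else {
        primary.addSuppressed(e);
      }
    }
    try {
      allocator.close();          // always attempted last
    } catch (Exception e) {
      if (primary == null) {
        primary = e;
      } else {
        primary.addSuppressed(e);
      }
    }
    if (primary != null) {
      throw new HoodieException("Failed to close Lance batch iterator for " + path, primary);
    }
  }
```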



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########


Review Comment:
   🤖 **Line 556:** For partition types not in the match (complex types like Struct/Array/Map, plus atomic types like CharType/VarcharType/interval types), this silently substitutes null. The row path preserves the value via `JoinedRow`, so this is a silent regression in batch mode for those types. Worth either throwing on unsupported types or filtering them in `supportBatch` (cf. Parquet's `schema.forall(_.dataType.isInstanceOf[AtomicType])` gate)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
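   One possible shape of that gate, sketched in Java for illustration (the actual reader is Scala, and the class and method names below are hypothetical): only report batch support when every field's type is atomic, so Struct/Array/Map and other unhandled partition types fall back to the row path instead of silently reading null.

```java
import org.apache.spark.sql.types.AtomicType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

final class LanceBatchSupport {
  private LanceBatchSupport() {
  }

  // Hypothetical helper mirroring the AtomicType gate the review comment cites
  // from Parquet: returns true only when every field's type is atomic, so callers
  // can keep non-atomic partition columns on the row path (JoinedRow) rather than
  // substituting null in batch mode.
  static boolean allFieldsAtomic(StructType schema) {
    for (StructField field : schema.fields()) {
      if (!(field.dataType() instanceof AtomicType)) {
        return false;
      }
    }
    return true;
  }
}
```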



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.lance.file.LanceFileReader;
+import org.lance.spark.vectorized.LanceArrowColumnVector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Iterator that returns {@link ColumnarBatch} directly from Lance files without
+ * decomposing to individual rows. Used for vectorized/columnar batch reading
+ * in Spark's COW base-file-only read path.
+ *
+ * <p>Unlike {@link LanceRecordIterator} which extracts rows one by one,
+ * this iterator preserves the columnar format for zero-copy batch processing.
+ *
+ * <p>Manages the lifecycle of:
+ * <ul>
+ *   <li>BufferAllocator - Arrow memory management</li>
+ *   <li>LanceFileReader - Lance file handle</li>
+ *   <li>ArrowReader - Arrow batch reader</li>
+ * </ul>
+ */
+public class LanceBatchIterator implements Iterator<ColumnarBatch>, Closeable {
+  private final BufferAllocator allocator;
+  private final LanceFileReader lanceReader;
+  private final ArrowReader arrowReader;
+  private final String path;
+
+  private ColumnVector[] columnVectors;
+  private ColumnarBatch currentBatch;
+  private boolean nextBatchLoaded = false;

Review Comment:
   🤖 nit: could you rename `nextBatchLoaded` to `hasCachedBatch`? The current name describes the mechanism (a batch was loaded from the reader), but what readers of `hasNext()` really need to know is whether a batch is already available to return: `if (hasCachedBatch) return true` reads much more naturally than `if (nextBatchLoaded) return true`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -23,37 +23,47 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
 import org.apache.hudi.common.util
-import org.apache.hudi.common.util.collection.ClosableIterator
+import org.apache.hudi.common.util.collection.{ClosableIterator, Pair => HoodiePair}
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.io.memory.HoodieArrowAllocator
-import org.apache.hudi.io.storage.{BlobDescriptorTransform, LanceRecordIterator, VectorConversionUtils}
+import org.apache.hudi.io.storage.{BlobDescriptorTransform, LanceBatchIterator, LanceRecordIterator, VectorConversionUtils}
 import org.apache.hudi.storage.StorageConfiguration
 
+import org.apache.arrow.memory.BufferAllocator
+import org.apache.arrow.vector.FieldVector
+import org.apache.arrow.vector.ipc.ArrowReader
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader, SparkSchemaTransformUtils}
+import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.LanceArrowUtils
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 import org.lance.file.{BlobReadMode, FileReadOptions, LanceFileReader}
+import org.lance.spark.vectorized.LanceArrowColumnVector
 
-import java.io.IOException
+import java.io.{Closeable, IOException}
 
 import scala.collection.JavaConverters._
 
 /**
  * Reader for Lance files in Spark datasource.
- * Implements vectorized reading using LanceArrowColumnVector.
+ * Supports both row-based and columnar batch reading modes.
  *
- * @param enableVectorizedReader whether to use vectorized reading (currently always true for Lance)
+ * @param enableVectorizedReader when true, returns ColumnarBatch for vectorized processing;
+ *                               when false, returns InternalRow one by one
  */
 class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader {
 
+  /** Holds a pre-created all-null Arrow vector for a column missing from the file (schema evolution). */
+  private case class NullColumnEntry(colIndex: Int, columnVector: LanceArrowColumnVector, arrowVector: FieldVector)
+
   // Batch size for reading Lance files (number of rows per batch)

Review Comment:
   🤖 nit: the `columnVector` field name is a bit generic alongside `arrowVector`; both are "vectors". Something like `sparkVector` (or `lanceColumnVector`) would make the Spark-facing wrapper vs. the raw Arrow backing vector distinction explicit without having to look at the types.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
