yihua commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3035040931


##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
+public class HoodieLanceRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
+
+  private long count = 0;
+  private final ArrayWritable valueObj;
+  private HoodieFileReader reader;
+  private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
+  private final HoodieSchema schema;
+
+  public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
+    FileSplit fileSplit = (FileSplit) split;
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, 
storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+        .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE, 
Option.empty());
+
+    schema = reader.getSchema();
+    valueObj = new ArrayWritable(Writable.class, new 
Writable[schema.getFields().size()]);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
+    if (recordIterator == null) {
+      recordIterator = reader.getRecordIterator(schema);
+    }
+
+    if (!recordIterator.hasNext()) {
+      return false;
+    }
+
+    IndexedRecord record = recordIterator.next().getData();
+    ArrayWritable aWritable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, 
schema.toAvroSchema());
+    value.set(aWritable.get());
+    count++;
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+      reader = null;
+    }
+    if (recordIterator != null) {
+      recordIterator.close();
+      recordIterator = null;
+    }
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 1.0f * count / reader.getTotalRecords();

Review Comment:
   🤖 `getProgress()` will throw an NPE if called after `close()` (since `reader` is set 
to null), and will divide by zero if the file has zero records. Could you guard 
against both, e.g. `return (reader == null || reader.getTotalRecords() == 0) ? 
0f : 1.0f * count / reader.getTotalRecords();`?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
+public class HoodieLanceRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
+
+  private long count = 0;
+  private final ArrayWritable valueObj;
+  private HoodieFileReader reader;
+  private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
+  private final HoodieSchema schema;
+
+  public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
+    FileSplit fileSplit = (FileSplit) split;
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, 
storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+        .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE, 
Option.empty());
+
+    schema = reader.getSchema();
+    valueObj = new ArrayWritable(Writable.class, new 
Writable[schema.getFields().size()]);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
+    if (recordIterator == null) {
+      recordIterator = reader.getRecordIterator(schema);
+    }
+
+    if (!recordIterator.hasNext()) {
+      return false;
+    }
+
+    IndexedRecord record = recordIterator.next().getData();
+    ArrayWritable aWritable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, 
schema.toAvroSchema());
+    value.set(aWritable.get());
+    count++;
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (reader != null) {

Review Comment:
   🤖 Could you close the `recordIterator` before the `reader`? The iterator 
likely holds references to resources owned by the reader, so closing the reader 
first may cause the iterator's close to fail or leak resources. (I see the 
HFile reader has the same order, but it looks like a bug there too.)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -76,8 +76,21 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) 
extends SparkColumna
     val filePath = file.filePath.toString
 
     if (requiredSchema.isEmpty && partitionSchema.isEmpty) {
-      // No columns requested - return empty iterator
-      Iterator.empty
+      // No column data needed (e.g. count() on a non-partitioned table). Open 
the Lance file to
+      // get the actual row count and return that many empty rows, so Spark 
computes the correct count.
+      val countAllocator = HoodieArrowAllocator.newChildAllocator(
+        getClass.getSimpleName + "-count-" + file.filePath, 
HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)
+      try {
+        val lanceReader = LanceFileReader.open(file.filePath.toString, 
countAllocator)
+        try {
+          val rowCount = lanceReader.numRows()
+          Iterator.fill(rowCount.toInt)(InternalRow.empty)
+        } finally {

Review Comment:
   🤖 `numRows().toInt` will silently overflow for files with more than ~2.1 
billion rows. Could you add a check or use a lazy iterator that doesn't need to 
materialize the count as an Int?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,43 @@ protected BufferedRecord<T> 
handleNonDeletes(BufferedRecord<T> previousRecord, B
         // special case for payloads when there is no previous record
         HoodieSchema recordSchema = 
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
         GenericRecord record = 
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), 
recordSchema);
-        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, 
HoodieRecordUtils.loadPayload(payloadClass, record, 
mergedRecord.getOrderingValue()));
-        try {
-          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
-            return null;
-          } else {
-            HoodieSchema readerSchema = 
readerContext.getSchemaHandler().getRequestedSchema();
-            // If the record schema is different from the reader schema, 
rewrite the record using the payload methods to ensure consistency with legacy 
writer paths
-            hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, 
readerSchema).toIndexedRecord(readerSchema, properties)
-                .ifPresent(rewrittenRecord -> 
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+        Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+        // If convertToAvroRecord returned a cached record with a different 
schema (e.g., from
+        // extractDataFromRecord caching for ExpressionPayload in the COW 
write path), the record
+        // is already in write-schema format with correctly evaluated 
expressions. Convert directly.
+        // Note: SENTINEL records have null schema and must go through the 
payload path for shouldIgnore.

Review Comment:
   🤖 When the cached Avro record's schema differs, this skips `shouldIgnore` 
and the payload evaluation entirely. Is it safe to assume that records arriving 
through the ExpressionPayload cache path can never be sentinel/ignorable 
records?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java:
##########
@@ -136,18 +140,43 @@ protected BufferedRecord<T> 
handleNonDeletes(BufferedRecord<T> previousRecord, B
         // special case for payloads when there is no previous record
         HoodieSchema recordSchema = 
readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId());
         GenericRecord record = 
readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), 
recordSchema);
-        HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, 
HoodieRecordUtils.loadPayload(payloadClass, record, 
mergedRecord.getOrderingValue()));
-        try {
-          if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
-            return null;
-          } else {
-            HoodieSchema readerSchema = 
readerContext.getSchemaHandler().getRequestedSchema();
-            // If the record schema is different from the reader schema, 
rewrite the record using the payload methods to ensure consistency with legacy 
writer paths
-            hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, 
readerSchema).toIndexedRecord(readerSchema, properties)
-                .ifPresent(rewrittenRecord -> 
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData())));
+        Schema recordAvroSchema = recordSchema.toAvroSchema();
+
+        // If convertToAvroRecord returned a cached record with a different 
schema (e.g., from
+        // extractDataFromRecord caching for ExpressionPayload in the COW 
write path), the record
+        // is already in write-schema format with correctly evaluated 
expressions. Convert directly.
+        // Note: SENTINEL records have null schema and must go through the 
payload path for shouldIgnore.
+        if (record.getSchema() != null && 
!record.getSchema().equals(recordAvroSchema)) {
+          
mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(record));
+        } else {
+          HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, 
HoodieRecordUtils.loadPayload(payloadClass, record, 
mergedRecord.getOrderingValue()));
+          try {
+            if (hoodieRecord.shouldIgnore(recordSchema, properties)) {
+              return null;
+            }
+            // Evaluate the payload to get the insert value
+            Option<IndexedRecord> insertValueOpt = 
hoodieRecord.getData().getInsertValue(recordAvroSchema, properties);
+            if (insertValueOpt.isPresent()) {
+              GenericRecord insertRecord = (GenericRecord) 
insertValueOpt.get();
+              HoodieSchema readerSchema = 
readerContext.getSchemaHandler().getRequestedSchema();

Review Comment:
   🤖 When `getInsertValue` returns empty (e.g., `DefaultHoodieRecordPayload` 
for a soft-deleted record where `shouldIgnore` is false), `mergedRecord` is not 
replaced and falls through to `super.handleNonDeletes()` which emits it as an 
insert. The old code would have crashed inside `rewriteRecordWithNewSchema` 
(which calls `.get()` on the empty Option). Is the new silent-passthrough 
behavior intentional here, or should we return `null` when `insertValueOpt` is 
empty to suppress the record?



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java:
##########
@@ -49,7 +49,13 @@ public static <T, I, K, O> HoodieFileWriter getFileWriter(
       String instantTime, StoragePath path, HoodieStorage storage, 
HoodieConfig config, HoodieSchema schema,
       TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) 
throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
-    HoodieFileWriterFactory factory = 
HoodieIOFactory.getIOFactory(storage).getWriterFactory(recordType);
+    HoodieRecordType fixedType;
+    try {
+      fixedType = 
HoodieFileFormat.fromFileExtension(extension).requiresSparkRecordType() ? 
HoodieRecordType.SPARK : recordType;

Review Comment:
   🤖 nit: using exception-based control flow for a predictable condition 
(unknown extension) is a bit surprising. Could you check 
`HoodieFileFormat.values()` or add a `fromFileExtensionOrNull` variant to avoid 
the try/catch?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -21,25 +21,73 @@ package org.apache.hudi
 
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.engine.RecordContext
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.exception.HoodieException
 import org.apache.spark.sql.HoodieInternalRowUtils
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.hudi.SparkAdapter
 
+import java.io.IOException
+import java.util.Properties
 import scala.collection.mutable
 
 trait SparkFileFormatInternalRecordContext extends 
BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
   private val deserializerMap: mutable.Map[HoodieSchema, 
HoodieAvroDeserializer] = mutable.Map()
   private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = 
mutable.Map()
+  // Maps InternalRow instances (by identity) to their original Avro records 
when the Avro record's

Review Comment:
   🤖 The `IdentityHashMap<InternalRow, GenericRecord>` cache relies on object 
identity surviving between `extractDataFromRecord` (population) and 
`convertToAvroRecord` (lookup). If anything in between calls `seal()` (which 
does `internalRow.copy()`), `replaceRecord()`, or `project()`, the identity 
link silently breaks and the code falls through to schema-based serialization — 
potentially with the wrong schema. Could you add a comment documenting this 
invariant, or consider a more robust keying strategy (e.g., attaching the 
original Avro record to the BufferedRecord itself)?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestQueryMergeOnReadOptimizedTable.scala:
##########
@@ -19,63 +19,72 @@
 
 package org.apache.spark.sql.hudi.feature
 
+import org.apache.hudi.HoodieSparkUtils
+
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.scalatest.Inspectors.forAll
 
 class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
-  test("Test Query Merge_On_Read Read_Optimized table") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-      // create table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long,
-           |  partition long
-           |) using hudi
-           | partitioned by (partition)
-           | location '$tablePath'
-           | tblproperties (
-           |  type = 'mor',
-           |  primaryKey = 'id',
-           |  orderingFields = 'ts'
-           | )
-       """.stripMargin)
-      // insert data to table
-      withSQLConf("hoodie.parquet.max.file.size" -> "10000") {
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
-        spark.sql(s"update $tableName set price = 11 where id = 1")
-        spark.sql(s"update $tableName set price = 21 where id = 2")
-        spark.sql(s"update $tableName set price = 31 where id = 3")
-        spark.sql(s"update $tableName set price = 41 where id = 4")
 
-        // expect that all complete parquet files can be scanned
-        assertQueryResult(4, tablePath)
+  val baseFileFormats: List[String] = if (HoodieSparkUtils.gteqSpark3_4) 
List("parquet", "lance") else List("parquet")
+

Review Comment:
   🤖 nit: `Inspectors.forAll` is intended for assertions ("verify all elements 
satisfy a predicate"), not for iterating to register tests. A plain 
`baseFileFormats.foreach` would be more idiomatic here and would avoid the subtle 
exception-wrapping behavior of `forAll` if test registration ever throws.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestQueryMergeOnReadOptimizedTable.scala:
##########
@@ -19,63 +19,72 @@
 
 package org.apache.spark.sql.hudi.feature
 
+import org.apache.hudi.HoodieSparkUtils
+
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.scalatest.Inspectors.forAll
 
 class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase {
-  test("Test Query Merge_On_Read Read_Optimized table") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-      // create table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long,
-           |  partition long
-           |) using hudi
-           | partitioned by (partition)
-           | location '$tablePath'
-           | tblproperties (
-           |  type = 'mor',
-           |  primaryKey = 'id',
-           |  orderingFields = 'ts'
-           | )
-       """.stripMargin)
-      // insert data to table
-      withSQLConf("hoodie.parquet.max.file.size" -> "10000") {
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)")
-        spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)")
-        spark.sql(s"update $tableName set price = 11 where id = 1")
-        spark.sql(s"update $tableName set price = 21 where id = 2")
-        spark.sql(s"update $tableName set price = 31 where id = 3")
-        spark.sql(s"update $tableName set price = 41 where id = 4")
 
-        // expect that all complete parquet files can be scanned
-        assertQueryResult(4, tablePath)
+  val baseFileFormats: List[String] = if (HoodieSparkUtils.gteqSpark3_4) 
List("parquet", "lance") else List("parquet")
+
+  forAll(baseFileFormats) { (baseFileFormat: String) =>
+    test(s"Test Query Merge_On_Read Read_Optimized table - $baseFileFormat") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        // create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  partition long
+             |) using hudi
+             | partitioned by (partition)
+             | location '$tablePath'
+             | tblproperties (
+             |  type = 'mor',
+             |  primaryKey = 'id',

Review Comment:
   🤖 In `TestSqlStatement`, you explicitly set 
`hoodie.write.record.merge.custom.implementation.classes` for lance format, but 
here it's not set. Is this intentional — relying on the auto-selected merger 
from `EVENT_TIME_ORDERING` mode (via `orderingFields = 'ts'`)? Might be worth 
adding a comment noting why it's not needed here, since the other test does set 
it.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java:
##########
@@ -157,6 +175,7 @@ public static String 
getOutputFormatClassName(HoodieFileFormat baseFileFormat) {
     switch (baseFileFormat) {
       case PARQUET:
       case HFILE:
+      case LANCE:

Review Comment:
   🤖 Is it intentional that Lance uses `MapredParquetOutputFormat` and 
`ParquetHiveSerDe`? If Hive ever tries to write through this path, it would 
produce Parquet files instead of Lance files. Might be worth a comment 
explaining this is read-only / placeholder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to