yihua commented on code in PR #18375:
URL: https://github.com/apache/hudi/pull/18375#discussion_r3036286607


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala:
##########
@@ -37,15 +39,23 @@ class TestSqlStatement extends HoodieSparkSqlTestBase {
   val STATE_FINISH_ALL = 12
 
   test("Test Sql Statements") {
-    Seq("cow", "mor").foreach { tableType =>
-      withTempDir { tmp =>
-        val params = Map(
-          "tableType" -> tableType,
-          "tmpDir" -> {
-            tmp.getCanonicalPath.replace('\\', '/')
-          }
-        )
-        execSqlFile("/sql-statements.sql", params)
+    val baseFileFormats = if (HoodieSparkUtils.gteqSpark3_4) Seq("parquet", 
"lance") else Seq("parquet")
+    baseFileFormats.foreach { baseFileFormat =>
+      Seq("cow", "mor").foreach { tableType =>
+        withTempDir { tmp =>
+          val params = Map(
+            "tableType" -> tableType,
+            "baseFileFormat" -> baseFileFormat,
+            "recordMergerImpl" -> (
+              if (baseFileFormat.equals("parquet")) "" else {
+                s"hoodie.write.record.merge.custom.implementation.classes = 
'${classOf[DefaultSparkRecordMerger].getName}',"

Review Comment:
   🤖 nit: same as above — `baseFileFormat == "parquet"` is idiomatic Scala; 
`.equals("parquet")` looks like Java style slipping in here.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -216,11 +241,19 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
       tableType
     }
 
+    val recordMergerImpl = if (baseFileFormat.equals("lance")) {

Review Comment:
   🤖 nit: in Scala, `==` is the idiomatic way to compare strings — 
`baseFileFormat == "lance"` reads more naturally than `.equals("lance")`.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
+public class HoodieLanceRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
+
+  private long count = 0;
+  private final ArrayWritable valueObj;
+  private HoodieFileReader reader;
+  private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
+  private final HoodieSchema schema;
+
+  public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
+    FileSplit fileSplit = (FileSplit) split;
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, 
storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+        .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE, 
Option.empty());

Review Comment:
   🤖 nit: this chain is pretty long for one line — it might be worth breaking 
it into a local variable for the IO factory or reader factory to make it easier 
to scan.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSqlStatement.scala:
##########
@@ -37,15 +39,23 @@ class TestSqlStatement extends HoodieSparkSqlTestBase {
   val STATE_FINISH_ALL = 12
 
   test("Test Sql Statements") {
-    Seq("cow", "mor").foreach { tableType =>
-      withTempDir { tmp =>
-        val params = Map(
-          "tableType" -> tableType,
-          "tmpDir" -> {
-            tmp.getCanonicalPath.replace('\\', '/')
-          }
-        )
-        execSqlFile("/sql-statements.sql", params)
+    val baseFileFormats = if (HoodieSparkUtils.gteqSpark3_4) Seq("parquet", 
"lance") else Seq("parquet")
+    baseFileFormats.foreach { baseFileFormat =>
+      Seq("cow", "mor").foreach { tableType =>
+        withTempDir { tmp =>
+          val params = Map(
+            "tableType" -> tableType,
+            "baseFileFormat" -> baseFileFormat,
+            "recordMergerImpl" -> (

Review Comment:
   🤖 nit: the multi-line `if/else` expression for `recordMergerImpl` embedded 
inline inside the `Map(...)` literal is a bit hard to parse at a glance — could 
you assign it to a `val` before the `Map`, similar to how 
`TestSparkSqlCoreFlow` handles it?



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java:
##########
@@ -49,7 +49,10 @@ public static <T, I, K, O> HoodieFileWriter getFileWriter(
       String instantTime, StoragePath path, HoodieStorage storage, 
HoodieConfig config, HoodieSchema schema,
       TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) 
throws IOException {
     final String extension = FSUtils.getFileExtension(path.getName());
-    HoodieFileWriterFactory factory = 
HoodieIOFactory.getIOFactory(storage).getWriterFactory(recordType);
+    HoodieFileFormat format = 
HoodieFileFormat.fromFileExtensionOrNull(extension);
+    HoodieRecordType fixedType = (format != null && 
format.requiresSparkRecordType())

Review Comment:
   🤖 nit: `fixedType` doesn't communicate much intent — something like 
`effectiveRecordType` or `resolvedRecordType` would make it clearer that this 
is the format-aware override of the caller-supplied type.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -396,24 +399,51 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithEx
     }
 
     //record is inserted or updated
-    String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator, existingRecordContext, mergeResult);
-    HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(mergeResult, partitionPath);
+    String partitionPath = inferPartitionPath(incoming, existing, 
writeSchemaWithMetaFields, keyGenerator,
+        existingRecordContext, mergeResult, incomingBufferedRecord, 
existingBufferedRecord, incomingRecordContext);
+    // When HoodieAvroRecordMerger creates a genuinely new BufferedRecord, it 
encodes the schema into
+    // incomingRecordContext. Re-encode into existingRecordContext so 
constructHoodieRecord (which uses
+    // existingRecordContext for the correct payload class) can resolve the 
schema for SPARK records.
+    BufferedRecord<R> mergeResultForConstruct = mergeResult;
+    if (mergeResult != incomingBufferedRecord && mergeResult != 
existingBufferedRecord) {
+      HoodieSchema mergedSchema = 
incomingRecordContext.getSchemaFromBufferRecord(mergeResult);
+      if (mergedSchema != null && 
existingRecordContext.getSchemaFromBufferRecord(mergeResult) == null) {
+        mergeResultForConstruct = BufferedRecords.fromEngineRecord(
+            mergeResult.getRecord(), mergeResult.getRecordKey(), mergedSchema,
+            existingRecordContext, Arrays.asList(orderingFieldNames), 
mergeResult.getHoodieOperation());
+      }
+    }
+    HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(mergeResultForConstruct, 
partitionPath);
     HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema, 
writeSchemaWithMetaFields,
         new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath),
 properties);
     return 
Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
 properties, Option.empty(),
         config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema)));
   }

Review Comment:
   🤖 nit: `inferPartitionPath` has grown to 9 parameters — might be worth 
considering a small value object or passing the incoming/existing 
`BufferedRecord` pair as a context struct to reduce the signature length.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -47,24 +48,47 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
   val colsToCompare = "timestamp, _row_key, partition_path, rider, driver, 
begin_lat, begin_lon, end_lat, end_lon, fare.amount, fare.currency, 
_hoodie_is_deleted"
 
   //params for core flow tests
-  val params: List[String] = List(
-    
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-    
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-    
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-    
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-    
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-    
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-    
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
-    
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
-    
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-    
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM",
-    
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-    
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE",
-    
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-    
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM",
-    
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE",
-    
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE"
-  )
+  val params: List[String] = {
+    val allParams: List[String] = List(
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|parquet",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|parquet",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|parquet",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|parquet",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+      
"COPY_ON_WRITE|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance",
+      
"COPY_ON_WRITE|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_BLOOM|lance",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.SimpleKeyGenerator|GLOBAL_SIMPLE|lance",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|BLOOM|lance",
+      
"MERGE_ON_READ|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance",
+      
"MERGE_ON_READ|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|SIMPLE|lance"

Review Comment:
   🤖 nit: the `if (HoodieSparkUtils.gteqSpark3_4) allParams else 
allParams.filterNot(_.endsWith("|lance"))` pattern is copy-pasted verbatim for 
`paramsForImmutable` too — could you pull it into a small helper like `def 
withLanceIfSupported(ps: List[String]): List[String]` to keep the two call 
sites DRY?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieLanceRealtimeInputFormat.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieLanceInputFormat;
+import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * HoodieRealtimeInputFormat for HUDI datasets which store data in Lance base 
file format.
+ */
+@UseRecordReaderFromInputFormat
+@UseFileSplitsFromInputFormat
+public class HoodieLanceRealtimeInputFormat extends 
HoodieMergeOnReadTableInputFormat {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieLanceRealtimeInputFormat.class);
+
+  // NOTE: We're only using {@code HoodieLanceInputFormat} to compose {@code 
RecordReader}
+  private final HoodieLanceInputFormat lanceInputFormat = new 
HoodieLanceInputFormat();
+
+  @Override
+  public RecordReader<NullWritable, ArrayWritable> getRecordReader(final 
InputSplit split, final JobConf jobConf,
+      final Reporter reporter) throws IOException {
+    // Hive on Spark invokes multiple getRecordReaders from different threads 
in the same spark task (and hence the
+    // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is 
shared across all threads, is at the
+    // risk of experiencing race conditions. Hence, we synchronize on the 
JobConf object here. There is negligible
+    // latency incurred here due to the synchronization since get record 
reader is called once per spilt before the
+    // actual heavy lifting of reading the parquet files happen.
+    if (jobConf.get(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP) == null) {

Review Comment:
   🤖 nit: the comment says "reading the parquet files" — looks like a 
copy-paste artifact from the Parquet realtime format; probably should say 
"Lance files" (or just "the files").



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java:
##########
@@ -70,6 +70,14 @@ public String getFileExtension() {
     return extension;
   }
 
+  /**
+   * Returns true if this file format requires the SPARK record type for 
reading/writing.
+   * Lance only supports the Spark-native InternalRow representation, not Avro.
+   */
+  public boolean requiresSparkRecordType() {

Review Comment:
   🤖 nit: the `requiresSparkRecordType() ? SPARK : <fallback>` ternary is 
copy-pasted at least five times across this PR (HoodieIndexUtils ×3, 
HoodieReadHandle, HoodieTable, HoodieFileWriterFactory) — have you considered 
adding a helper like `resolveRecordType(HoodieRecordType fallback)` directly on 
`HoodieFileFormat` to avoid the repetition?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+
+public class HoodieLanceRecordReader implements RecordReader<NullWritable, 
ArrayWritable> {
+
+  private long count = 0;
+  private final ArrayWritable valueObj;
+  private HoodieFileReader reader;
+  private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator;
+  private final HoodieSchema schema;
+
+  public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf 
job) throws IOException {
+    FileSplit fileSplit = (FileSplit) split;
+    StoragePath path = convertToStoragePath(fileSplit.getPath());
+    StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf);
+    HoodieConfig hoodieConfig = getReaderConfigs(storageConf);
+    reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, 
storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+        .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE, 
Option.empty());
+
+    schema = reader.getSchema();
+    valueObj = new ArrayWritable(Writable.class, new 
Writable[schema.getFields().size()]);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws 
IOException {
+    if (recordIterator == null) {
+      recordIterator = reader.getRecordIterator(schema);
+    }
+
+    if (!recordIterator.hasNext()) {
+      return false;
+    }
+
+    IndexedRecord record = recordIterator.next().getData();
+    ArrayWritable aWritable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, 
schema.toAvroSchema());
+    value.set(aWritable.get());
+    count++;
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;

Review Comment:
   🤖 nit: `// TODO Auto-generated method stub` looks like a leftover IDE 
placeholder — could you either remove it or replace it with a note explaining 
why `getPos()` always returns 0 for Lance?



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceInputFormat.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in Lance base file 
format.
+ */
+@UseFileSplitsFromInputFormat
+public class HoodieLanceInputFormat extends HoodieCopyOnWriteTableInputFormat {
+
+  protected HoodieTimeline filterInstantsTimeline(HoodieTimeline timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);

Review Comment:
   🤖 nit: `filterInstantsTimeline` looks like it's overriding a parent method — 
could you add `@Override` here so the compiler can catch any future signature 
drift?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieReadHandle.java:
##########
@@ -68,14 +70,16 @@ protected HoodieBaseFile getLatestBaseFile() {
   }
 
   protected HoodieFileReader createNewFileReader() throws IOException {
-    return HoodieIOFactory.getIOFactory(hoodieTable.getStorage())
-        .getReaderFactory(this.config.getRecordMerger().getRecordType())
-        .getFileReader(config, getLatestBaseFile().getStoragePath());
+    return createNewFileReader(getLatestBaseFile());
   }
 

Review Comment:
   🤖 nit: `ext` is a bit terse — the rest of the codebase (e.g. 
`HoodieFileWriterFactory`) consistently uses `extension` for the same concept, 
could you align here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to