yihua commented on code in PR #18375: URL: https://github.com/apache/hudi/pull/18375#discussion_r3035040931
########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import 
org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; + +public class HoodieLanceRecordReader implements RecordReader<NullWritable, ArrayWritable> { + + private long count = 0; + private final ArrayWritable valueObj; + private HoodieFileReader reader; + private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator; + private final HoodieSchema schema; + + public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException { + FileSplit fileSplit = (FileSplit) split; + StoragePath path = convertToStoragePath(fileSplit.getPath()); + StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf); + HoodieConfig hoodieConfig = getReaderConfigs(storageConf); + reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE, Option.empty()); + + schema = reader.getSchema(); + valueObj = new ArrayWritable(Writable.class, new Writable[schema.getFields().size()]); + } + + @Override + public boolean next(NullWritable key, ArrayWritable value) throws IOException { + if (recordIterator == null) { + recordIterator = reader.getRecordIterator(schema); + } + + if (!recordIterator.hasNext()) { + return false; + } + + IndexedRecord record = recordIterator.next().getData(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema.toAvroSchema()); + value.set(aWritable.get()); + count++; + return true; + } + + @Override + public NullWritable createKey() { + return null; + } + + @Override + public ArrayWritable createValue() { + return valueObj; + } + + @Override + public long getPos() throws IOException { + // TODO Auto-generated method stub 
+ return 0; + } + + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + if (recordIterator != null) { + recordIterator.close(); + recordIterator = null; + } + } + + @Override + public float getProgress() throws IOException { + return 1.0f * count / reader.getTotalRecords(); Review Comment: 🤖 `getProgress()` will NPE if called after `close()` (since `reader` is set to null), and will divide by zero if the file has zero records. Could you guard against both, e.g. `return (reader == null || reader.getTotalRecords() == 0) ? 0f : 1.0f * count / reader.getTotalRecords();`? ########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLanceRecordReader.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop; + +import org.apache.hudi.common.config.HoodieConfig; +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; +import org.apache.hudi.io.storage.HoodieFileReader; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.storage.HoodieStorageUtils; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; + +import java.io.IOException; + +import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs; +import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath; + +public class HoodieLanceRecordReader implements RecordReader<NullWritable, ArrayWritable> { + + private long count = 0; + private final ArrayWritable valueObj; + private HoodieFileReader reader; + private ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator; + private final HoodieSchema schema; + + public HoodieLanceRecordReader(Configuration conf, InputSplit split, JobConf job) throws IOException { + FileSplit fileSplit = (FileSplit) split; + StoragePath path = convertToStoragePath(fileSplit.getPath()); + StorageConfiguration<?> storageConf = HadoopFSUtils.getStorageConf(conf); + HoodieConfig hoodieConfig = getReaderConfigs(storageConf); + reader = 
HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) + .getFileReader(hoodieConfig, path, HoodieFileFormat.LANCE, Option.empty()); + + schema = reader.getSchema(); + valueObj = new ArrayWritable(Writable.class, new Writable[schema.getFields().size()]); + } + + @Override + public boolean next(NullWritable key, ArrayWritable value) throws IOException { + if (recordIterator == null) { + recordIterator = reader.getRecordIterator(schema); + } + + if (!recordIterator.hasNext()) { + return false; + } + + IndexedRecord record = recordIterator.next().getData(); + ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schema.toAvroSchema()); + value.set(aWritable.get()); + count++; + return true; + } + + @Override + public NullWritable createKey() { + return null; + } + + @Override + public ArrayWritable createValue() { + return valueObj; + } + + @Override + public long getPos() throws IOException { + // TODO Auto-generated method stub + return 0; + } + + @Override + public void close() throws IOException { + if (reader != null) { Review Comment: 🤖 Could you close the `recordIterator` before the `reader`? The iterator likely holds references to resources owned by the reader, so closing the reader first may cause the iterator's close to fail or leak resources. (I see the HFile reader has the same order, but it looks like a bug there too.) ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala: ########## @@ -76,8 +76,21 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna val filePath = file.filePath.toString if (requiredSchema.isEmpty && partitionSchema.isEmpty) { - // No columns requested - return empty iterator - Iterator.empty + // No column data needed (e.g. count() on a non-partitioned table). 
Open the Lance file to + // get the actual row count and return that many empty rows, so Spark computes the correct count. + val countAllocator = HoodieArrowAllocator.newChildAllocator( + getClass.getSimpleName + "-count-" + file.filePath, HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE) + try { + val lanceReader = LanceFileReader.open(file.filePath.toString, countAllocator) + try { + val rowCount = lanceReader.numRows() + Iterator.fill(rowCount.toInt)(InternalRow.empty) + } finally { Review Comment: 🤖 `numRows().toInt` will silently overflow for files with more than ~2.1 billion rows. Could you add a check or use a lazy iterator that doesn't need to materialize the count as an Int? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java: ########## @@ -136,18 +140,43 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B // special case for payloads when there is no previous record HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId()); GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema); - HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue())); - try { - if (hoodieRecord.shouldIgnore(recordSchema, properties)) { - return null; - } else { - HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema(); - // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths - hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties) - .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData()))); + Schema recordAvroSchema = recordSchema.toAvroSchema(); + + // 
If convertToAvroRecord returned a cached record with a different schema (e.g., from + // extractDataFromRecord caching for ExpressionPayload in the COW write path), the record + // is already in write-schema format with correctly evaluated expressions. Convert directly. + // Note: SENTINEL records have null schema and must go through the payload path for shouldIgnore. Review Comment: 🤖 When the cached Avro record's schema differs, this skips `shouldIgnore` and the payload evaluation entirely. Is it safe to assume that records arriving through the ExpressionPayload cache path can never be sentinel/ignorable records? ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/UpdateProcessor.java: ########## @@ -136,18 +140,43 @@ protected BufferedRecord<T> handleNonDeletes(BufferedRecord<T> previousRecord, B // special case for payloads when there is no previous record HoodieSchema recordSchema = readerContext.getRecordContext().decodeAvroSchema(mergedRecord.getSchemaId()); GenericRecord record = readerContext.getRecordContext().convertToAvroRecord(mergedRecord.getRecord(), recordSchema); - HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue())); - try { - if (hoodieRecord.shouldIgnore(recordSchema, properties)) { - return null; - } else { - HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema(); - // If the record schema is different from the reader schema, rewrite the record using the payload methods to ensure consistency with legacy writer paths - hoodieRecord.rewriteRecordWithNewSchema(recordSchema, properties, readerSchema).toIndexedRecord(readerSchema, properties) - .ifPresent(rewrittenRecord -> mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(rewrittenRecord.getData()))); + Schema recordAvroSchema = recordSchema.toAvroSchema(); + + // If convertToAvroRecord returned a cached record with a different 
schema (e.g., from + // extractDataFromRecord caching for ExpressionPayload in the COW write path), the record + // is already in write-schema format with correctly evaluated expressions. Convert directly. + // Note: SENTINEL records have null schema and must go through the payload path for shouldIgnore. + if (record.getSchema() != null && !record.getSchema().equals(recordAvroSchema)) { + mergedRecord.replaceRecord(readerContext.getRecordContext().convertAvroRecord(record)); + } else { + HoodieAvroRecord hoodieRecord = new HoodieAvroRecord<>(null, HoodieRecordUtils.loadPayload(payloadClass, record, mergedRecord.getOrderingValue())); + try { + if (hoodieRecord.shouldIgnore(recordSchema, properties)) { + return null; + } + // Evaluate the payload to get the insert value + Option<IndexedRecord> insertValueOpt = hoodieRecord.getData().getInsertValue(recordAvroSchema, properties); + if (insertValueOpt.isPresent()) { + GenericRecord insertRecord = (GenericRecord) insertValueOpt.get(); + HoodieSchema readerSchema = readerContext.getSchemaHandler().getRequestedSchema(); Review Comment: 🤖 When `getInsertValue` returns empty (e.g., `DefaultHoodieRecordPayload` for a soft-deleted record where `shouldIgnore` is false), `mergedRecord` is not replaced and falls through to `super.handleNonDeletes()` which emits it as an insert. The old code would have crashed inside `rewriteRecordWithNewSchema` (which calls `.get()` on the empty Option). Is the new silent-passthrough behavior intentional here, or should we return `null` when `insertValueOpt` is empty to suppress the record? 
########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java: ########## @@ -49,7 +49,13 @@ public static <T, I, K, O> HoodieFileWriter getFileWriter( String instantTime, StoragePath path, HoodieStorage storage, HoodieConfig config, HoodieSchema schema, TaskContextSupplier taskContextSupplier, HoodieRecordType recordType) throws IOException { final String extension = FSUtils.getFileExtension(path.getName()); - HoodieFileWriterFactory factory = HoodieIOFactory.getIOFactory(storage).getWriterFactory(recordType); + HoodieRecordType fixedType; + try { + fixedType = HoodieFileFormat.fromFileExtension(extension).requiresSparkRecordType() ? HoodieRecordType.SPARK : recordType; Review Comment: 🤖 nit: using exception-based control flow for a predictable condition (unknown extension) is a bit surprising. Could you check `HoodieFileFormat.values()` or add a `fromFileExtensionOrNull` variant to avoid the try/catch? ########## hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala: ########## @@ -21,25 +21,73 @@ package org.apache.hudi import org.apache.avro.generic.{GenericRecord, IndexedRecord} import org.apache.hudi.common.engine.RecordContext +import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.table.HoodieTableConfig +import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.HoodieInternalRowUtils import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.hudi.SparkAdapter +import java.io.IOException +import java.util.Properties import scala.collection.mutable trait SparkFileFormatInternalRecordContext extends BaseSparkInternalRecordContext { lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter private val deserializerMap: mutable.Map[HoodieSchema, HoodieAvroDeserializer] = 
mutable.Map() private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = mutable.Map() + // Maps InternalRow instances (by identity) to their original Avro records when the Avro record's Review Comment: 🤖 The `IdentityHashMap<InternalRow, GenericRecord>` cache relies on object identity surviving between `extractDataFromRecord` (population) and `convertToAvroRecord` (lookup). If anything in between calls `seal()` (which does `internalRow.copy()`), `replaceRecord()`, or `project()`, the identity link silently breaks and the code falls through to schema-based serialization — potentially with the wrong schema. Could you add a comment documenting this invariant, or consider a more robust keying strategy (e.g., attaching the original Avro record to the BufferedRecord itself)? ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestQueryMergeOnReadOptimizedTable.scala: ########## @@ -19,63 +19,72 @@ package org.apache.spark.sql.hudi.feature +import org.apache.hudi.HoodieSparkUtils + import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase +import org.scalatest.Inspectors.forAll class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase { - test("Test Query Merge_On_Read Read_Optimized table") { - withTempDir { tmp => - val tableName = generateTableName - val tablePath = s"${tmp.getCanonicalPath}/$tableName" - // create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long, - | partition long - |) using hudi - | partitioned by (partition) - | location '$tablePath' - | tblproperties ( - | type = 'mor', - | primaryKey = 'id', - | orderingFields = 'ts' - | ) - """.stripMargin) - // insert data to table - withSQLConf("hoodie.parquet.max.file.size" -> "10000") { - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)") - spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, 1000)") - spark.sql(s"insert into $tableName 
values(3, 'a3', 10, 1000, 1000)") - spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)") - spark.sql(s"update $tableName set price = 11 where id = 1") - spark.sql(s"update $tableName set price = 21 where id = 2") - spark.sql(s"update $tableName set price = 31 where id = 3") - spark.sql(s"update $tableName set price = 41 where id = 4") - // expect that all complete parquet files can be scanned - assertQueryResult(4, tablePath) + val baseFileFormats: List[String] = if (HoodieSparkUtils.gteqSpark3_4) List("parquet", "lance") else List("parquet") + Review Comment: 🤖 nit: `Inspectors.forAll` is intended for assertions ("verify all elements satisfy a predicate"), not for iterating to register tests. A plain `baseFileFormats.foreach` would be more idiomatic here and avoids the subtle exception-wrapping behavior of `forAll` if test registration ever throws. ########## hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestQueryMergeOnReadOptimizedTable.scala: ########## @@ -19,63 +19,72 @@ package org.apache.spark.sql.hudi.feature +import org.apache.hudi.HoodieSparkUtils + import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase +import org.scalatest.Inspectors.forAll class TestQueryMergeOnReadOptimizedTable extends HoodieSparkSqlTestBase { - test("Test Query Merge_On_Read Read_Optimized table") { - withTempDir { tmp => - val tableName = generateTableName - val tablePath = s"${tmp.getCanonicalPath}/$tableName" - // create table - spark.sql( - s""" - |create table $tableName ( - | id int, - | name string, - | price double, - | ts long, - | partition long - |) using hudi - | partitioned by (partition) - | location '$tablePath' - | tblproperties ( - | type = 'mor', - | primaryKey = 'id', - | orderingFields = 'ts' - | ) - """.stripMargin) - // insert data to table - withSQLConf("hoodie.parquet.max.file.size" -> "10000") { - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)") - spark.sql(s"insert into 
$tableName values(2, 'a2', 10, 1000, 1000)") - spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, 1000)") - spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000, 1000)") - spark.sql(s"update $tableName set price = 11 where id = 1") - spark.sql(s"update $tableName set price = 21 where id = 2") - spark.sql(s"update $tableName set price = 31 where id = 3") - spark.sql(s"update $tableName set price = 41 where id = 4") - // expect that all complete parquet files can be scanned - assertQueryResult(4, tablePath) + val baseFileFormats: List[String] = if (HoodieSparkUtils.gteqSpark3_4) List("parquet", "lance") else List("parquet") + + forAll(baseFileFormats) { (baseFileFormat: String) => + test(s"Test Query Merge_On_Read Read_Optimized table - $baseFileFormat") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + // create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | partition long + |) using hudi + | partitioned by (partition) + | location '$tablePath' + | tblproperties ( + | type = 'mor', + | primaryKey = 'id', Review Comment: 🤖 In `TestSqlStatement`, you explicitly set `hoodie.write.record.merge.custom.implementation.classes` for lance format, but here it's not set. Is this intentional — relying on the auto-selected merger from `EVENT_TIME_ORDERING` mode (via `orderingFields = 'ts'`)? Might be worth adding a comment noting why it's not needed here, since the other test does set it. ########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java: ########## @@ -157,6 +175,7 @@ public static String getOutputFormatClassName(HoodieFileFormat baseFileFormat) { switch (baseFileFormat) { case PARQUET: case HFILE: + case LANCE: Review Comment: 🤖 Is it intentional that Lance uses `MapredParquetOutputFormat` and `ParquetHiveSerDe`? 
If Hive ever tries to write through this path, it would produce Parquet files instead of Lance files. Might be worth a comment explaining this is read-only / placeholder. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
