hudi-agent commented on code in PR #18987:
URL: https://github.com/apache/hudi/pull/18987#discussion_r3409099149


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValues;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge reader for LSM file groups whose delta files are 
parquet files.
+ *
+ * <p>Each input file is expected to be sorted by record key. The iterator 
keeps one record from
+ * each file in memory, merges all versions for the same key with the regular 
file-group reader
+ * merge semantics, and emits the final row.
+ */
+public class LsmFileGroupRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  // Name of the record-key column read from each row of a native delete log file.
+  private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+  // Schema used to read native delete log files: a required record_key plus nullable
+  // partition_path and ordering_val columns.
+  private static final HoodieSchema DELETE_LOG_SCHEMA = HoodieSchema.parse(
+      "{"
+          + "\"type\":\"record\","
+          + "\"name\":\"hudi_delete_log_record\","
+          + "\"fields\":["
+          + "{\"name\":\"record_key\",\"type\":\"string\"},"
+          + 
"{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null},"
+          + 
"{\"name\":\"ordering_val\",\"type\":[\"null\",\"bytes\"],\"default\":null}"
+          + "]"
+          + "}");
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  // Required schema taken from the reader context's schema handler.
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  // Whether the split's base file (when present) takes part in the merge.
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  // Loser tree built over the per-file reader states.
+  private final LoserTree<T> readers;
+  // Log-file iterators whose merge order is at or above this value are wrapped in a
+  // SpillableLsmRecordIterator.
+  private final int spillThreshold;
+  // Base path handed to SpillableLsmRecordIterator.
+  private final String spillBasePath;
+  // Record prepared by hasNext() and handed out by next(); null when nothing is buffered.
+  private BufferedRecord<T> nextRecord;
+
+  /**
+   * Creates an iterator that merges the split's base file (when present) together with its log files.
+   *
+   * <p>Delegates to the full constructor with {@code includeBaseFile} set to {@code true}.
+   *
+   * @throws IOException if one of the input files cannot be opened
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException {
+    this(
+        readerContext,
+        storage,
+        inputSplit,
+        orderingFieldNames,
+        metaClient,
+        props,
+        readerParameters,
+        readStats,
+        fileGroupUpdateCallback,
+        true);
+  }
+
+  /**
+   * Creates the iterator, builds the record merger and update processor, reads the spill
+   * settings from {@code props}, and opens every input file of the split.
+   *
+   * @param includeBaseFile whether the split's base file (when present) takes part in the merge
+   * @throws IOException if one of the input files cannot be opened
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws IOException {
+    // Plain state first; everything below derives from these fields.
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        readerContext.getMergeMode(),
+        false,
+        readerContext.getRecordMerger(),
+        readerSchema,
+        readerContext.getPayloadClasses(props),
+        props,
+        metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(
+        readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+
+    String spillThresholdKey = LSM_SORT_MERGE_SPILL_THRESHOLD.key();
+    this.spillThreshold = props.getInteger(spillThresholdKey, LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue());
+    String spillBasePathKey = SPILLABLE_MAP_BASE_PATH.key();
+    this.spillBasePath = props.getString(spillBasePathKey, getDefaultSpillableMapBasePath());
+
+    // Must stay last: opening the readers uses the fields assigned above.
+    this.readers = new LoserTree<>(initializeReaders());
+  }
+
+  /**
+   * Opens one reader per input file and returns those that hold at least one record.
+   *
+   * <p>The base file (when included and present) gets merge order 0; log files follow in the
+   * order returned by the input split. Log-file iterators may be wrapped for spilling, see
+   * {@code maybeSpillIterator}.
+   *
+   * <p>If opening any file fails, the readers opened so far are closed before the failure is
+   * rethrown: this method runs inside the constructor, so the caller never receives an
+   * instance it could close.
+   *
+   * @return reader states positioned on their first record
+   * @throws IOException if a file iterator cannot be created
+   */
+  private List<ReaderState<T>> initializeReaders() throws IOException {
+    List<ReaderState<T>> readerStates = new ArrayList<>();
+    try {
+      int mergeOrder = 0;
+      if (includeBaseFile && inputSplit.getBaseFileOption().isPresent()) {
+        addReader(readerStates, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+      }
+      for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+        ClosableIterator<BufferedRecord<T>> iterator =
+            createFileIterator(logFile.getPathInfo(), logFile.getPath(), logFile.getFileSize());
+        addReader(readerStates, mergeOrder, maybeSpillIterator(mergeOrder, iterator));
+        mergeOrder++;
+      }
+      return readerStates;
+    } catch (IOException | RuntimeException e) {
+      // Release everything opened before the failure; keep the original error as the primary one.
+      for (ReaderState<T> opened : readerStates) {
+        try {
+          opened.close();
+        } catch (RuntimeException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Wraps {@code iterator} in a {@code SpillableLsmRecordIterator} once {@code mergeOrder}
+   * reaches the configured spill threshold; below the threshold the iterator is returned as is.
+   */
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(
+      int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    boolean shouldSpill = mergeOrder >= spillThreshold;
+    if (shouldSpill) {
+      return new SpillableLsmRecordIterator<>(
+          iterator,
+          readerContext.getRecordSerializer(),
+          readerContext.getRecordContext(),
+          spillBasePath);
+    }
+    return iterator;
+  }
+
+  /**
+   * Wraps {@code iterator} in a reader state and registers it when it yields a first record.
+   *
+   * <p>A reader with no records is closed immediately. A reader whose first read fails is also
+   * closed before the failure propagates, so the underlying file is not leaked.
+   *
+   * @param readerStates collection receiving readers that have at least one record
+   * @param mergeOrder   position of this input in the merge order
+   * @param iterator     record iterator over one input file
+   */
+  private void addReader(List<ReaderState<T>> readerStates, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    ReaderState<T> readerState = new ReaderState<>(mergeOrder, iterator);
+    boolean hasRecord;
+    try {
+      hasRecord = readerState.advance();
+    } catch (RuntimeException e) {
+      // Reading the first record failed: release the reader, keep the original error primary.
+      try {
+        readerState.close();
+      } catch (RuntimeException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+    if (hasRecord) {
+      readerStates.add(readerState);
+    } else {
+      readerState.close();
+    }
+  }
+
+  /**
+   * Opens the base file of the split, using its bootstrap base file instead when one is set.
+   *
+   * @throws IOException if the file iterator cannot be created
+   */
+  private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    BaseFile target = baseFile.getBootstrapBaseFile().orElse(baseFile);
+    StoragePathInfo targetInfo = target.getPathInfo();
+    StoragePath targetPath = target.getStoragePath();
+    return createFileIterator(targetInfo, targetPath, target.getFileSize());
+  }
+
+  private ClosableIterator<BufferedRecord<T>> 
createFileIterator(StoragePathInfo pathInfo,
+                                                                 StoragePath 
path,
+                                                                 long 
fileSize) throws IOException {
+    StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path;
+    if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) {
+      return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize);
+    }
+    Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns =
+        
readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath);
+    HoodieSchema fileRequiredSchema = 
requiredSchemaAndRenamedColumns.getLeft();
+    ClosableIterator<T> recordIterator;
+    if (pathInfo != null) {
+      recordIterator = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), 
readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    } else {
+      long length = fileSize >= 0 ? fileSize : 
storage.getPathInfo(storagePath).getLength();
+      recordIterator = readerContext.getFileRecordIterator(
+          storagePath, 0, length, 
readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    }
+    if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || 
!requiredSchemaAndRenamedColumns.getRight().isEmpty()) {
+      UnaryOperator<T> projector = readerContext.getRecordContext()
+          .projectRecord(fileRequiredSchema, readerSchema, 
requiredSchemaAndRenamedColumns.getRight());

Review Comment:
   🤖 The `DELETE_LOG_SCHEMA` defines `ordering_val` (nullable bytes), but 
`createNativeDeleteLogIterator` always passes `OrderingValues.getDefault()` to 
`BufferedRecords.createDelete` rather than reading the value from the record. 
The existing pattern (`BufferedRecords.fromDeleteRecord` → 
`RecordContext.getOrderingValue(DeleteRecord)`) preserves the actual ordering 
value when it isn't commit-time. If the write side writes a real `ordering_val` 
for any event-time-ordered table, those deletes would lose to inserts they 
should override. Could you confirm what the writer puts in this field and read 
it accordingly?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/HoodieLsmFileGroupReader.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.IteratorMode;
+import org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandler;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+
+/**
+ * A file group reader for LSM file groups backed by native parquet log files.
+ *
+ * <p>This reader is intentionally separate from {@code 
HoodieFileGroupReader}. Callers opt into
+ * this reader when they know the file group follows LSM sorted-file semantics.
+ */
+public final class HoodieLsmFileGroupReader<T> implements Closeable {
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieTableMetaClient metaClient;
+  private final InputSplit inputSplit;
+  private final List<String> orderingFieldNames;
+  private final HoodieStorage storage;
+  private final TypedProperties props;
+  private final ReaderParameters readerParameters;
+  private final Option<UnaryOperator<T>> outputConverter;
+  private final Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback;
+  private ClosableIterator<BufferedRecord<T>> lsmRecordIterator;
+  @Getter
+  private final HoodieReadStats readStats;
+
+  @Builder(setterPrefix = "with")
+  private HoodieLsmFileGroupReader(
+      HoodieReaderContext<T> readerContext,
+      String latestCommitTime,
+      HoodieSchema dataSchema,
+      HoodieSchema requestedSchema,
+      Option<InternalSchema> internalSchemaOpt,
+      HoodieTableMetaClient hoodieTableMetaClient,
+      TypedProperties props,
+      Option<HoodieBaseFile> baseFileOption,
+      Stream<HoodieLogFile> logFiles,
+      String partitionPath,
+      Long start,
+      Long length,
+      Boolean allowInflightInstants,
+      Boolean emitDelete,
+      Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) {
+
+    ValidationUtils.checkArgument(readerContext != null, "Reader context is 
required");
+    ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie table 
meta client is required");
+    ValidationUtils.checkArgument(latestCommitTime != null, "Latest commit 
time is required");
+    ValidationUtils.checkArgument(dataSchema != null, "Data schema is 
required");
+    ValidationUtils.checkArgument(requestedSchema != null, "Requested schema 
is required");
+    ValidationUtils.checkArgument(props != null, "Props is required");
+    ValidationUtils.checkArgument(partitionPath != null, "Partition path is 
required");
+    
ValidationUtils.checkArgument(hoodieTableMetaClient.getTableConfig().getLogFileFormat()
 == HoodieFileFormat.PARQUET,
+        "LSM file group reader expects parquet log files");
+
+    if (internalSchemaOpt == null) {
+      internalSchemaOpt = Option.empty();
+    }
+    if (baseFileOption == null) {
+      baseFileOption = Option.empty();
+    }
+    if (start == null) {
+      start = 0L;
+    }
+    if (length == null) {
+      length = Long.MAX_VALUE;
+    }
+    if (allowInflightInstants == null) {
+      allowInflightInstants = false;
+    }
+    if (emitDelete == null) {
+      emitDelete = false;
+    }
+    if (fileGroupUpdateCallback == null) {
+      fileGroupUpdateCallback = Option.empty();
+    }
+
+    String tablePath = hoodieTableMetaClient.getBasePath().toString();
+    HoodieStorage storage = hoodieTableMetaClient.getStorage().newInstance(new 
StoragePath(tablePath), readerContext.getStorageConfiguration());
+
+    this.readerParameters = ReaderParameters.builder()
+        .shouldUseRecordPosition(false)
+        .emitDeletes(emitDelete)
+        .sortOutputs(false)
+        .inflightInstantsAllowed(allowInflightInstants)
+        .build();
+    this.inputSplit = InputSplit.builder()
+        .baseFileOption(baseFileOption)
+        .logFileStream(logFiles)
+        .partitionPath(partitionPath)
+        .start(start)
+        .length(length)
+        .build();
+
+    this.readerContext = readerContext;
+    this.fileGroupUpdateCallback = fileGroupUpdateCallback;
+    this.metaClient = hoodieTableMetaClient;
+    this.storage = storage;
+
+    readerContext.setHasLogFiles(this.inputSplit.hasLogFiles());

Review Comment:
   🤖 If a caller invokes multiple iterator getters (e.g. 
`getClosableIterator()` then `getClosableHoodieRecordIterator()`) on the same 
reader, the later call overwrites `lsmRecordIterator` without closing the 
previous one — that earlier iterator (and any open parquet readers / spill 
files it holds) leaks until JVM exit. Could you guard with a null/closed check, 
or document that only one iterator may be obtained per reader instance?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -336,22 +345,19 @@ public static String getFileExtensionFromLog(StoragePath 
logPath) {
   }
 
   public static String getFileIdFromFileName(String fileName) {
-    if (FSUtils.isLogFile(fileName)) {
-      Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
-      if (!matcher.matches()) {
-        throw new InvalidHoodieFileNameException(fileName, "LogFile");
-      }
-      return matcher.group(1);
+    Option<Matcher> logFileMatcher = matchLogFile(fileName);
+    if (logFileMatcher.isPresent()) {
+      return logFileMatcher.get().group(1);
     }
     return FSUtils.getFileId(fileName);
   }

Review Comment:
   🤖 Switching to `matchLogFile` here looks like a regression for archive files 
— the original `LOG_FILE_PATTERN.matcher(...)` matched both `log|archive`, but 
`matchLogFile` only returns matches when group(3).equals("log"). The existing 
`testArchiveLogFileName` test asserts `getFileIdFromLogPath(archive_path)` 
returns the file id, which would now throw `InvalidHoodiePathException`. Could 
you preserve the archive-supporting behavior, or use `LOG_FILE_PATTERN` 
directly here?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValues;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge reader for LSM file groups whose delta files are 
parquet files.
+ *
+ * <p>Each input file is expected to be sorted by record key. The iterator 
keeps one record from
+ * each file in memory, merges all versions for the same key with the regular 
file-group reader
+ * merge semantics, and emits the final row.
+ */
+public class LsmFileGroupRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  // Name of the record-key column read from each row of a native delete log file.
+  private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+  // Schema used to read native delete log files: a required record_key plus nullable
+  // partition_path and ordering_val columns.
+  private static final HoodieSchema DELETE_LOG_SCHEMA = HoodieSchema.parse(
+      "{"
+          + "\"type\":\"record\","
+          + "\"name\":\"hudi_delete_log_record\","
+          + "\"fields\":["
+          + "{\"name\":\"record_key\",\"type\":\"string\"},"
+          + 
"{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null},"
+          + 
"{\"name\":\"ordering_val\",\"type\":[\"null\",\"bytes\"],\"default\":null}"
+          + "]"
+          + "}");
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  // Required schema taken from the reader context's schema handler; file rows are projected to it.
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  // Whether the split's base file (when present) takes part in the merge.
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  // Loser tree built over the per-file reader states.
+  private final LoserTree<T> readers;
+  // Log-file iterators whose merge order is at or above this value are wrapped in a
+  // SpillableLsmRecordIterator.
+  private final int spillThreshold;
+  // Base path handed to SpillableLsmRecordIterator.
+  private final String spillBasePath;
+  // Record prepared by hasNext() and handed out by next(); null when nothing is buffered.
+  private BufferedRecord<T> nextRecord;
+
+  /**
+   * Creates an iterator that merges the split's base file (when present) together with its log files.
+   *
+   * <p>Delegates to the full constructor with {@code includeBaseFile} set to {@code true}.
+   *
+   * @throws IOException if one of the input files cannot be opened
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback) throws IOException {
+    this(
+        readerContext,
+        storage,
+        inputSplit,
+        orderingFieldNames,
+        metaClient,
+        props,
+        readerParameters,
+        readStats,
+        fileGroupUpdateCallback,
+        true);
+  }
+
+  /**
+   * Creates the iterator, builds the record merger and update processor, reads the spill
+   * settings from {@code props}, and opens every input file of the split.
+   *
+   * @param includeBaseFile whether the split's base file (when present) takes part in the merge
+   * @throws IOException if one of the input files cannot be opened
+   */
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws IOException {
+    // Plain state first; everything below derives from these fields.
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        readerContext.getMergeMode(),
+        false,
+        readerContext.getRecordMerger(),
+        readerSchema,
+        readerContext.getPayloadClasses(props),
+        props,
+        metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(
+        readStats, readerContext, readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+
+    String spillThresholdKey = LSM_SORT_MERGE_SPILL_THRESHOLD.key();
+    this.spillThreshold = props.getInteger(spillThresholdKey, LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue());
+    String spillBasePathKey = SPILLABLE_MAP_BASE_PATH.key();
+    this.spillBasePath = props.getString(spillBasePathKey, getDefaultSpillableMapBasePath());
+
+    // Must stay last: opening the readers uses the fields assigned above.
+    this.readers = new LoserTree<>(initializeReaders());
+  }
+
+  /**
+   * Opens one reader per input file and returns those that hold at least one record.
+   *
+   * <p>The base file (when included and present) gets merge order 0; log files follow in the
+   * order returned by the input split. Log-file iterators may be wrapped for spilling, see
+   * {@code maybeSpillIterator}.
+   *
+   * <p>If opening any file fails, the readers opened so far are closed before the failure is
+   * rethrown: this method runs inside the constructor, so the caller never receives an
+   * instance it could close.
+   *
+   * @return reader states positioned on their first record
+   * @throws IOException if a file iterator cannot be created
+   */
+  private List<ReaderState<T>> initializeReaders() throws IOException {
+    List<ReaderState<T>> readerStates = new ArrayList<>();
+    try {
+      int mergeOrder = 0;
+      if (includeBaseFile && inputSplit.getBaseFileOption().isPresent()) {
+        addReader(readerStates, mergeOrder++, createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+      }
+      for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+        ClosableIterator<BufferedRecord<T>> iterator =
+            createFileIterator(logFile.getPathInfo(), logFile.getPath(), logFile.getFileSize());
+        addReader(readerStates, mergeOrder, maybeSpillIterator(mergeOrder, iterator));
+        mergeOrder++;
+      }
+      return readerStates;
+    } catch (IOException | RuntimeException e) {
+      // Release everything opened before the failure; keep the original error as the primary one.
+      for (ReaderState<T> opened : readerStates) {
+        try {
+          opened.close();
+        } catch (RuntimeException closeFailure) {
+          e.addSuppressed(closeFailure);
+        }
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Wraps {@code iterator} in a {@code SpillableLsmRecordIterator} once {@code mergeOrder}
+   * reaches the configured spill threshold; below the threshold the iterator is returned as is.
+   */
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(
+      int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    boolean shouldSpill = mergeOrder >= spillThreshold;
+    if (shouldSpill) {
+      return new SpillableLsmRecordIterator<>(
+          iterator,
+          readerContext.getRecordSerializer(),
+          readerContext.getRecordContext(),
+          spillBasePath);
+    }
+    return iterator;
+  }
+
+  /**
+   * Wraps {@code iterator} in a reader state and registers it when it yields a first record.
+   *
+   * <p>A reader with no records is closed immediately. A reader whose first read fails is also
+   * closed before the failure propagates, so the underlying file is not leaked.
+   *
+   * @param readerStates collection receiving readers that have at least one record
+   * @param mergeOrder   position of this input in the merge order
+   * @param iterator     record iterator over one input file
+   */
+  private void addReader(List<ReaderState<T>> readerStates, int mergeOrder, ClosableIterator<BufferedRecord<T>> iterator) {
+    ReaderState<T> readerState = new ReaderState<>(mergeOrder, iterator);
+    boolean hasRecord;
+    try {
+      hasRecord = readerState.advance();
+    } catch (RuntimeException e) {
+      // Reading the first record failed: release the reader, keep the original error primary.
+      try {
+        readerState.close();
+      } catch (RuntimeException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+    if (hasRecord) {
+      readerStates.add(readerState);
+    } else {
+      readerState.close();
+    }
+  }
+
+  /**
+   * Opens the base file of the split, using its bootstrap base file instead when one is set.
+   *
+   * @throws IOException if the file iterator cannot be created
+   */
+  private ClosableIterator<BufferedRecord<T>> createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    BaseFile target = baseFile.getBootstrapBaseFile().orElse(baseFile);
+    StoragePathInfo targetInfo = target.getPathInfo();
+    StoragePath targetPath = target.getStoragePath();
+    return createFileIterator(targetInfo, targetPath, target.getFileSize());
+  }
+
+  /**
+   * Opens one input file and adapts its rows to {@link BufferedRecord}s in the reader schema.
+   *
+   * <p>Native delete log files are routed to {@code createNativeDeleteLogIterator}. For all other
+   * files the rows are read with the file-specific required schema, projected to the reader
+   * schema when the two differ or columns were renamed, filtered by the instant range when the
+   * reader context has one, and finally wrapped as buffered records.
+   *
+   * @param pathInfo file status, may be {@code null}; when present its path and length are used
+   * @param path     file path, used only when {@code pathInfo} is {@code null}
+   * @param fileSize file length, used only when {@code pathInfo} is {@code null}; a negative
+   *                 value triggers a storage lookup of the length
+   * @throws IOException if the file iterator cannot be created
+   */
+  private ClosableIterator<BufferedRecord<T>> createFileIterator(
+      StoragePathInfo pathInfo, StoragePath path, long fileSize) throws IOException {
+    StoragePath storagePath = pathInfo == null ? path : pathInfo.getPath();
+    boolean nativeDeleteLog = FSUtils.isNativeDeleteLogFile(storagePath.getName());
+    if (nativeDeleteLog) {
+      return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize);
+    }
+
+    Pair<HoodieSchema, Map<String, String>> schemaAndRenames =
+        readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath);
+    HoodieSchema fileRequiredSchema = schemaAndRenames.getLeft();
+
+    ClosableIterator<T> rows;
+    if (pathInfo == null) {
+      long length = fileSize < 0 ? storage.getPathInfo(storagePath).getLength() : fileSize;
+      rows = readerContext.getFileRecordIterator(
+          storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    } else {
+      rows = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    }
+
+    // Projection is skipped only when the schemas already line up and nothing was renamed.
+    boolean alreadyInReaderSchema =
+        areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) && schemaAndRenames.getRight().isEmpty();
+    if (!alreadyInReaderSchema) {
+      UnaryOperator<T> toReaderSchema = readerContext.getRecordContext()
+          .projectRecord(fileRequiredSchema, readerSchema, schemaAndRenames.getRight());
+      rows = new CloseableMappingIterator<>(rows, toReaderSchema);
+    }
+    if (readerContext.getInstantRange().isPresent()) {
+      rows = readerContext.applyInstantRangeFilter(rows);
+    }
+
+    return new CloseableMappingIterator<>(rows, row -> BufferedRecords.fromEngineRecord(
+        readerContext.getRecordContext().seal(row),
+        readerSchema,
+        readerContext.getRecordContext(),
+        orderingFieldNames,
+        readerContext.getRecordContext().isDeleteRecord(
+            row, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema))));
+  }
+
+  /**
+   * Opens a native delete log file and turns every row into a delete record for its key.
+   *
+   * <p>Rows are read with {@code DELETE_LOG_SCHEMA}; only the {@code record_key} column is used.
+   *
+   * @param pathInfo    file status, may be {@code null}; when present its length is used
+   * @param storagePath path of the delete log file
+   * @param fileSize    file length, used only when {@code pathInfo} is {@code null}; a negative
+   *                    value triggers a storage lookup of the length
+   * @throws IOException if the file iterator cannot be created
+   */
+  private ClosableIterator<BufferedRecord<T>> createNativeDeleteLogIterator(
+      StoragePathInfo pathInfo, StoragePath storagePath, long fileSize) throws IOException {
+    ClosableIterator<T> deleteRows;
+    if (pathInfo == null) {
+      long length = fileSize < 0 ? storage.getPathInfo(storagePath).getLength() : fileSize;
+      deleteRows = readerContext.getFileRecordIterator(
+          storagePath, 0, length, DELETE_LOG_SCHEMA, DELETE_LOG_SCHEMA, storage);
+    } else {
+      deleteRows = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), DELETE_LOG_SCHEMA, DELETE_LOG_SCHEMA, storage);
+    }
+    // NOTE(review): DELETE_LOG_SCHEMA carries an ordering_val column that is never read here;
+    // every delete is created with OrderingValues.getDefault(). Confirm against the writer
+    // whether that column can hold a real ordering value.
+    return new CloseableMappingIterator<>(deleteRows, deleteRow -> BufferedRecords.createDelete(
+        readerContext.getRecordContext()
+            .getValue(deleteRow, DELETE_LOG_SCHEMA, DELETE_LOG_RECORD_KEY_FIELD)
+            .toString(),
+        OrderingValues.getDefault()));
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (nextRecord != null) {
+      return true;
+    }
+    while (!readers.isEmpty()) {
+      BufferedRecord<T> mergedRecord = nextMergedRecord();
+      nextRecord = updateProcessor.processUpdate(
+          mergedRecord.getRecordKey(), null, mergedRecord, 
mergedRecord.isDelete());
+      if (nextRecord != null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private BufferedRecord<T> nextMergedRecord() {
+    BufferedRecord<T> firstRecord = readers.peekWinner();
+    String recordKey = firstRecord.getRecordKey();
+    BufferedRecord<T> mergedRecord = null;
+    while (!readers.isEmpty() && 
recordKey.equals(readers.peekWinner().getRecordKey())) {
+      mergedRecord = merge(mergedRecord, readers.popWinner());
+    }
+    return mergedRecord;
+  }
+
+  private BufferedRecord<T> merge(BufferedRecord<T> existingRecord, 
BufferedRecord<T> newRecord) {
+    if (existingRecord == null) {
+      return newRecord;
+    }
+    try {
+      return bufferedRecordMerger.deltaMerge(newRecord, 
existingRecord).orElse(existingRecord);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to merge LSM records for key " + 
newRecord.getRecordKey(), e);
+    }
+  }
+
+  @Override
+  public BufferedRecord<T> next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    BufferedRecord<T> record = nextRecord;
+    nextRecord = null;
+    return record;
+  }
+
+  @Override
+  public void close() {
+    readers.close();
+  }
+
+  private enum State {
+    WINNER_WITH_NEW_KEY,
+    WINNER_WITH_SAME_KEY,
+    WINNER_POPPED,
+    LOSER_WITH_NEW_KEY,
+    LOSER_WITH_SAME_KEY,
+    LOSER_POPPED
+  }
+
+  /**
+   * Loser-tree state machine for k-way merging. Each leaf keeps one active 
record from
+   * one sorted input stream; {@code tree[0]} stores the current champion and 
internal
+   * nodes store the loser from the corresponding tournament match.
+   */
+  private static class LoserTree<T> {
+    private final List<ReaderState<T>> leaves;
+    private final int leafBase;
+    private final int[] tree;
+    private final int[] winners;
+
+    private LoserTree(List<ReaderState<T>> leaves) {
+      this.leaves = leaves;
+      this.leafBase = nextPowerOfTwo(Math.max(1, leaves.size()));
+      this.tree = new int[leafBase];
+      this.winners = new int[leafBase << 1];
+      Arrays.fill(tree, -1);
+      Arrays.fill(winners, -1);
+      build();
+    }
+
+    private void build() {
+      for (int i = 0; i < leaves.size(); i++) {
+        winners[leafBase + i] = leaves.get(i).current == null ? -1 : i;
+      }
+      if (leafBase == 1) {
+        tree[0] = winners[leafBase];
+      } else {
+        for (int node = leafBase - 1; node > 0; node--) {
+          replay(node);
+        }
+      }
+      setChampionState(null);
+    }
+
+    private boolean isEmpty() {
+      return tree[0] < 0;
+    }
+
+    private BufferedRecord<T> peekWinner() {
+      int winnerIndex = tree[0];
+      return winnerIndex < 0 ? null : leaves.get(winnerIndex).current;
+    }
+
+    private BufferedRecord<T> popWinner() {
+      int winnerIndex = tree[0];
+      ReaderState<T> winner = leaves.get(winnerIndex);
+      BufferedRecord<T> record = winner.current;
+      String recordKey = record.getRecordKey();
+      winner.state = State.WINNER_POPPED;
+      winner.firstSameKeyIndex = -1;
+      if (!winner.advance()) {
+        winner.state = State.LOSER_POPPED;
+        winner.close();
+      }
+      update(winnerIndex, recordKey);
+      return record;
+    }
+
+    private void update(int leafIndex, String poppedKey) {
+      winners[leafBase + leafIndex] = leaves.get(leafIndex).current == null ? 
-1 : leafIndex;
+      if (leafBase == 1) {
+        tree[0] = winners[leafBase];
+        setChampionState(poppedKey);
+        return;
+      }
+      int node = (leafBase + leafIndex) >> 1;
+      while (node > 0) {
+        replay(node);
+        node >>= 1;
+      }
+      setChampionState(poppedKey);
+    }
+
+    private void replay(int node) {
+      int left = winners[node << 1];
+      int right = winners[(node << 1) + 1];
+      if (left < 0 && right < 0) {
+        winners[node] = -1;
+        tree[node] = -1;
+      } else if (left < 0) {
+        winners[node] = right;
+        tree[node] = -1;
+      } else if (right < 0) {
+        winners[node] = left;
+        tree[node] = -1;
+      } else {
+        int compareResult = compare(left, right);
+        if (compareResult <= 0) {
+          winners[node] = left;
+          tree[node] = right;
+          markLoser(right, left, compareResult);
+        } else {
+          winners[node] = right;
+          tree[node] = left;
+          markLoser(left, right, compareResult);
+        }
+      }
+      if (node == 1) {
+        tree[0] = winners[node];
+      }
+    }
+
+    private int compare(int leftIndex, int rightIndex) {
+      ReaderState<T> left = leaves.get(leftIndex);
+      ReaderState<T> right = leaves.get(rightIndex);
+      int keyCompare = 
left.current.getRecordKey().compareTo(right.current.getRecordKey());
+      if (keyCompare != 0) {
+        return keyCompare;
+      }
+      // Process older sources first so the regular merger sees later sources 
last.
+      // This preserves HoodieFileGroupReader tie semantics when ordering 
values are equal:
+      // base < older log instant/version < newer log instant/version.
+      return Integer.compare(left.mergeOrder, right.mergeOrder);
+    }
+
+    private void markLoser(int loserIndex, int winnerIndex, int compareResult) 
{

Review Comment:
   🤖 nit: `compareResult` is accepted here but never referenced in the method 
body — could you drop it from the signature? The `sameKey` check re-derives the 
relationship directly from the keys, so the parameter is dead weight that makes 
callers look like they're communicating something meaningful to `markLoser`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/SpillableLsmRecordIterator.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.engine.RecordContext;
+import org.apache.hudi.common.serialization.CustomSerializer;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Sequential disk-backed iterator for sorted LSM inputs.
+ *
+ * <p>The source iterator is drained into a length-prefixed spill file and 
closed. The resulting
+ * iterator reads the records back sequentially, which matches the loser-tree 
access pattern.
+ */
+class SpillableLsmRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final int BUFFER_SIZE = 128 * 1024;
+  private static final String SPILL_FILE_PREFIX = "hudi-lsm-";
+  private static final String SPILL_FILE_SUFFIX = ".spill";
+
+  private final CustomSerializer<BufferedRecord<T>> serializer;
+  private final RecordContext<T> recordContext;
+  private final File spillFile;
+  private final long recordCount;
+  private DataInputStream inputStream;
+  private long recordsRead;
+  private BufferedRecord<T> nextRecord;
+  private boolean closed;
+
+  SpillableLsmRecordIterator(ClosableIterator<BufferedRecord<T>> 
sourceIterator,
+                             CustomSerializer<BufferedRecord<T>> serializer,
+                             RecordContext<T> recordContext,
+                             String spillBasePath) {
+    this.serializer = serializer;
+    this.recordContext = recordContext;
+    try {
+      Path spillDirectory = Paths.get(spillBasePath);
+      Files.createDirectories(spillDirectory);
+      this.spillFile = Files.createTempFile(spillDirectory, SPILL_FILE_PREFIX, 
SPILL_FILE_SUFFIX).toFile();
+      this.spillFile.deleteOnExit();

Review Comment:
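   🤖 If `spill(...)` throws (or `Files.createTempFile` does), and 
`sourceIterator.close()` in the `finally` block also throws, the exception from 
`close()` replaces the original one, which is then lost. It would be worth 
catching the close failure and attaching it to the original exception via 
`addSuppressed(...)` (or using try-with-resources, which does this 
automatically) so the underlying IOException surfaces — debugging "failed to 
spill" is much easier with the real cause.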
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -79,6 +78,8 @@ public class FSUtils {
   public static final String PATH_SEPARATOR = "/";
   public static final Pattern LOG_FILE_PATTERN =
       
Pattern.compile("^\\.([^._]+)_([^.]*)\\.(log|archive)\\.(\\d+)(_((\\d+)-(\\d+)-(\\d+))(\\.cdc)?)?$");
+  public static final Pattern NATIVE_LOG_FILE_PATTERN =

Review Comment:
   🤖 The pattern uses `[^_]+` for the file id, which is looser than the 
existing patterns: `BASE_FILE_PATTERN` restricts file ids to `[a-zA-Z0-9-]+` 
and `LOG_FILE_PATTERN` restricts them to `[^._]+`. Can native log file ids ever 
contain `.` or other special characters? If not, tightening this to 
`[a-zA-Z0-9-]+` (or `[^._]+` for consistency with the existing log pattern) 
would avoid accidental matches against unrelated `.parquet` paths.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValues;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge reader for LSM file groups whose delta files are 
parquet files.
+ *
+ * <p>Each input file is expected to be sorted by record key. The iterator 
keeps one record from
+ * each file in memory, merges all versions for the same key with the regular 
file-group reader
+ * merge semantics, and emits the final row.
+ */
+public class LsmFileGroupRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+  private static final HoodieSchema DELETE_LOG_SCHEMA = HoodieSchema.parse(
+      "{"
+          + "\"type\":\"record\","
+          + "\"name\":\"hudi_delete_log_record\","
+          + "\"fields\":["
+          + "{\"name\":\"record_key\",\"type\":\"string\"},"
+          + 
"{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null},"
+          + 
"{\"name\":\"ordering_val\",\"type\":[\"null\",\"bytes\"],\"default\":null}"
+          + "]"
+          + "}");
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  private final LoserTree<T> readers;
+  private final int spillThreshold;
+  private final String spillBasePath;
+  private BufferedRecord<T> nextRecord;
+
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback) throws IOException {
+    this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, 
props, readerParameters, readStats, fileGroupUpdateCallback, true);
+  }
+
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws 
IOException {
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext, readerContext.getMergeMode(), false, 
readerContext.getRecordMerger(),
+        readerSchema, readerContext.getPayloadClasses(props), props, 
metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(readStats, readerContext, 
readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+    this.spillThreshold = 
props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), 
LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue());
+    this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), 
getDefaultSpillableMapBasePath());
+    this.readers = new LoserTree<>(initializeReaders());
+  }
+
+  private List<ReaderState<T>> initializeReaders() throws IOException {
+    List<ReaderState<T>> readerStates = new ArrayList<>();
+    int mergeOrder = 0;
+    if (includeBaseFile && inputSplit.getBaseFileOption().isPresent()) {
+      addReader(readerStates, mergeOrder++, 
createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+    }
+    for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+      ClosableIterator<BufferedRecord<T>> iterator = 
createFileIterator(logFile.getPathInfo(), logFile.getPath(), 
logFile.getFileSize());
+      addReader(readerStates, mergeOrder, maybeSpillIterator(mergeOrder, 
iterator));
+      mergeOrder++;
+    }
+    return readerStates;
+  }
+
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(int 
mergeOrder,
+                                                                 
ClosableIterator<BufferedRecord<T>> iterator) {
+    if (mergeOrder < spillThreshold) {
+      return iterator;
+    }
+    return new SpillableLsmRecordIterator<>(iterator, 
readerContext.getRecordSerializer(), readerContext.getRecordContext(), 
spillBasePath);
+  }
+
+  private void addReader(List<ReaderState<T>> readerStates, int mergeOrder, 
ClosableIterator<BufferedRecord<T>> iterator) {
+    ReaderState<T> readerState = new ReaderState<>(mergeOrder, iterator);
+    if (readerState.advance()) {
+      readerStates.add(readerState);
+    } else {
+      readerState.close();
+    }
+  }
+
+  private ClosableIterator<BufferedRecord<T>> 
createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile);
+    return createFileIterator(file.getPathInfo(), file.getStoragePath(), 
file.getFileSize());
+  }
+
+  private ClosableIterator<BufferedRecord<T>> 
createFileIterator(StoragePathInfo pathInfo,
+                                                                 StoragePath 
path,
+                                                                 long 
fileSize) throws IOException {
+    StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path;
+    if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) {
+      return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize);
+    }
+    Pair<HoodieSchema, Map<String, String>> requiredSchemaAndRenamedColumns =
+        
readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath);
+    HoodieSchema fileRequiredSchema = 
requiredSchemaAndRenamedColumns.getLeft();
+    ClosableIterator<T> recordIterator;
+    if (pathInfo != null) {
+      recordIterator = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), 
readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    } else {
+      long length = fileSize >= 0 ? fileSize : 
storage.getPathInfo(storagePath).getLength();
+      recordIterator = readerContext.getFileRecordIterator(
+          storagePath, 0, length, 
readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    }
+    if (!areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema) || 
!requiredSchemaAndRenamedColumns.getRight().isEmpty()) {
+      UnaryOperator<T> projector = readerContext.getRecordContext()
+          .projectRecord(fileRequiredSchema, readerSchema, 
requiredSchemaAndRenamedColumns.getRight());
+      recordIterator = new CloseableMappingIterator<>(recordIterator, 
projector);
+    }
+    if (readerContext.getInstantRange().isPresent()) {
+      recordIterator = readerContext.applyInstantRangeFilter(recordIterator);
+    }
+    return new CloseableMappingIterator<>(recordIterator, record -> 
BufferedRecords.fromEngineRecord(
+        readerContext.getRecordContext().seal(record),
+        readerSchema,
+        readerContext.getRecordContext(),
+        orderingFieldNames,

Review Comment:
   🤖 The `State` enum values, `ReaderState.state`, and 
`ReaderState.firstSameKeyIndex` look like they're written by `popWinner` / 
`markLoser` / `setChampionState` but never read by the merge loop or anywhere 
else. If they're intended for a future optimization (e.g., skipping the inner 
merge loop when `WINNER_WITH_NEW_KEY`), worth either wiring that up or removing 
them to keep the loser-tree implementation easier to follow.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/lsm/LsmFileGroupRecordIterator.java:
##########
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read.lsm;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.read.BaseFileUpdateCallback;
+import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.BufferedRecordMerger;
+import org.apache.hudi.common.table.read.BufferedRecordMergerFactory;
+import org.apache.hudi.common.table.read.BufferedRecords;
+import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.InputSplit;
+import org.apache.hudi.common.table.read.ReaderParameters;
+import org.apache.hudi.common.table.read.UpdateProcessor;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValues;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.function.UnaryOperator;
+
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.LSM_SORT_MERGE_SPILL_THRESHOLD;
+import static 
org.apache.hudi.common.schema.HoodieSchemaCompatibility.areSchemasProjectionEquivalent;
+import static 
org.apache.hudi.io.util.FileIOUtils.getDefaultSpillableMapBasePath;
+
+/**
+ * Streaming sorted-merge reader for LSM file groups whose delta files are 
parquet files.
+ *
+ * <p>Each input file is expected to be sorted by record key. The iterator 
keeps one record from
+ * each file in memory, merges all versions for the same key with the regular 
file-group reader
+ * merge semantics, and emits the final row.
+ */
+public class LsmFileGroupRecordIterator<T> implements 
ClosableIterator<BufferedRecord<T>> {
+
+  private static final String DELETE_LOG_RECORD_KEY_FIELD = "record_key";
+  private static final HoodieSchema DELETE_LOG_SCHEMA = HoodieSchema.parse(
+      "{"
+          + "\"type\":\"record\","
+          + "\"name\":\"hudi_delete_log_record\","
+          + "\"fields\":["
+          + "{\"name\":\"record_key\",\"type\":\"string\"},"
+          + 
"{\"name\":\"partition_path\",\"type\":[\"null\",\"string\"],\"default\":null},"
+          + 
"{\"name\":\"ordering_val\",\"type\":[\"null\",\"bytes\"],\"default\":null}"
+          + "]"
+          + "}");
+
+  private final HoodieReaderContext<T> readerContext;
+  private final HoodieStorage storage;
+  private final InputSplit inputSplit;
+  private final HoodieSchema readerSchema;
+  private final List<String> orderingFieldNames;
+  private final boolean includeBaseFile;
+  private final BufferedRecordMerger<T> bufferedRecordMerger;
+  private final UpdateProcessor<T> updateProcessor;
+  private final LoserTree<T> readers;
+  private final int spillThreshold;
+  private final String spillBasePath;
+  private BufferedRecord<T> nextRecord;
+
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback) throws IOException {
+    this(readerContext, storage, inputSplit, orderingFieldNames, metaClient, 
props, readerParameters, readStats, fileGroupUpdateCallback, true);
+  }
+
+  public LsmFileGroupRecordIterator(HoodieReaderContext<T> readerContext,
+                                    HoodieStorage storage,
+                                    InputSplit inputSplit,
+                                    List<String> orderingFieldNames,
+                                    HoodieTableMetaClient metaClient,
+                                    TypedProperties props,
+                                    ReaderParameters readerParameters,
+                                    HoodieReadStats readStats,
+                                    Option<BaseFileUpdateCallback<T>> 
fileGroupUpdateCallback,
+                                    boolean includeBaseFile) throws 
IOException {
+    this.readerContext = readerContext;
+    this.storage = storage;
+    this.inputSplit = inputSplit;
+    this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
+    this.orderingFieldNames = orderingFieldNames;
+    this.includeBaseFile = includeBaseFile;
+    this.bufferedRecordMerger = BufferedRecordMergerFactory.create(
+        readerContext, readerContext.getMergeMode(), false, 
readerContext.getRecordMerger(),
+        readerSchema, readerContext.getPayloadClasses(props), props, 
metaClient.getTableConfig().getPartialUpdateMode());
+    this.updateProcessor = UpdateProcessor.create(readStats, readerContext, 
readerParameters.isEmitDeletes(), fileGroupUpdateCallback, props);
+    this.spillThreshold = 
props.getInteger(LSM_SORT_MERGE_SPILL_THRESHOLD.key(), 
LSM_SORT_MERGE_SPILL_THRESHOLD.defaultValue());
+    this.spillBasePath = props.getString(SPILLABLE_MAP_BASE_PATH.key(), 
getDefaultSpillableMapBasePath());
+    this.readers = new LoserTree<>(initializeReaders());
+  }
+
+  private List<ReaderState<T>> initializeReaders() throws IOException {
+    List<ReaderState<T>> readerStates = new ArrayList<>();
+    int mergeOrder = 0;
+    if (includeBaseFile && inputSplit.getBaseFileOption().isPresent()) {
+      addReader(readerStates, mergeOrder++, 
createBaseFileIterator(inputSplit.getBaseFileOption().get()));
+    }
+    for (HoodieLogFile logFile : inputSplit.getLogFiles()) {
+      ClosableIterator<BufferedRecord<T>> iterator = 
createFileIterator(logFile.getPathInfo(), logFile.getPath(), 
logFile.getFileSize());
+      addReader(readerStates, mergeOrder, maybeSpillIterator(mergeOrder, 
iterator));
+      mergeOrder++;
+    }
+    return readerStates;
+  }
+
+  private ClosableIterator<BufferedRecord<T>> maybeSpillIterator(int 
mergeOrder,
+                                                                 
ClosableIterator<BufferedRecord<T>> iterator) {
+    if (mergeOrder < spillThreshold) {
+      return iterator;
+    }
+    return new SpillableLsmRecordIterator<>(iterator, 
readerContext.getRecordSerializer(), readerContext.getRecordContext(), 
spillBasePath);
+  }
+
+  private void addReader(List<ReaderState<T>> readerStates, int mergeOrder, 
ClosableIterator<BufferedRecord<T>> iterator) {
+    ReaderState<T> readerState = new ReaderState<>(mergeOrder, iterator);
+    if (readerState.advance()) {
+      readerStates.add(readerState);
+    } else {
+      readerState.close();
+    }
+  }
+
+  private ClosableIterator<BufferedRecord<T>> 
createBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
+    BaseFile file = baseFile.getBootstrapBaseFile().orElse(baseFile);
+    return createFileIterator(file.getPathInfo(), file.getStoragePath(), 
file.getFileSize());
+  }
+
+  /**
+   * Opens a record iterator over a single file and adapts it to {@link BufferedRecord}s.
+   *
+   * <p>Native delete log files are delegated to {@code createNativeDeleteLogIterator}. For any
+   * other file the records are read with the file's required schema, projected onto
+   * {@code readerSchema} when the two schemas are not projection-equivalent or columns were
+   * renamed, filtered by the instant range when the reader context has one, and finally wrapped
+   * as buffered records.
+   *
+   * @param pathInfo path info of the file; when {@code null}, {@code path} and {@code fileSize} are used
+   * @param path     storage path used when {@code pathInfo} is {@code null}
+   * @param fileSize file length used when {@code pathInfo} is {@code null}; a negative value makes
+   *                 the length be looked up from {@code storage}
+   */
+  private ClosableIterator<BufferedRecord<T>> createFileIterator(StoragePathInfo pathInfo,
+                                                                 StoragePath path,
+                                                                 long fileSize) throws IOException {
+    StoragePath storagePath = pathInfo != null ? pathInfo.getPath() : path;
+    if (FSUtils.isNativeDeleteLogFile(storagePath.getName())) {
+      return createNativeDeleteLogIterator(pathInfo, storagePath, fileSize);
+    }
+
+    Pair<HoodieSchema, Map<String, String>> schemaAndRenames =
+        readerContext.getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(storagePath);
+    HoodieSchema fileRequiredSchema = schemaAndRenames.getLeft();
+
+    ClosableIterator<T> records;
+    if (pathInfo == null) {
+      // No path info available: fall back to the explicit path and resolve the length if unknown.
+      long length = fileSize < 0 ? storage.getPathInfo(storagePath).getLength() : fileSize;
+      records = readerContext.getFileRecordIterator(
+          storagePath, 0, length, readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    } else {
+      records = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), readerContext.getSchemaHandler().getTableSchema(), fileRequiredSchema, storage);
+    }
+
+    boolean alreadyInReaderShape = areSchemasProjectionEquivalent(fileRequiredSchema, readerSchema)
+        && schemaAndRenames.getRight().isEmpty();
+    if (!alreadyInReaderShape) {
+      UnaryOperator<T> projector = readerContext.getRecordContext()
+          .projectRecord(fileRequiredSchema, readerSchema, schemaAndRenames.getRight());
+      records = new CloseableMappingIterator<>(records, projector);
+    }
+
+    if (readerContext.getInstantRange().isPresent()) {
+      records = readerContext.applyInstantRangeFilter(records);
+    }
+
+    return new CloseableMappingIterator<>(records, record -> BufferedRecords.fromEngineRecord(
+        readerContext.getRecordContext().seal(record),
+        readerSchema,
+        readerContext.getRecordContext(),
+        orderingFieldNames,
+        readerContext.getRecordContext().isDeleteRecord(
+            record, readerContext.getSchemaHandler().getDeleteContext().withReaderSchema(readerSchema))));
+  }
+
+  /**
+   * Opens a native delete log file and exposes each entry as a delete {@link BufferedRecord}.
+   *
+   * <p>The file is read with {@code DELETE_LOG_SCHEMA} as both the data and the required schema.
+   * Every entry becomes a delete record keyed by the string form of its
+   * {@code DELETE_LOG_RECORD_KEY_FIELD} value, with the default ordering value.
+   *
+   * @param pathInfo    path info of the file; when {@code null}, {@code storagePath} and
+   *                    {@code fileSize} are used
+   * @param storagePath storage path of the file
+   * @param fileSize    file length used when {@code pathInfo} is {@code null}; a negative value
+   *                    makes the length be looked up from {@code storage}
+   */
+  private ClosableIterator<BufferedRecord<T>> createNativeDeleteLogIterator(StoragePathInfo pathInfo,
+                                                                            StoragePath storagePath,
+                                                                            long fileSize) throws IOException {
+    ClosableIterator<T> deleteEntries;
+    if (pathInfo == null) {
+      long length = fileSize < 0 ? storage.getPathInfo(storagePath).getLength() : fileSize;
+      deleteEntries = readerContext.getFileRecordIterator(
+          storagePath, 0, length, DELETE_LOG_SCHEMA, DELETE_LOG_SCHEMA, storage);
+    } else {
+      deleteEntries = readerContext.getFileRecordIterator(
+          pathInfo, 0, pathInfo.getLength(), DELETE_LOG_SCHEMA, DELETE_LOG_SCHEMA, storage);
+    }
+    return new CloseableMappingIterator<>(deleteEntries, entry -> {
+      Object deletedKey = readerContext.getRecordContext()
+          .getValue(entry, DELETE_LOG_SCHEMA, DELETE_LOG_RECORD_KEY_FIELD);
+      return BufferedRecords.createDelete(deletedKey.toString(), OrderingValues.getDefault());
+    });
+  }
+
+  /**
+   * Reports whether another output record is available, producing and caching it in
+   * {@code nextRecord} if needed.
+   *
+   * <p>Merged records for which {@code updateProcessor.processUpdate} returns {@code null} are
+   * skipped, and the next key is tried until the readers are exhausted.
+   */
+  @Override
+  public boolean hasNext() {
+    while (nextRecord == null && !readers.isEmpty()) {
+      BufferedRecord<T> merged = nextMergedRecord();
+      nextRecord = updateProcessor.processUpdate(
+          merged.getRecordKey(), null, merged, merged.isDelete());
+    }
+    return nextRecord != null;
+  }
+
+  /**
+   * Drains every pending record that shares the current winner's key and folds them into one
+   * record via {@code merge}, in the order the readers hand them out.
+   *
+   * <p>Callers must make sure {@code readers} is not empty.
+   */
+  private BufferedRecord<T> nextMergedRecord() {
+    String groupKey = readers.peekWinner().getRecordKey();
+    BufferedRecord<T> accumulated = null;
+    while (true) {
+      if (readers.isEmpty()) {
+        break;
+      }
+      if (!groupKey.equals(readers.peekWinner().getRecordKey())) {
+        break;
+      }
+      accumulated = merge(accumulated, readers.popWinner());
+    }
+    return accumulated;
+  }
+
+  /**
+   * Combines an incoming record with what has been merged so far for the same key.
+   *
+   * @param existingRecord result merged so far, or {@code null} when {@code newRecord} is the first
+   * @param newRecord      the next record for the key
+   * @return {@code newRecord} when there is nothing merged yet; otherwise the result of
+   *         {@code bufferedRecordMerger.deltaMerge}, or {@code existingRecord} when that is empty
+   * @throws HoodieIOException when the merger fails with an {@link IOException}
+   */
+  private BufferedRecord<T> merge(BufferedRecord<T> existingRecord, BufferedRecord<T> newRecord) {
+    if (existingRecord != null) {
+      try {
+        return bufferedRecordMerger.deltaMerge(newRecord, existingRecord).orElse(existingRecord);
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to merge LSM records for key " + newRecord.getRecordKey(), e);
+      }
+    }
+    return newRecord;
+  }
+
+  /**
+   * Returns the record prepared by {@link #hasNext()} and clears the cached slot.
+   *
+   * @throws NoSuchElementException when no further record is available
+   */
+  @Override
+  public BufferedRecord<T> next() {
+    if (hasNext()) {
+      BufferedRecord<T> result = nextRecord;
+      nextRecord = null;
+      return result;
+    }
+    throw new NoSuchElementException();
+  }
+
+  /**
+   * Closes this iterator by closing {@code readers}.
+   */
+  @Override
+  public void close() {
+    // The readers object owns the underlying inputs; closing it is all that is needed here.
+    readers.close();
+  }
+
+  /**
+   * Tournament status of a single reader.
+   *
+   * <p>In the code visible here, {@code LoserTree.popWinner} marks the popped reader
+   * {@link #WINNER_POPPED} and switches it to {@link #LOSER_POPPED} when the reader has no
+   * further record. NOTE(review): the remaining states are assigned outside this excerpt
+   * (presumably while marking losers and the champion) - confirm before relying on them.
+   */
+  private enum State {
+    // Winner-side states.
+    WINNER_WITH_NEW_KEY,
+    WINNER_WITH_SAME_KEY,
+    WINNER_POPPED,
+    // Loser-side states.
+    LOSER_WITH_NEW_KEY,
+    LOSER_WITH_SAME_KEY,
+    LOSER_POPPED
+  }
+
+  /**
+   * Loser-tree state machine for k-way merging. Each leaf keeps one active 
record from
+   * one sorted input stream; {@code tree[0]} stores the current champion and 
internal
+   * nodes store the loser from the corresponding tournament match.
+   */
+  private static class LoserTree<T> {
+    private final List<ReaderState<T>> leaves;
+    private final int leafBase;
+    private final int[] tree;
+    private final int[] winners;
+
+    /**
+     * Allocates the tree arrays for the given leaves and plays the initial tournament.
+     *
+     * <p>The leaf count is rounded up to a power of two (at least one). {@code tree} gets that
+     * many slots and {@code winners} twice as many; both start out filled with {@code -1}.
+     */
+    private LoserTree(List<ReaderState<T>> leaves) {
+      int capacity = nextPowerOfTwo(Math.max(1, leaves.size()));
+      this.leaves = leaves;
+      this.leafBase = capacity;
+      this.tree = new int[capacity];
+      this.winners = new int[capacity << 1];
+      Arrays.fill(this.tree, -1);
+      Arrays.fill(this.winners, -1);
+      build();
+    }
+
+    /**
+     * Seeds the leaf slots of {@code winners} and replays every internal node bottom-up.
+     *
+     * <p>A leaf without a current record is recorded as {@code -1}. With a single leaf slot there
+     * are no matches to play, so that slot becomes the champion directly. The champion state is
+     * then initialised with no popped key.
+     */
+    private void build() {
+      int leafCount = leaves.size();
+      for (int leaf = 0; leaf < leafCount; leaf++) {
+        boolean exhausted = leaves.get(leaf).current == null;
+        winners[leafBase + leaf] = exhausted ? -1 : leaf;
+      }
+      if (leafBase != 1) {
+        for (int node = leafBase - 1; node >= 1; node--) {
+          replay(node);
+        }
+      } else {
+        tree[0] = winners[leafBase];
+      }
+      setChampionState(null);
+    }
+
+    /**
+     * Returns {@code true} when there is no champion, i.e. {@code tree[0]} holds a negative index.
+     */
+    private boolean isEmpty() {
+      int champion = tree[0];
+      return champion < 0;
+    }
+
+    /**
+     * Returns the champion leaf's current record without consuming it, or {@code null} when the
+     * tree has no champion.
+     */
+    private BufferedRecord<T> peekWinner() {
+      int champion = tree[0];
+      if (champion < 0) {
+        return null;
+      }
+      return leaves.get(champion).current;
+    }
+
+    /**
+     * Removes and returns the champion's current record, then re-runs the tournament.
+     *
+     * <p>The champion leaf is marked {@code WINNER_POPPED}, its {@code firstSameKeyIndex} is
+     * reset to {@code -1}, and it is advanced. A leaf that cannot advance is marked
+     * {@code LOSER_POPPED} and closed. The tree is then updated along that leaf's path, with the
+     * popped key passed along.
+     *
+     * <p>Callers must make sure the tree is not empty.
+     */
+    private BufferedRecord<T> popWinner() {
+      int championIndex = tree[0];
+      ReaderState<T> champion = leaves.get(championIndex);
+      BufferedRecord<T> popped = champion.current;
+      String poppedKey = popped.getRecordKey();
+
+      champion.state = State.WINNER_POPPED;
+      champion.firstSameKeyIndex = -1;
+      boolean hasMore = champion.advance();
+      if (!hasMore) {
+        champion.state = State.LOSER_POPPED;
+        champion.close();
+      }
+
+      update(championIndex, poppedKey);
+      return popped;
+    }
+
+    /**
+     * Refreshes the tree after the leaf at {@code leafIndex} changed its current record.
+     *
+     * <p>The leaf slot in {@code winners} is rewritten ({@code -1} when the leaf has no current
+     * record). Every internal node on the path from that leaf to the root is then replayed; with
+     * a single leaf slot the slot becomes the champion directly. Finally the champion state is
+     * recomputed with {@code poppedKey}.
+     */
+    private void update(int leafIndex, String poppedKey) {
+      int slot = leafBase + leafIndex;
+      winners[slot] = leaves.get(leafIndex).current == null ? -1 : leafIndex;
+      if (leafBase == 1) {
+        tree[0] = winners[leafBase];
+      } else {
+        for (int node = slot >> 1; node > 0; node >>= 1) {
+          replay(node);
+        }
+      }
+      setChampionState(poppedKey);
+    }
+
+    /**
+     * Replays the match at internal node {@code node} between the winners of its two children.
+     *
+     * <p>The match winner goes to {@code winners[node]} and the loser to {@code tree[node]}.
+     * When fewer than two children have a contender there is no loser ({@code tree[node]} is
+     * {@code -1}) and the surviving child, if any, wins by default. On a tie in {@code compare}
+     * the left child wins. The root winner is mirrored into {@code tree[0]}.
+     */
+    private void replay(int node) {
+      int leftChild = node << 1;
+      int left = winners[leftChild];
+      int right = winners[leftChild + 1];
+
+      if (left >= 0 && right >= 0) {
+        // Both sides have a contender: play the match and remember the loser at this node.
+        int compareResult = compare(left, right);
+        boolean leftWins = compareResult <= 0;
+        int winner = leftWins ? left : right;
+        int loser = leftWins ? right : left;
+        winners[node] = winner;
+        tree[node] = loser;
+        markLoser(loser, winner, compareResult);
+      } else {
+        // At most one contender: it advances unopposed and no loser is stored.
+        if (left >= 0) {
+          winners[node] = left;
+        } else if (right >= 0) {
+          winners[node] = right;
+        } else {
+          winners[node] = -1;
+        }
+        tree[node] = -1;
+      }
+
+      if (node == 1) {
+        tree[0] = winners[node];
+      }
+    }
+
+    /**
+     * Orders two leaves by the record key of their current records, breaking ties by ascending
+     * {@code mergeOrder}.
+     *
+     * @return a negative, zero, or positive value as the left leaf sorts before, with, or after
+     *         the right leaf
+     */
+    private int compare(int leftIndex, int rightIndex) {
+      ReaderState<T> lhs = leaves.get(leftIndex);
+      ReaderState<T> rhs = leaves.get(rightIndex);
+      int byKey = lhs.current.getRecordKey().compareTo(rhs.current.getRecordKey());
+      // Process older sources first so the regular merger sees later sources last.
+      // This preserves HoodieFileGroupReader tie semantics when ordering values are equal:
+      // base < older log instant/version < newer log instant/version.
+      return byKey != 0 ? byKey : Integer.compare(lhs.mergeOrder, rhs.mergeOrder);
+    }
+
+    private void markLoser(int loserIndex, int winnerIndex, int compareResult) 
{
+      ReaderState<T> loser = leaves.get(loserIndex);
+      boolean sameKey = 
leaves.get(loserIndex).current.getRecordKey().equals(leaves.get(winnerIndex).current.getRecordKey());

Review Comment:
   🤖 nit: `loser` already holds `leaves.get(loserIndex)`, so 
`leaves.get(loserIndex).current.getRecordKey()` on this line could just be 
`loser.current.getRecordKey()` — a bit easier to read.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to