This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch bump-trino-plugin-to-1.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8441f80e4b295123c0fd0ab706d4ae8ef1098fb6
Author: voon <[email protected]>
AuthorDate: Wed May 13 12:17:55 2026 +0800

    chore: Bump hudi-trino-plugin to 1.1.0
---
 hudi-trino-plugin/pom.xml                          |  25 ++-
 .../java/io/trino/plugin/hudi/HudiMetadata.java    |   2 +
 .../java/io/trino/plugin/hudi/HudiPageSource.java  |  76 ++++++---
 .../trino/plugin/hudi/HudiPageSourceProvider.java  |  37 ++---
 .../java/io/trino/plugin/hudi/HudiSplitSource.java |   8 +-
 .../java/io/trino/plugin/hudi/HudiTableHandle.java |  15 +-
 .../main/java/io/trino/plugin/hudi/HudiUtil.java   | 121 +++++++++++---
 .../plugin/hudi/io/HudiTrinoFileReaderFactory.java |  25 ++-
 .../hudi/io/InlineSeekableDataInputStream.java     |   2 +-
 .../query/index/HudiColumnStatsIndexSupport.java   |  10 +-
 .../index/HudiPartitionStatsIndexSupport.java      |   9 +-
 .../query/index/HudiRecordLevelIndexSupport.java   |   4 +-
 .../query/index/HudiSecondaryIndexSupport.java     |   4 +-
 .../plugin/hudi/reader/HudiTrinoReaderContext.java | 126 ++++----------
 .../trino/plugin/hudi/reader/HudiTrinoRecord.java  | 183 ---------------------
 .../plugin/hudi/stats/TableMetadataReader.java     |  86 +++-------
 .../plugin/hudi/stats/TableStatisticsReader.java   |   4 +-
 .../hudi/TestHudiAlluxioCacheFileOperations.java   |  78 +++------
 .../hudi/TestHudiMemoryCacheFileOperations.java    |  77 +++------
 .../plugin/hudi/TestHudiNoCacheFileOperations.java |  77 +++------
 .../io/trino/plugin/hudi/TestHudiSmokeTest.java    |   2 +-
 .../plugin/hudi/split/TestHudiSplitFactory.java    |   1 +
 .../hudi/testing/TpchHudiTablesInitializer.java    |  10 +-
 .../plugin/hudi/util/FileOperationAssertions.java  | 143 ++++++++++++++++
 24 files changed, 524 insertions(+), 601 deletions(-)

diff --git a/hudi-trino-plugin/pom.xml b/hudi-trino-plugin/pom.xml
index c67ab6a07f51..5809c60e98c5 100644
--- a/hudi-trino-plugin/pom.xml
+++ b/hudi-trino-plugin/pom.xml
@@ -16,13 +16,15 @@
 
     <properties>
         <air.compiler.fail-warnings>true</air.compiler.fail-warnings>
-        <dep.hudi.version>1.0.2</dep.hudi.version>
+        <dep.hudi.version>1.1.0</dep.hudi.version>
         <trino.parquet.version>1.15.2</trino.parquet.version>
     </properties>
 
     <dependencies>
         <dependency>
-            <!--Used to test execution in task executor after de-serializing-->
+            <!-- Required for compilation: HoodieRecord implements 
KryoSerializable,
+                 so the compiler needs Kryo on classpath to verify the class 
hierarchy
+                 when loading HoodieRecord for static imports -->
             <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo</artifactId>
             <version>4.0.2</version>
@@ -169,6 +171,7 @@
             <groupId>org.apache.hudi</groupId>
             <artifactId>hudi-io</artifactId>
             <version>${dep.hudi.version}</version>
+            <classifier>shaded</classifier>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.protobuf</groupId>
@@ -470,13 +473,31 @@
                         <!-- org.apache.hudi:hudi-client-common and 
org.apache.hudi:hudi-java-client log4j.properties duplicates -->
                         
<ignoredResourcePattern>log4j.properties</ignoredResourcePattern>
                         
<ignoredResourcePattern>log4j-surefire.properties</ignoredResourcePattern>
+                        <ignoredResource>google/protobuf/.*</ignoredResource>
                     </ignoredResourcePatterns>
+                    <ignoredClassPatterns>
+                        <!-- org.apache.hudi:hudi-hadoop-common bundles 
parquet classes that conflict with org.apache.parquet:parquet-common -->
+                        <!-- ParquetConfiguration class is shaded/bundled 
inside the hudi-hadoop-common JAR, not a transitive dependency, so Maven 
exclusions won't work -->
+                        
<ignoredClassPattern>org.apache.parquet.conf.ParquetConfiguration</ignoredClassPattern>
+                    </ignoredClassPatterns>
                 </configuration>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <ignoredUnusedDeclaredDependencies>
+                        <!-- Kryo is not directly used but required for 
compilation:
+                             HoodieRecord implements KryoSerializable, so the 
compiler
+                             needs Kryo to verify the class hierarchy -->
+                        
<ignoredUnusedDeclaredDependency>com.esotericsoftware:kryo</ignoredUnusedDeclaredDependency>
+                    </ignoredUnusedDeclaredDependencies>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
index 917c94ea43d9..2613be54ff09 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -91,6 +91,7 @@ import static 
io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
 import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
 import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
 import static io.trino.plugin.hudi.HudiUtil.getLatestTableSchema;
+import static io.trino.plugin.hudi.HudiUtil.getOrderingColumnHandles;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
 import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
@@ -173,6 +174,7 @@ public class HudiMetadata
                 table.getStorage().getLocation(),
                 hoodieTableType,
                 getPartitionKeyColumnHandles(table, typeManager),
+                Lazy.lazily(() -> getOrderingColumnHandles(table, typeManager, 
lazyMetaClient, NANOSECONDS)),
                 ImmutableSet.of(),
                 TupleDomain.all(),
                 TupleDomain.all(),
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
index e81887414425..5186341b0a50 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java
@@ -23,6 +23,7 @@ import io.trino.spi.connector.ConnectorPageSource;
 import io.trino.spi.metrics.Metrics;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.util.collection.ClosableIterator;
 
 import java.io.IOException;
 import java.util.List;
@@ -41,6 +42,7 @@ public class HudiPageSource
     PageBuilder pageBuilder;
     HudiAvroSerializer avroSerializer;
     List<HiveColumnHandle> columnHandles;
+    ClosableIterator<IndexedRecord> recordIterator;
 
     public HudiPageSource(
             ConnectorPageSource pageSource,
@@ -51,11 +53,29 @@ public class HudiPageSource
     {
         this.pageSource = pageSource;
         this.fileGroupReader = fileGroupReader;
-        this.initFileGroupReader();
         this.readerContext = readerContext;
         this.columnHandles = columnHandles;
         this.pageBuilder = new 
PageBuilder(columnHandles.stream().map(HiveColumnHandle::getType).toList());
         this.avroSerializer = new HudiAvroSerializer(columnHandles, 
synthesizedColumnHandler);
+        try {
+            this.recordIterator = fileGroupReader.getClosableIterator();
+        }
+        catch (IOException e) {
+            // Clean up resources on initialization failure
+            try {
+                fileGroupReader.close();
+            }
+            catch (IOException closeException) {
+                e.addSuppressed(closeException);
+            }
+            try {
+                pageSource.close();
+            }
+            catch (IOException closeException) {
+                e.addSuppressed(closeException);
+            }
+            throw new RuntimeException("Failed to initialize file group 
reader!", e);
+        }
     }
 
     @Override
@@ -79,25 +99,15 @@ public class HudiPageSource
     @Override
     public boolean isFinished()
     {
-        try {
-            return !fileGroupReader.hasNext();
-        }
-        catch (IOException e) {
-            throw new RuntimeException(e);
-        }
+        return !recordIterator.hasNext();
     }
 
     @Override
     public Page getNextPage()
     {
         checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the 
beginning of a new page");
-        try {
-            while (fileGroupReader.hasNext()) {
-                avroSerializer.buildRecordInPage(pageBuilder, 
fileGroupReader.next());
-            }
-        }
-        catch (IOException e) {
-            throw new RuntimeException(e);
+        while (recordIterator.hasNext()) {
+            avroSerializer.buildRecordInPage(pageBuilder, 
recordIterator.next());
         }
 
         Page newPage = pageBuilder.build();
@@ -115,8 +125,32 @@ public class HudiPageSource
     public void close()
             throws IOException
     {
-        fileGroupReader.close();
-        pageSource.close();
+        IOException closeException = null;
+
+        recordIterator.close();
+
+        try {
+            fileGroupReader.close();
+        }
+        catch (IOException e) {
+            closeException = e;
+        }
+
+        try {
+            pageSource.close();
+        }
+        catch (IOException e) {
+            if (closeException == null) {
+                closeException = e;
+            }
+            else {
+                closeException.addSuppressed(e);
+            }
+        }
+
+        if (closeException != null) {
+            throw closeException;
+        }
     }
 
     @Override
@@ -130,14 +164,4 @@ public class HudiPageSource
     {
         return pageSource.getMetrics();
     }
-
-    protected void initFileGroupReader()
-    {
-        try {
-            this.fileGroupReader.initRecordIterators();
-        }
-        catch (IOException e) {
-            throw new RuntimeException("Failed to initialize file group 
reader!", e);
-        }
-    }
 }
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
index 9b8411fdf907..1553b8389a0b 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java
@@ -38,8 +38,6 @@ import io.trino.plugin.hive.ReaderColumns;
 import io.trino.plugin.hive.parquet.ParquetReaderConfig;
 import io.trino.plugin.hudi.file.HudiBaseFile;
 import io.trino.plugin.hudi.reader.HudiTrinoReaderContext;
-import io.trino.plugin.hudi.storage.HudiTrinoStorage;
-import io.trino.plugin.hudi.storage.TrinoStorageConfiguration;
 import io.trino.plugin.hudi.util.SynthesizedColumnHandler;
 import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ColumnHandle;
@@ -57,7 +55,6 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -104,7 +101,7 @@ import static 
io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
 import static io.trino.plugin.hudi.HudiUtil.constructSchema;
 import static io.trino.plugin.hudi.HudiUtil.convertToFileSlice;
 import static io.trino.plugin.hudi.HudiUtil.getLatestTableSchema;
-import static io.trino.plugin.hudi.HudiUtil.prependHudiMetaColumns;
+import static io.trino.plugin.hudi.HudiUtil.prependHudiMetaAndOrderingColumns;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toUnmodifiableList;
@@ -184,7 +181,7 @@ public class HudiPageSourceProvider
         // The `columns` list could be empty when count(*) is issued,
         // prepending hoodie meta columns for Hudi split with log files
         // to allow a non-empty dataPageSource to be returned
-        List<HiveColumnHandle> hudiMetaAndDataColumnHandles = 
prependHudiMetaColumns(dataColumnHandles);
+        List<HiveColumnHandle> hudiMetaAndDataColumnHandles = 
prependHudiMetaAndOrderingColumns(hudiTableHandle, dataColumnHandles);
 
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         ConnectorPageSource dataPageSource = createPageSource(
@@ -219,8 +216,9 @@ public class HudiPageSourceProvider
         // TODO: Move this into HudiTableHandle
         HoodieTableMetaClient metaClient = buildTableMetaClient(
                 fileSystemFactory.create(session), 
hudiTableHandle.getSchemaTableName().toString(), hudiTableHandle.getBasePath());
-
         HudiTrinoReaderContext readerContext = new HudiTrinoReaderContext(
+                metaClient.getStorageConf(),
+                metaClient.getTableConfig(),
                 dataPageSource,
                 dataColumnHandles,
                 hudiMetaAndDataColumnHandles,
@@ -232,21 +230,18 @@ public class HudiPageSourceProvider
         // Construct an Avro schema for log file reader
         Schema requestedSchema = constructSchema(dataSchema, 
hudiMetaAndDataColumnHandles.stream().map(HiveColumnHandle::getName).toList());
         HoodieFileGroupReader<IndexedRecord> fileGroupReader =
-                new HoodieFileGroupReader<>(
-                        readerContext,
-                        new 
HudiTrinoStorage(fileSystemFactory.create(session), new 
TrinoStorageConfiguration()),
-                        hudiTableHandle.getBasePath(),
-                        hudiTableHandle.getLatestCommitTime(),
-                        convertToFileSlice(hudiSplit, 
hudiTableHandle.getBasePath()),
-                        dataSchema,
-                        requestedSchema,
-                        Option.empty(),
-                        metaClient,
-                        metaClient.getTableConfig().getProps(),
-                        start,
-                        length,
-                        false);
-
+                HoodieFileGroupReader.<IndexedRecord>newBuilder()
+                        .withReaderContext(readerContext)
+                        .withHoodieTableMetaClient(metaClient)
+                        .withFileSlice(convertToFileSlice(hudiSplit, 
hudiTableHandle.getBasePath()))
+                        .withDataSchema(dataSchema)
+                        .withRequestedSchema(requestedSchema)
+                        
.withLatestCommitTime(hudiTableHandle.getLatestCommitTime())
+                        .withProps(metaClient.getTableConfig().getProps())
+                        .withShouldUseRecordPosition(false)
+                        .withStart(start)
+                        .withLength(length)
+                        .build();
         return new HudiPageSource(
                 dataPageSource,
                 fileGroupReader,
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
index 278ca2e463c7..6ea49d9baaf8 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
@@ -44,6 +44,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.util.Lazy;
 
@@ -104,9 +106,9 @@ public class HudiSplitSource
             HoodieTableMetaClient metaClient = tableHandle.getMetaClient();
             HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getStorage().getConf());
 
-            HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(
-                    engineContext,
-                    tableHandle.getMetaClient().getStorage(), metadataConfig, 
metaClient.getBasePath().toString(), true);
+            HoodieTableMetadata tableMetadata = enableMetadataTable && 
tableHandle.getMetaClient().getTableConfig().isMetadataTableAvailable() ?
+                    new HoodieBackedTableMetadata(engineContext, 
tableHandle.getMetaClient().getStorage(), metadataConfig, 
metaClient.getBasePath().toString(), true) :
+                    new FileSystemBackedTableMetadata(engineContext, 
tableHandle.getMetaClient().getStorage(), metaClient.getBasePath().toString());
             log.info("Loaded table metadata for table: %s in %s ms", 
tableHandle.getSchemaTableName(), timer.endTimer());
             return tableMetadata;
         });
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
index 75ae962286a4..f9f828bcca55 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
@@ -49,6 +49,7 @@ public class HudiTableHandle
     private final String basePath;
     private final HoodieTableType tableType;
     private final List<HiveColumnHandle> partitionColumns;
+    private final Lazy<List<HiveColumnHandle>> lazyOrderingColumns;
     // Used only for validation when config property 
hudi.query-partition-filter-required is enabled
     private final Set<HiveColumnHandle> constraintColumns;
     private final TupleDomain<HiveColumnHandle> partitionPredicates;
@@ -66,12 +67,13 @@ public class HudiTableHandle
             @JsonProperty("basePath") String basePath,
             @JsonProperty("tableType") HoodieTableType tableType,
             @JsonProperty("partitionColumns") List<HiveColumnHandle> 
partitionColumns,
+            @JsonProperty("orderingColumns") List<HiveColumnHandle> 
orderingColumns,
             @JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> 
partitionPredicates,
             @JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> 
regularPredicates,
             @JsonProperty("tableSchemaStr") String tableSchemaStr,
             @JsonProperty("latestCommitTime") String latestCommitTime)
     {
-        this(Optional.empty(), Optional.empty(), schemaName, tableName, 
basePath, tableType, partitionColumns, ImmutableSet.of(),
+        this(Optional.empty(), Optional.empty(), schemaName, tableName, 
basePath, tableType, partitionColumns, Lazy.lazily(() -> orderingColumns), 
ImmutableSet.of(),
                 partitionPredicates, regularPredicates, 
buildTableSchema(tableSchemaStr), () -> latestCommitTime);
     }
 
@@ -83,6 +85,7 @@ public class HudiTableHandle
             String basePath,
             HoodieTableType tableType,
             List<HiveColumnHandle> partitionColumns,
+            Lazy<List<HiveColumnHandle>> lazyOrderingColumns,
             Set<HiveColumnHandle> constraintColumns,
             TupleDomain<HiveColumnHandle> partitionPredicates,
             TupleDomain<HiveColumnHandle> regularPredicates,
@@ -96,6 +99,7 @@ public class HudiTableHandle
                 basePath,
                 tableType,
                 partitionColumns,
+                lazyOrderingColumns,
                 constraintColumns,
                 partitionPredicates,
                 regularPredicates,
@@ -120,6 +124,7 @@ public class HudiTableHandle
             String basePath,
             HoodieTableType tableType,
             List<HiveColumnHandle> partitionColumns,
+            Lazy<List<HiveColumnHandle>> lazyOrderingColumns,
             Set<HiveColumnHandle> constraintColumns,
             TupleDomain<HiveColumnHandle> partitionPredicates,
             TupleDomain<HiveColumnHandle> regularPredicates,
@@ -133,6 +138,7 @@ public class HudiTableHandle
         this.basePath = requireNonNull(basePath, "basePath is null");
         this.tableType = requireNonNull(tableType, "tableType is null");
         this.partitionColumns = requireNonNull(partitionColumns, 
"partitionColumns is null");
+        this.lazyOrderingColumns = requireNonNull(lazyOrderingColumns, 
"lazyOrderingColumns is null");
         this.constraintColumns = requireNonNull(constraintColumns, 
"constraintColumns is null");
         this.partitionPredicates = requireNonNull(partitionPredicates, 
"partitionPredicates is null");
         this.regularPredicates = requireNonNull(regularPredicates, 
"regularPredicates is null");
@@ -248,6 +254,12 @@ public class HudiTableHandle
         return regularPredicates;
     }
 
+    @JsonProperty
+    public List<HiveColumnHandle> getOrderingColumns()
+    {
+        return lazyOrderingColumns.get();
+    }
+
     public SchemaTableName getSchemaTableName()
     {
         return schemaTableName(schemaName, tableName);
@@ -266,6 +278,7 @@ public class HudiTableHandle
                 basePath,
                 tableType,
                 partitionColumns,
+                lazyOrderingColumns,
                 constraintColumns,
                 partitionPredicates.intersect(partitionTupleDomain),
                 regularPredicates.intersect(regularTupleDomain),
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java
index 8eb10de4cc4c..52e594bbc4be 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java
@@ -22,10 +22,13 @@ import io.trino.cache.EvictableCacheBuilder;
 import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
+import io.trino.metastore.Column;
 import io.trino.metastore.HivePartition;
 import io.trino.metastore.HiveType;
+import io.trino.metastore.Table;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HivePartitionKey;
+import io.trino.plugin.hive.HiveTimestampPrecision;
 import io.trino.plugin.hive.avro.AvroHiveFileUtils;
 import io.trino.plugin.hudi.storage.HudiTrinoStorage;
 import io.trino.plugin.hudi.storage.TrinoStorageConfiguration;
@@ -35,9 +38,12 @@ import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.NullableValue;
 import io.trino.spi.predicate.TupleDomain;
+import io.trino.spi.type.TypeManager;
 import io.trino.spi.type.VarcharType;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -47,18 +53,25 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.util.Lazy;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -66,7 +79,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static io.airlift.slice.SizeOf.estimatedSizeOf;
+import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR;
+import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA;
+import static io.trino.plugin.hive.util.HiveTypeUtil.getType;
+import static io.trino.plugin.hive.util.HiveTypeUtil.typeSupported;
 import static io.trino.plugin.hive.util.HiveUtil.checkCondition;
 import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue;
 import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS;
@@ -77,11 +94,14 @@ import static 
io.trino.plugin.hudi.HudiErrorCode.HUDI_META_CLIENT_ERROR;
 import static io.trino.plugin.hudi.HudiErrorCode.HUDI_SCHEMA_ERROR;
 import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT;
 import static java.lang.Math.toIntExact;
+import static 
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
-import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_META_FIELD_ORD;
 
 public final class HudiUtil
 {
+    public static final List<String> HOODIE_META_COLUMNS =
+            CollectionUtils.createImmutableList(RECORD_KEY_METADATA_FIELD, 
PARTITION_PATH_METADATA_FIELD);
+
     private static final Logger log = Logger.get(HudiUtil.class);
     private static final Cache<Schema, Map<String, Schema.Field>> 
SCHEMA_FIELD_CACHE =
             EvictableCacheBuilder.newBuilder()
@@ -291,7 +311,6 @@ public final class HudiUtil
      *   <li>First, attempts an exact match on the column name.</li>
      *   <li>If not found, falls back to a case-insensitive match using a 
cached lookup table</li>
      * </ul>
-     * <p>
      *
      * @param columnName Column name to search for.
      * @param schema Avro {@link Schema} in which to search.
@@ -321,29 +340,33 @@ public final class HudiUtil
                 "Failed to get column " + columnName + " from table schema");
     }
 
-    public static List<HiveColumnHandle> 
prependHudiMetaColumns(List<HiveColumnHandle> dataColumns)
+    public static List<HiveColumnHandle> 
prependHudiMetaAndOrderingColumns(HudiTableHandle tableHandle, 
List<HiveColumnHandle> dataColumns)
     {
-        //For efficient lookup
-        Set<String> dataColumnNames = dataColumns.stream()
+        Set<String> existingColumns = dataColumns.stream()
                 .map(HiveColumnHandle::getName)
-                .collect(Collectors.toSet());
-
-        // If Hudi record key meta column is already present, return the 
original list
-        if (dataColumnNames.contains(RECORD_KEY_METADATA_FIELD)) {
-            return dataColumns;
-        }
+                .collect(Collectors.toCollection(HashSet::new));
 
         List<HiveColumnHandle> columns = new ArrayList<>();
 
-        // Create and prepend the new HiveColumnHandle for the record key 
column
-        columns.add(new HiveColumnHandle(
-                RECORD_KEY_METADATA_FIELD,
-                RECORD_KEY_META_FIELD_ORD,
-                HiveType.HIVE_STRING,
-                VarcharType.VARCHAR,
-                Optional.empty(),
-                HiveColumnHandle.ColumnType.REGULAR,
-                Optional.empty()));
+        // Add missing Hudi meta columns first
+        for (int i = 0; i < HOODIE_META_COLUMNS.size(); i++) {
+            String metaColumn = HOODIE_META_COLUMNS.get(i);
+            if (existingColumns.add(metaColumn)) { // add() returns false if 
already present
+                columns.add(new HiveColumnHandle(
+                        metaColumn,
+                        i,
+                        HiveType.HIVE_STRING,
+                        VarcharType.VARCHAR,
+                        Optional.empty(),
+                        HiveColumnHandle.ColumnType.REGULAR,
+                        Optional.empty()));
+            }
+        }
+
+        // Add missing ordering columns next
+        tableHandle.getOrderingColumns().stream()
+                .filter(col -> existingColumns.add(col.getName()))
+                .forEach(columns::add);
 
         // Add all the original data columns after the new meta columns
         columns.addAll(dataColumns);
@@ -389,4 +412,62 @@ public final class HudiUtil
             throw new TrinoException(HUDI_FILESYSTEM_ERROR, e);
         }
     }
+
+    public static List<HiveColumnHandle> getOrderingColumnHandles(Table table, 
TypeManager typeManager, Lazy<HoodieTableMetaClient> lazyMetaClient, 
HiveTimestampPrecision timestampPrecision)
+    {
+        RecordMergeMode recordMergeMode = 
lazyMetaClient.get().getTableConfig().getRecordMergeMode();
+        if (Objects.isNull(recordMergeMode) || 
recordMergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING)) {
+            // if commit time ordering is enabled, return empty list
+            return Collections.emptyList();
+        }
+
+        ImmutableList.Builder<HiveColumnHandle> columns = 
ImmutableList.builder();
+        List<String> orderingColumnNames = 
lazyMetaClient.get().getTableConfig().getOrderingFields();
+
+        int hiveColumnIndex = 0;
+        for (Column field : table.getDataColumns()) {
+            // ignore unsupported types rather than failing
+            if (orderingColumnNames.contains(field.getName())) {
+                HiveType hiveType = field.getType();
+                if (typeSupported(hiveType.getTypeInfo(), 
table.getStorage().getStorageFormat())) {
+                    columns.add(createBaseColumn(field.getName(), 
hiveColumnIndex, hiveType, getType(hiveType, typeManager, timestampPrecision), 
REGULAR, field.getComment()));
+                }
+            }
+            hiveColumnIndex++;
+        }
+
+        return columns.build();
+    }
+
+    /**
+     * Converts the given {@link HoodiePairData} into a {@link Map}.
+     * <p>
+     * Special handling is applied for null keys:
+     * <ul>
+     *   <li>If a key is null, it is stored in the map as a {@code null} 
entry.</li>
+     *   <li>If multiple entries share the same key (including null), the 
latest value overwrites the previous one.</li>
+     * </ul>
+     *
+     * @param pairData the HoodiePairData containing key-value pairs
+     * @param <K> the type of keys maintained by the resulting map
+     * @param <V> the type of mapped values
+     * @return a {@link Map} containing all key-value pairs from the input data
+     */
+    public static <K, V> Map<K, V> collectAsMap(HoodiePairData<K, V> pairData)
+    {
+        // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly
+        // If there are multiple entries sharing the same key, use the 
incoming one
+        return pairData.mapToPair(pair ->
+                        Pair.of(
+                                Option.ofNullable(pair.getKey()),
+                                pair.getValue()))
+                .collectAsList()
+                .stream()
+                .collect(HashMap::new,
+                        (map, pair) -> {
+                            K key = pair.getKey().orElse(null);
+                            map.put(key, pair.getValue());
+                        },
+                        HashMap::putAll);
+    }
 }
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java
index 7e8bb967ffc7..3571de43d79e 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java
@@ -16,12 +16,14 @@ package io.trino.plugin.hudi.io;
 import org.apache.avro.Schema;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.storage.HFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.IOException;
 
@@ -45,7 +47,11 @@ public class HudiTrinoFileReaderFactory
             Option<Schema> schemaOption)
             throws IOException
     {
-        return new HoodieNativeAvroHFileReader(storage, path, schemaOption);
+        HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+                .withStorage(storage).withProps(hoodieConfig.getProps())
+                .withPath(path).build();
+        return HoodieNativeAvroHFileReader.builder()
+                
.readerFactory(readerFactory).path(path).schema(schemaOption).build();
     }
 
     @Override
@@ -56,7 +62,22 @@ public class HudiTrinoFileReaderFactory
             Option<Schema> schemaOption)
             throws IOException
     {
-        return new HoodieNativeAvroHFileReader(this.storage, content, 
schemaOption);
+        HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+                .withStorage(storage).withProps(hoodieConfig.getProps())
+                .withContent(content).build();
+        return HoodieNativeAvroHFileReader.builder()
+                
.readerFactory(readerFactory).path(path).schema(schemaOption).build();
+    }
+
+    @Override
+    protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig, 
StoragePathInfo pathInfo, Option<Schema> schemaOption)
+            throws IOException
+    {
+        HFileReaderFactory readerFactory = HFileReaderFactory.builder()
+                .withStorage(storage).withProps(hoodieConfig.getProps())
+                .withPath(pathInfo.getPath()).build();
+        return HoodieNativeAvroHFileReader.builder()
+                
.readerFactory(readerFactory).path(pathInfo.getPath()).schema(schemaOption).build();
     }
 
     @Override
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
index f75ec7a55f90..b67b0012905b 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java
@@ -26,7 +26,7 @@ import java.io.IOException;
  * Example InlineFS URL:
  * <pre>
  * 
inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/
- * 
.col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959
+ * 
.col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&amp;length=6959
  * </pre>
  * <p>
  * Key behaviors:
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java
index 0d1c9eaa6cd0..7626272db612 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java
@@ -30,13 +30,14 @@ import io.trino.spi.type.Type;
 import io.trino.spi.type.VarcharType;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.util.Lazy;
@@ -92,9 +93,9 @@ public class HudiColumnStatsIndexSupport
         }
         else {
             // Get filter columns
-            List<String> encodedTargetColumnNames = regularColumns
+            List<ColumnStatsIndexPrefixRawKey> rawKeys = regularColumns
                     .stream()
-                    .map(col -> new 
ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList());
+                    .map(ColumnStatsIndexPrefixRawKey::new).toList();
 
             Map<String, Type> columnTypes = 
regularColumnPredicates.getDomains().get().entrySet().stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getType()));
@@ -107,7 +108,8 @@ public class HudiColumnStatsIndexSupport
                 }
 
                 Map<String, Map<String, Domain>> domainsWithStats =
-                        
lazyTableMetadata.get().getRecordsByKeyPrefixes(encodedTargetColumnNames,
+                        lazyTableMetadata.get().getRecordsByKeyPrefixes(
+                                        HoodieListData.lazy(rawKeys),
                                         
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, true)
                                 .collectAsList()
                                 .stream()
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java
index 3491d10cccfe..7d1e6907d976 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java
@@ -21,10 +21,11 @@ import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieTimer;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.util.Lazy;
@@ -66,15 +67,15 @@ public class HudiPartitionStatsIndexSupport
         List<String> regularColumns = new 
ArrayList<>(filteredRegularPredicates.getDomains().get().keySet());
 
         // Get columns to filter on
-        List<String> encodedTargetColumnNames = regularColumns.stream()
-                .map(col -> new 
ColumnIndexID(col).asBase64EncodedString()).toList();
+        List<ColumnStatsIndexPrefixRawKey> columnStatsIndexPrefixRawKeys = 
regularColumns.stream()
+                .map(ColumnStatsIndexPrefixRawKey::new).toList();
 
         Map<String, Type> columnTypes = 
regularColumnPredicates.getDomains().get().entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, entry -> 
entry.getValue().getType()));
 
         // Map of domains with partition stats keyed by partition name and 
column name
         Map<String, Map<String, Domain>> domainsWithStats = 
lazyMetadataTable.get().getRecordsByKeyPrefixes(
-                        encodedTargetColumnNames,
+                        HoodieListData.eager(columnStatsIndexPrefixRawKeys),
                         
HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, true)
                 .collectAsList()
                 .stream()
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java
index 78f503f4b21e..e438e5686ebb 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java
@@ -22,6 +22,7 @@ import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.Domain;
 import io.trino.spi.predicate.TupleDomain;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -47,6 +48,7 @@ import java.util.stream.Collectors;
 
 import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA;
 import static 
io.trino.plugin.hudi.HudiSessionProperties.getRecordIndexWaitTimeout;
+import static io.trino.plugin.hudi.HudiUtil.collectAsMap;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 public class HudiRecordLevelIndexSupport
@@ -94,7 +96,7 @@ public class HudiRecordLevelIndexSupport
 
                 // Perform index lookup in metadataTable
                 // TODO: document here what this map is keyed by
-                Map<String, HoodieRecordGlobalLocation> recordIndex = 
lazyTableMetadata.get().readRecordIndex(recordKeys);
+                Map<String, HoodieRecordGlobalLocation> recordIndex = 
collectAsMap(lazyTableMetadata.get().readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys)));
                 if (recordIndex.isEmpty()) {
                     log.debug("Record level index lookup took %s ms but 
returned no locations for the given keys %s for table %s",
                             timer.endTimer(), recordKeys, schemaTableName);
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java
index 23121902f72c..eddf4d48f547 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java
@@ -19,10 +19,12 @@ import io.trino.plugin.hudi.util.TupleDomainUtils;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.predicate.TupleDomain;
+import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieIndexDefinition;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
@@ -85,7 +87,7 @@ public class HudiSecondaryIndexSupport
 
             // Perform index lookup in metadataTable
             // TODO: document here what this map is keyed by
-            Map<String, HoodieRecordGlobalLocation> recordKeyLocationsMap = 
lazyTableMetadata.get().readSecondaryIndex(secondaryKeys, indexName);
+            Map<String, HoodieRecordGlobalLocation> recordKeyLocationsMap = 
HoodieDataUtils.dedupeAndCollectAsMap(lazyTableMetadata.get().readSecondaryIndexLocationsWithKeys(HoodieListData.eager(secondaryKeys),
 indexName));
             if (recordKeyLocationsMap.isEmpty()) {
                 log.debug("Took %s ms, but secondary index lookup returned no 
locations for the given keys for table %s", timer.endTimer(), schemaTableName);
                 // Return all original fileSlices
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java
index 36c5342920f4..1f7584c3c168 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java
@@ -19,29 +19,25 @@ import io.trino.plugin.hudi.util.SynthesizedColumnHandler;
 import io.trino.spi.Page;
 import io.trino.spi.connector.ConnectorPageSource;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Record;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.avro.AvroRecordContext;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieAvroRecordMerger;
-import org.apache.hudi.common.model.HoodieEmptyRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.UnaryOperator;
 
 public class HudiTrinoReaderContext
         extends HoodieReaderContext<IndexedRecord>
@@ -53,11 +49,14 @@ public class HudiTrinoReaderContext
     List<HiveColumnHandle> columnHandles;
 
     public HudiTrinoReaderContext(
+            StorageConfiguration storageConfiguration,
+            HoodieTableConfig tableConfig,
             ConnectorPageSource pageSource,
             List<HiveColumnHandle> dataHandles,
             List<HiveColumnHandle> columnHandles,
             SynthesizedColumnHandler synthesizedColumnHandler)
     {
+        super(storageConfiguration, tableConfig, Option.empty(), 
Option.empty(), new AvroRecordContext(tableConfig, 
tableConfig.getPayloadClass()));
         this.pageSource = pageSource;
         this.avroSerializer = new HudiAvroSerializer(columnHandles, 
synthesizedColumnHandler);
         this.dataHandles = dataHandles;
@@ -77,6 +76,23 @@ public class HudiTrinoReaderContext
             Schema dataSchema,
             Schema requiredSchema,
             HoodieStorage storage)
+    {
+        return createRecordIterator();
+    }
+
+    @Override
+    public ClosableIterator<IndexedRecord> getFileRecordIterator(
+            StoragePathInfo storagePathInfo,
+            long start,
+            long length,
+            Schema dataSchema,
+            Schema requiredSchema,
+            HoodieStorage storage)
+    {
+        return createRecordIterator();
+    }
+
+    private ClosableIterator<IndexedRecord> createRecordIterator()
     {
         return new ClosableIterator<>()
         {
@@ -130,98 +146,14 @@ public class HudiTrinoReaderContext
     }
 
     @Override
-    public IndexedRecord convertAvroRecord(IndexedRecord record)
+    protected Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode 
mergeMode, String mergeStrategyId, String mergeImplClasses)
     {
-        return record;
+        return Option.of(new HoodiePreCombineAvroRecordMerger());
     }
 
     @Override
-    public GenericRecord convertToAvroRecord(IndexedRecord record, Schema 
schema)
-    {
-        GenericRecord ret = new GenericData.Record(schema);
-        for (Schema.Field field : schema.getFields()) {
-            ret.put(field.name(), record.get(field.pos()));
-        }
-        return ret;
-    }
-
-    @Override
-    public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode 
mergeMode, String mergeStrategyId, String mergeImplClasses)
-    {
-        return Option.of(HoodieAvroRecordMerger.INSTANCE);
-    }
-
-    @Override
-    public Object getValue(IndexedRecord record, Schema schema, String 
fieldName)
-    {
-        if (colToPosMap.containsKey(fieldName)) {
-            return record.get(colToPosMap.get(fieldName));
-        }
-        else {
-            // record doesn't have the queried field, return null
-            return null;
-        }
-    }
-
-    @Override
-    public IndexedRecord seal(IndexedRecord record)
-    {
-        // TODO: this can rely on colToPos map directly instead of schema
-        Schema schema = record.getSchema();
-        IndexedRecord newRecord = new Record(schema);
-        List<Schema.Field> fields = schema.getFields();
-        for (Schema.Field field : fields) {
-            int pos = schema.getField(field.name()).pos();
-            newRecord.put(pos, record.get(pos));
-        }
-        return newRecord;
-    }
-
-    @Override
-    public IndexedRecord toBinaryRow(Schema schema, IndexedRecord record)
-    {
-        return record;
-    }
-
-    @Override
-    public ClosableIterator<IndexedRecord> mergeBootstrapReaders(
-            ClosableIterator closableIterator, Schema schema,
-            ClosableIterator closableIterator1, Schema schema1)
+    public ClosableIterator<IndexedRecord> 
mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, 
Schema skeletonRequiredSchema, ClosableIterator<IndexedRecord> 
dataFileIterator, Schema dataRequiredSchema, List<Pair<String, Object>> 
requiredPartitionFieldAndValues)
     {
         return null;
     }
-
-    @Override
-    public UnaryOperator<IndexedRecord> projectRecord(
-            Schema from,
-            Schema to,
-            Map<String, String> renamedColumns)
-    {
-        List<Schema.Field> toFields = to.getFields();
-        int[] projection = new int[toFields.size()];
-        for (int i = 0; i < projection.length; i++) {
-            projection[i] = from.getField(toFields.get(i).name()).pos();
-        }
-
-        return fromRecord -> {
-            IndexedRecord toRecord = new Record(to);
-            for (int i = 0; i < projection.length; i++) {
-                toRecord.put(i, fromRecord.get(projection[i]));
-            }
-            return toRecord;
-        };
-    }
-
-    @Override
-    public HoodieRecord<IndexedRecord> constructHoodieRecord(
-            BufferedRecord<IndexedRecord> bufferedRecord)
-    {
-        if (bufferedRecord.isDelete()) {
-            return new HoodieEmptyRecord<>(
-                    new HoodieKey(bufferedRecord.getRecordKey(), null),
-                    HoodieRecord.HoodieRecordType.AVRO);
-        }
-
-        return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
-    }
 }
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoRecord.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoRecord.java
deleted file mode 100644
index 7fa0f71399ed..000000000000
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoRecord.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.trino.plugin.hudi.reader;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.model.HoodieOperation;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.keygen.BaseKeyGenerator;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-
-public class HudiTrinoRecord
-        extends HoodieRecord<IndexedRecord>
-{
-    public HudiTrinoRecord()
-    {
-    }
-
-    @Override
-    public HoodieRecord<IndexedRecord> newInstance()
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord<IndexedRecord> newInstance(HoodieKey hoodieKey, 
HoodieOperation hoodieOperation)
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord<IndexedRecord> newInstance(HoodieKey hoodieKey)
-    {
-        return null;
-    }
-
-    @Override
-    public Comparable<?> doGetOrderingValue(Schema schema, Properties 
properties)
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecordType getRecordType()
-    {
-        return null;
-    }
-
-    @Override
-    public String getRecordKey(Schema schema, Option<BaseKeyGenerator> option)
-    {
-        return "";
-    }
-
-    @Override
-    public String getRecordKey(Schema schema, String s)
-    {
-        return "";
-    }
-
-    @Override
-    protected void writeRecordPayload(IndexedRecord page, Kryo kryo, Output 
output)
-    {
-    }
-
-    @Override
-    protected IndexedRecord readRecordPayload(Kryo kryo, Input input)
-    {
-        return null;
-    }
-
-    @Override
-    public Object[] getColumnValues(Schema schema, String[] strings, boolean b)
-    {
-        return new Object[0];
-    }
-
-    @Override
-    public HoodieRecord joinWith(HoodieRecord hoodieRecord, Schema schema)
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord prependMetaFields(Schema schema, Schema schema1,
-            MetadataValues metadataValues, Properties properties)
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord rewriteRecordWithNewSchema(Schema schema, Properties 
properties,
-            Schema schema1, Map<String, String> map)
-    {
-        return null;
-    }
-
-    @Override
-    public boolean isDelete(Schema schema, Properties properties)
-            throws IOException
-    {
-        return false;
-    }
-
-    @Override
-    public boolean shouldIgnore(Schema schema, Properties properties)
-            throws IOException
-    {
-        return false;
-    }
-
-    @Override
-    public HoodieRecord<IndexedRecord> copy()
-    {
-        return null;
-    }
-
-    @Override
-    public Option<Map<String, String>> getMetadata()
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema schema, 
Properties properties,
-            Option<Pair<String, String>> option, Boolean aBoolean, 
Option<String> option1,
-            Boolean aBoolean1, Option<Schema> option2)
-            throws IOException
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema schema, 
Properties properties,
-            Option<BaseKeyGenerator> option)
-    {
-        return null;
-    }
-
-    @Override
-    public HoodieRecord truncateRecordKey(Schema schema, Properties 
properties, String s)
-            throws IOException
-    {
-        return null;
-    }
-
-    @Override
-    public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, 
Properties properties)
-            throws IOException
-    {
-        return null;
-    }
-
-    @Override
-    public ByteArrayOutputStream getAvroBytes(Schema schema, Properties 
properties)
-            throws IOException
-    {
-        return null;
-    }
-}
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java
index e87122de0795..a431465d938a 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java
@@ -16,21 +16,14 @@ package io.trino.plugin.hudi.stats;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.common.util.hash.ColumnIndexID;
-import org.apache.hudi.common.util.hash.FileIndexID;
-import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
-import org.apache.hudi.metadata.HoodieMetadataMetrics;
-import org.apache.hudi.metadata.HoodieMetadataPayload;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
+import org.apache.hudi.stats.ValueMetadata;
 import org.apache.hudi.storage.HoodieStorage;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -55,60 +48,29 @@ public class TableMetadataReader
      * @return a map from column name to their corresponding {@link 
HoodieColumnRangeMetadata}
      * @throws HoodieMetadataException if an error occurs while fetching the 
column statistics
      */
-    Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, 
String>> partitionNameFileNameList, List<String> columnNames)
+    public Map<String, HoodieColumnRangeMetadata> 
getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, 
List<String> columnNames)
             throws HoodieMetadataException
     {
-        return 
computeFileToColumnStatsMap(computeColumnStatsLookupKeys(partitionNameFileNameList,
 columnNames));
-    }
-
-    /**
-     * @param partitionNameFileNameList a list of partition and file name 
pairs for which column stats need to be retrieved
-     * @param columnNames list of column names for which stats are needed
-     * @return a list of column stats keys to look up in the metadata table 
col_stats partition.
-     */
-    private List<String> computeColumnStatsLookupKeys(
-            final List<Pair<String, String>> partitionNameFileNameList,
-            final List<String> columnNames)
-    {
-        return columnNames.stream()
-                .flatMap(columnName -> partitionNameFileNameList.stream()
-                        .map(partitionNameFileNamePair -> 
HoodieMetadataPayload.getColumnStatsIndexKey(
-                                new 
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())),
-                                new 
FileIndexID(partitionNameFileNamePair.getRight()),
-                                new ColumnIndexID(columnName))))
-                .toList();
-    }
-
-    /**
-     * @param columnStatsLookupKeys a map from column stats key to partition 
and file name pair
-     * @return a map from column name to merged HoodieMetadataColumnStats
-     */
-    private Map<String, HoodieColumnRangeMetadata> 
computeFileToColumnStatsMap(List<String> columnStatsLookupKeys)
-    {
-        HoodieTimer timer = HoodieTimer.start();
-        Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
-                getRecordsByKeys(columnStatsLookupKeys, 
MetadataPartitionType.COLUMN_STATS.getPartitionPath());
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, 
timer.endTimer()));
-        return hoodieRecords.values().stream()
-                .collect(Collectors.groupingBy(
-                        r -> 
r.getData().getColumnStatMetadata().get().getColumnName(),
-                        Collectors.mapping(r -> 
r.getData().getColumnStatMetadata().get(), Collectors.toList())))
+        Map<Pair<String, String>, List<HoodieMetadataColumnStats>> 
columnStatsMap = getColumnStats(partitionNameFileNameList, columnNames);
+        return 
columnStatsMap.values().stream().flatMap(Collection::stream).collect(Collectors.groupingBy(
+                HoodieMetadataColumnStats::getColumnName,
+                Collectors.mapping(colStats -> colStats, Collectors.toList())))
                 .entrySet().stream()
-                .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        e -> {
-                            long valueCount = 0L;
-                            long nullCount = 0L;
-                            long totalSize = 0L;
-                            long totalUncompressedSize = 0L;
-                            for (HoodieMetadataColumnStats stats : 
e.getValue()) {
-                                valueCount += stats.getValueCount();
-                                nullCount += stats.getNullCount();
-                                totalSize += stats.getTotalSize();
-                                totalUncompressedSize += 
stats.getTotalUncompressedSize();
-                            }
-                            return HoodieColumnRangeMetadata.create(
-                                    "", e.getKey(), null, null, nullCount, 
valueCount, totalSize, totalUncompressedSize);
-                        }));
+            .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> {
+                        long valueCount = 0L;
+                        long nullCount = 0L;
+                        long totalSize = 0L;
+                        long totalUncompressedSize = 0L;
+                        for (HoodieMetadataColumnStats stats : e.getValue()) {
+                            valueCount += stats.getValueCount();
+                            nullCount += stats.getNullCount();
+                            totalSize += stats.getTotalSize();
+                            totalUncompressedSize += 
stats.getTotalUncompressedSize();
+                        }
+                        return HoodieColumnRangeMetadata.create(
+                                "", e.getKey(), null, null, nullCount, 
valueCount, totalSize, totalUncompressedSize, ValueMetadata.NULL_METADATA);
+                    }));
     }
 }
diff --git 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java
 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java
index 55d424db40ca..5501905dcd00 100644
--- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java
+++ 
b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java
@@ -24,11 +24,11 @@ import io.trino.spi.statistics.TableStatistics;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
-import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.stats.HoodieColumnRangeMetadata;
 
 import java.util.List;
 import java.util.Map;
@@ -113,6 +113,6 @@ public class TableStatisticsReader
                 .stream().flatMap(entry -> entry.getValue()
                         .map(baseFile -> Pair.of(entry.getKey(), 
baseFile.getFileName())))
                 .toList();
-        return tableMetadata.getColumnStats(filePaths, columnNames);
+        return tableMetadata.getColumnsRange(filePaths, columnNames);
     }
 }
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
index 1308cff269a9..5fe42eb55a46 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultiset;
 import com.google.common.collect.Multiset;
@@ -21,7 +20,6 @@ import 
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer;
 import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -34,15 +32,13 @@ import java.util.Map;
 
 import static com.google.common.io.MoreFiles.deleteRecursively;
 import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
-import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
 import static 
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR;
+import static 
io.trino.plugin.hudi.util.FileOperationAssertions.assertAlluxioFileSystemAccesses;
 import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES;
-import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
-import static java.util.stream.Collectors.toCollection;
 
 @ResourceLock("HUDI_CACHE_SYSTEM")
 @Execution(ExecutionMode.SAME_THREAD)
@@ -78,29 +74,18 @@ public class TestHudiAlluxioCacheFileOperations
             throws InterruptedException
     {
         @Language("SQL") String query = "SELECT * FROM " + 
HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'";
-        assertFileSystemAccesses(
-                query,
-                ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new FileOperation("Alluxio.readCached", 
DATA), 2)
-                        .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 27)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 4)
-                        .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 10)
-                        .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 2)
-                        .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
-                        .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 2)
-                        .build());
+        Multiset<FileOperation> expectedFileOperations = 
ImmutableMultiset.<FileOperation>builder()
+                .addCopies(new FileOperation("Alluxio.readCached", DATA), 2)
+                .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 20)
+                .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 5)
+                .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 11)
+                .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 2)
+                .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
+                .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 2)
+                .build();
+        assertAlluxioFileSystemAccesses(getDistributedQueryRunner(), query, 
expectedFileOperations);
 
-        assertFileSystemAccesses(
-                query,
-                ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new FileOperation("Alluxio.readCached", 
DATA), 2)
-                        .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 27)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 4)
-                        .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 10)
-                        .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 2)
-                        .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
-                        .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 2)
-                        .build());
+        assertAlluxioFileSystemAccesses(getDistributedQueryRunner(), query, 
expectedFileOperations);
     }
 
     @Test
@@ -112,45 +97,30 @@ public class TestHudiAlluxioCacheFileOperations
                 "INNER JOIN " + HUDI_MULTI_FG_PT_V8_MOR + " t2 ON t1.id = 
t2.id " +
                 "WHERE t2.price <= 102";
 
-        assertFileSystemAccesses(query,
+        assertAlluxioFileSystemAccesses(
+                getDistributedQueryRunner(),
+                query,
                 ImmutableMultiset.<FileOperation>builder()
                         .addCopies(new FileOperation("Alluxio.readCached", 
DATA), 6)
-                        .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 288)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 39)
-                        .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 93)
+                        .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 222)
+                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 60)
+                        .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 114)
                         .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 5)
                         .addCopies(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES), 3)
                         .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 5)
                         .build());
 
-        assertFileSystemAccesses(query,
+        assertAlluxioFileSystemAccesses(
+                getDistributedQueryRunner(),
+                query,
                 ImmutableMultiset.<FileOperation>builder()
                         .addCopies(new FileOperation("Alluxio.readCached", 
DATA), 6)
-                        .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 215)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 29)
-                        .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 69)
+                        .addCopies(new FileOperation("Alluxio.readCached", 
METADATA_TABLE), 166)
+                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 45)
+                        .addCopies(new FileOperation("InputFile.length", 
METADATA_TABLE), 85)
                         .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 4)
                         .addCopies(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES), 2)
                         .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 4)
                         .build());
     }
-
-    private void assertFileSystemAccesses(@Language("SQL") String query, 
Multiset<FileOperation> expectedCacheAccesses)
-            throws InterruptedException
-    {
-        DistributedQueryRunner queryRunner = getDistributedQueryRunner();
-        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
-        // Allow time for table stats computation to finish before validation.
-        Thread.sleep(1000L);
-        assertMultisetsEqual(getFileOperations(queryRunner), 
expectedCacheAccesses);
-    }
-
-    public static Multiset<FileOperation> getFileOperations(QueryRunner 
queryRunner)
-    {
-        return getCacheOperationSpans(queryRunner)
-                .stream()
-                .filter(span -> !span.getName().startsWith("InputFile.exists"))
-                .map(FileOperation::create)
-                .collect(toCollection(HashMultiset::create));
-    }
 }
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
index 3f6d97f16919..7c61d260cab5 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultiset;
 import com.google.common.collect.Multiset;
@@ -21,7 +20,6 @@ import 
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer;
 import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -30,16 +28,13 @@ import org.junit.jupiter.api.parallel.ResourceLock;
 
 import java.util.Map;
 
-import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
-import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
 import static 
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR;
+import static 
io.trino.plugin.hudi.util.FileOperationAssertions.assertFileSystemAccesses;
 import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES;
-import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
-import static java.util.stream.Collectors.toCollection;
 
 @ResourceLock("HUDI_CACHE_SYSTEM")
 @Execution(ExecutionMode.SAME_THREAD)
@@ -68,29 +63,18 @@ public class TestHudiMemoryCacheFileOperations
             throws InterruptedException
     {
         @Language("SQL") String query = "SELECT * FROM " + 
HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'";
-        assertFileSystemAccesses(
-                query,
-                ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheInput", DATA), 2)
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4)
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 4)
-                        .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 2)
-                        .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
-                        .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 2)
-                        .build());
+        Multiset<FileOperation> expectedFileOperations = 
ImmutableMultiset.<FileOperation>builder()
+                .addCopies(new FileOperation("FileSystemCache.cacheInput", 
DATA), 2)
+                .addCopies(new FileOperation("FileSystemCache.cacheLength", 
METADATA_TABLE), 5)
+                .addCopies(new FileOperation("FileSystemCache.cacheStream", 
METADATA_TABLE), 6)
+                .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 5)
+                .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 2)
+                .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
+                .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 2)
+                .build();
+        assertFileSystemAccesses(getDistributedQueryRunner(), query, 
expectedFileOperations);
 
-        assertFileSystemAccesses(
-                query,
-                ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheInput", DATA), 2)
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4)
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 4)
-                        .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 2)
-                        .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
-                        .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 2)
-                        .build());
+        assertFileSystemAccesses(getDistributedQueryRunner(), query, 
expectedFileOperations);
     }
 
     @Test
@@ -102,47 +86,30 @@ public class TestHudiMemoryCacheFileOperations
                 "INNER JOIN " + HUDI_MULTI_FG_PT_V8_MOR + " t2 ON t1.id = 
t2.id " +
                 "WHERE t2.price <= 102";
 
-        assertFileSystemAccesses(query,
+        assertFileSystemAccesses(
+                getDistributedQueryRunner(),
+                query,
                 ImmutableMultiset.<FileOperation>builder()
                         .addCopies(new 
FileOperation("FileSystemCache.cacheInput", DATA), 6)
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 39)
+                        .addCopies(new 
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 60)
                         .addCopies(new 
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 54)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 39)
+                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 60)
                         .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 5)
                         .addCopies(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES), 3)
                         .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 5)
                         .build());
 
-        assertFileSystemAccesses(query,
+        assertFileSystemAccesses(
+                getDistributedQueryRunner(),
+                query,
                 ImmutableMultiset.<FileOperation>builder()
                         .addCopies(new 
FileOperation("FileSystemCache.cacheInput", DATA), 6)
-                        .addCopies(new 
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29)
+                        .addCopies(new 
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 45)
                         .addCopies(new 
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40)
-                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 29)
+                        .addCopies(new FileOperation("InputFile.lastModified", 
METADATA_TABLE), 45)
                         .addCopies(new FileOperation("InputFile.newStream", 
INDEX_DEFINITION), 4)
                         .addCopies(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES), 2)
                         .addCopies(new FileOperation("InputFile.newStream", 
TABLE_PROPERTIES), 4)
                         .build());
     }
-
-    private void assertFileSystemAccesses(@Language("SQL") String query, 
Multiset<FileOperation> expectedCacheAccesses)
-            throws InterruptedException
-    {
-        DistributedQueryRunner queryRunner = getDistributedQueryRunner();
-        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
-        // Allow time for table stats computation to finish before validation.
-        Thread.sleep(1000L);
-        assertMultisetsEqual(getFileOperations(queryRunner), 
expectedCacheAccesses);
-    }
-
-    private static Multiset<FileOperation> getFileOperations(QueryRunner 
queryRunner)
-    {
-        return queryRunner.getSpans().stream()
-                .filter(span -> span.getName().startsWith("Input.") || 
span.getName().startsWith("InputFile.") || 
span.getName().startsWith("FileSystemCache."))
-                .filter(span -> 
!span.getName().startsWith("InputFile.newInput"))
-                .filter(span -> !span.getName().startsWith("InputFile.exists"))
-                .filter(span -> 
!isTrinoSchemaOrPermissions(getFileLocation(span)))
-                .map(FileOperation::create)
-                .collect(toCollection(HashMultiset::create));
-    }
 }
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
index 416914d3f7e2..c2fa041a99bd 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hudi;
 
-import com.google.common.collect.HashMultiset;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultiset;
 import com.google.common.collect.Multiset;
@@ -21,7 +20,6 @@ import 
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer;
 import io.trino.plugin.hudi.util.FileOperationUtils;
 import io.trino.testing.AbstractTestQueryFramework;
 import io.trino.testing.DistributedQueryRunner;
-import io.trino.testing.QueryRunner;
 import org.intellij.lang.annotations.Language;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -30,16 +28,13 @@ import org.junit.jupiter.api.parallel.ResourceLock;
 
 import java.util.Map;
 
-import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
-import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
 import static 
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR;
+import static 
io.trino.plugin.hudi.util.FileOperationAssertions.assertFileSystemAccesses;
 import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES;
 import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES;
-import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
-import static java.util.stream.Collectors.toCollection;
 
 @ResourceLock("HUDI_CACHE_SYSTEM")
 @Execution(ExecutionMode.SAME_THREAD)
@@ -68,29 +63,18 @@ public class TestHudiNoCacheFileOperations
             throws InterruptedException
     {
         @Language("SQL") String query = "SELECT * FROM " + 
HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'";
-        assertFileSystemAccesses(
-                query,
-                ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
-                        .addCopies(new 
FileOperationUtils.FileOperation("Input.readTail", DATA), 2)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
-                        .add(new 
FileOperationUtils.FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2)
-                        .build());
+        Multiset<FileOperationUtils.FileOperation> expectedFileOperations = 
ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
+                .addCopies(new 
FileOperationUtils.FileOperation("Input.readTail", DATA), 2)
+                .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 5)
+                .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 5)
+                .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6)
+                .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
+                .add(new 
FileOperationUtils.FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
+                .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2)
+                .build();
+        assertFileSystemAccesses(getDistributedQueryRunner(), query, 
expectedFileOperations);
 
-        assertFileSystemAccesses(
-                query,
-                ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
-                        .addCopies(new 
FileOperationUtils.FileOperation("Input.readTail", DATA), 2)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6)
-                        .add(new 
FileOperationUtils.FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES))
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2)
-                        .build());
+        assertFileSystemAccesses(getDistributedQueryRunner(), query, 
expectedFileOperations);
     }
 
     @Test
@@ -102,47 +86,30 @@ public class TestHudiNoCacheFileOperations
                 "INNER JOIN " + HUDI_MULTI_FG_PT_V8_MOR + " t2 ON t1.id = 
t2.id " +
                 "WHERE t2.price <= 102";
 
-        assertFileSystemAccesses(query,
+        assertFileSystemAccesses(
+                getDistributedQueryRunner(),
+                query,
                 ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
                         .addCopies(new 
FileOperationUtils.FileOperation("Input.readTail", DATA), 6)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 39)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 39)
+                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 60)
+                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 60)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 5)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 54)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES), 3)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5)
                         .build());
 
-        assertFileSystemAccesses(query,
+        assertFileSystemAccesses(
+                getDistributedQueryRunner(),
+                query,
                 ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
                         .addCopies(new 
FileOperationUtils.FileOperation("Input.readTail", DATA), 6)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
-                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29)
+                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 45)
+                        .addCopies(new 
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 45)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES), 2)
                         .addCopies(new 
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
                         .build());
     }
-
-    private void assertFileSystemAccesses(@Language("SQL") String query, 
Multiset<FileOperationUtils.FileOperation> expectedCacheAccesses)
-               throws InterruptedException
-    {
-        DistributedQueryRunner queryRunner = getDistributedQueryRunner();
-        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
-        // Allow time for table stats computation to finish before validation.
-        Thread.sleep(1000L);
-        assertMultisetsEqual(getFileOperations(queryRunner), 
expectedCacheAccesses);
-    }
-
-    private static Multiset<FileOperationUtils.FileOperation> 
getFileOperations(QueryRunner queryRunner)
-    {
-        return queryRunner.getSpans().stream()
-                .filter(span -> span.getName().startsWith("Input.") || 
span.getName().startsWith("InputFile.") || 
span.getName().startsWith("FileSystemCache."))
-                .filter(span -> 
!span.getName().startsWith("InputFile.newInput"))
-                .filter(span -> !span.getName().startsWith("InputFile.exists"))
-                .filter(span -> 
!isTrinoSchemaOrPermissions(getFileLocation(span)))
-                .map(FileOperationUtils.FileOperation::create)
-                .collect(toCollection(HashMultiset::create));
-    }
 }
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java
index d1c26fbf5edd..dbb7195d38ef 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java
@@ -754,8 +754,8 @@ public class TestHudiSmokeTest
                 .withPartitionStatsIndexEnabled(false)
                 .withResolveColumnNameCasingEnabled(true)
                 .build();
-        MaterializedResult totalRes = getQueryRunner().execute(session, 
"SELECT * FROM " + HUDI_COW_TABLE_WITH_FIELD_NAMES_IN_CAPS);
         MaterializedResult prunedRes = getQueryRunner().execute(session, 
"SELECT * FROM  " + HUDI_COW_TABLE_WITH_FIELD_NAMES_IN_CAPS + " WHERE id='1'");
+        MaterializedResult totalRes = getQueryRunner().execute(session, 
"SELECT * FROM " + HUDI_COW_TABLE_WITH_FIELD_NAMES_IN_CAPS);
         int totalSplits = totalRes.getStatementStats().get().getTotalSplits();
         int totalRows = totalRes.getRowCount();
         int prunedSplits = 
prunedRes.getStatementStats().get().getTotalSplits();
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
index 9c4f4a6176fc..266a5397af41 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java
@@ -181,6 +181,7 @@ public class TestHudiSplitFactory
                 "/test/path",
                 HoodieTableType.MERGE_ON_READ,
                 ImmutableList.of(),
+                ImmutableList.of(),
                 TupleDomain.all(),
                 TupleDomain.all(),
                 "",
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
index 10d1b9873ad3..640980c41936 100644
--- 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java
@@ -45,6 +45,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -54,7 +55,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
-import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -64,12 +64,10 @@ import 
org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.intellij.lang.annotations.Language;
 
 import java.io.IOException;
-import java.time.Instant;
 import java.time.LocalDate;
 import java.time.temporal.ChronoField;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -168,9 +166,9 @@ public class TpchHudiTablesInitializer
                     .map(MaterializedRow::getFields)
                     .map(recordConverter::toRecord)
                     .collect(Collectors.toList());
-            String timestamp = 
HoodieInstantTimeGenerator.formatDate(Date.from(Instant.now()));
-            writeClient.startCommitWithTime(timestamp);
-            writeClient.insert(records, timestamp);
+            String instantTime = writeClient.startCommit();
+            List<WriteStatus> writeStatuses = writeClient.insert(records, 
instantTime);
+            writeClient.commit(instantTime, writeStatuses);
         }
     }
 
diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java
 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java
new file mode 100644
index 000000000000..9523ff61e366
--- /dev/null
+++ 
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.airlift.log.Logger;
+import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
+import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
+
+import java.util.Comparator;
+
+import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
+import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
+import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.stream.Collectors.toCollection;
+
+public final class FileOperationAssertions
+{
+    private static final Logger log = 
Logger.get(FileOperationAssertions.class);
+
+    private FileOperationAssertions() {}
+
+    /**
+     * Asserts that file system accesses match expected operations.
+     * This version uses manual filtering for Input/InputFile operations.
+     * Logs detailed comparison at WARN level for debugging test failures.
+     */
+    public static void assertFileSystemAccesses(
+            QueryRunner queryRunner,
+            @Language("SQL") String query,
+            Multiset<FileOperation> expectedCacheAccesses)
+            throws InterruptedException
+    {
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        // Allow time for table stats computation to finish before validation.
+        Thread.sleep(1000L);
+        Multiset<FileOperation> actualCacheAccesses = 
getFileOperations(queryRunner);
+        printFileAccessDebugInfo(queryRunner, actualCacheAccesses, 
expectedCacheAccesses);
+        assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+    }
+
+    /**
+     * Asserts that file system accesses match expected operations for Alluxio 
cache tests.
+     * This version uses getCacheOperationSpans for filtering.
+     * Logs detailed comparison at WARN level for debugging test failures.
+     */
+    public static void assertAlluxioFileSystemAccesses(
+            QueryRunner queryRunner,
+            @Language("SQL") String query,
+            Multiset<FileOperation> expectedCacheAccesses)
+            throws InterruptedException
+    {
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        // Allow time for table stats computation to finish before validation.
+        Thread.sleep(1000L);
+        Multiset<FileOperation> actualCacheAccesses = 
getAlluxioFileOperations(queryRunner);
+        printFileAccessDebugInfo(queryRunner, actualCacheAccesses, 
expectedCacheAccesses);
+        assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+    }
+
+    /**
+     * Gets file operations from query runner spans using manual filtering.
+     */
+    public static Multiset<FileOperation> getFileOperations(QueryRunner 
queryRunner)
+    {
+        return queryRunner.getSpans().stream()
+                .filter(span -> span.getName().startsWith("Input.") || 
span.getName().startsWith("InputFile.") || 
span.getName().startsWith("FileSystemCache."))
+                .filter(span -> 
!span.getName().startsWith("InputFile.newInput"))
+                .filter(span -> !span.getName().startsWith("InputFile.exists"))
+                .filter(span -> 
!isTrinoSchemaOrPermissions(getFileLocation(span)))
+                .map(FileOperation::create)
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    /**
+     * Gets file operations for Alluxio cache tests using 
getCacheOperationSpans.
+     */
+    public static Multiset<FileOperation> getAlluxioFileOperations(QueryRunner 
queryRunner)
+    {
+        return getCacheOperationSpans(queryRunner)
+                .stream()
+                .filter(span -> !span.getName().startsWith("InputFile.exists"))
+                .map(FileOperation::create)
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    private static void printFileAccessDebugInfo(
+            QueryRunner queryRunner,
+            Multiset<FileOperation> actualCacheAccesses,
+            Multiset<FileOperation> expectedCacheAccesses)
+    {
+        // Log all file paths accessed for debugging
+        log.warn("=== All File Paths Accessed ===");
+        queryRunner.getSpans().stream()
+                .filter(span -> 
span.getName().equals("InputFile.lastModified") || 
span.getName().equals("InputFile.length"))
+                .forEach(span -> log.warn("%s: %s", span.getName(), 
getFileLocation(span)));
+
+        // Log actual and expected cache accesses
+        log.warn("=== Actual Cache Accesses ===");
+        logSortedMultiset(actualCacheAccesses);
+
+        log.warn("=== Expected Cache Accesses ===");
+        logSortedMultiset(expectedCacheAccesses);
+
+        // Calculate and log differences
+        Multiset<FileOperation> extraInActual = 
HashMultiset.create(actualCacheAccesses);
+        extraInActual.removeAll(expectedCacheAccesses);
+
+        Multiset<FileOperation> missingFromActual = 
HashMultiset.create(expectedCacheAccesses);
+        missingFromActual.removeAll(actualCacheAccesses);
+
+        if (!extraInActual.isEmpty()) {
+            log.warn("=== Extra in Actual (not expected) ===");
+            logSortedMultiset(extraInActual);
+        }
+
+        if (!missingFromActual.isEmpty()) {
+            log.warn("=== Missing from Actual (expected but not found) ===");
+            logSortedMultiset(missingFromActual);
+        }
+    }
+
+    private static void logSortedMultiset(Multiset<FileOperation> multiset)
+    {
+        multiset.entrySet().stream()
+                .sorted(Comparator.comparing(a -> a.getElement().toString()))
+                .forEach(entry -> log.warn("%s: %s", entry.getElement(), 
entry.getCount()));
+    }
+}

Reply via email to