This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 181af01a1a25 perf: Adding support for LatestBaseFilesPathFilter to 
Spark File Index (#18136)
181af01a1a25 is described below

commit 181af01a1a25855af68798b50d1260d26fea4734
Author: Surya Prasanna <[email protected]>
AuthorDate: Thu Feb 26 08:46:47 2026 -0800

    perf: Adding support for LatestBaseFilesPathFilter to Spark File Index 
(#18136)
    
    This PR adds an opt-in path filtering mechanism during file listing to 
prevent Spark driver OOM errors when querying large Hudi datasets with multiple 
file versions per partition.
    
    Problem: When file listing is performed without filtering, all file 
versions (including older ones) are loaded into driver memory, causing OOM on 
large tables.
    
    Solution: Added a new config 
hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter 
(default: false) that enables HoodieROTablePathFilter during file listing to 
exclude older file versions.
    
    Summary and Changelog
    Users can now enable path filtering during file listing to avoid loading 
multiple file versions into memory on the driver. This is controlled by the new 
config 
hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter.
    
    Changes:
    
    New Config: FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER (default: 
false)
    
    Enables path filtering during file listing to reduce driver memory pressure
    Filters out older file versions, keeping only the latest files needed for 
queries
    API Extensions:
    
    Extended HoodieTableMetadata, BaseTableMetadata, and 
FileSystemBackedTableMetadata to accept optional StoragePathFilter parameter
    Added FSUtils.getAllDataFilesInPartition overload with path filter support
    Created HoodieLatestBaseFilesPathFilter wrapper to adapt the Hadoop 
PathFilter-based HoodieROTablePathFilter to Hudi's StoragePathFilter interface
    Spark Integration:
    
    Updated BaseHoodieTableFileIndex to use path filter when enabled
    Modified SparkHoodieTableFileIndex to apply HoodieROTablePathFilter during 
partition listing
---
 .../org/apache/hudi/BaseHoodieTableFileIndex.java  | 108 +++++--
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  10 +-
 .../hudi/common/table/view/NoOpTableMetadata.java  |   6 +
 .../apache/hudi/metadata/BaseTableMetadata.java    |   4 +-
 .../metadata/FileSystemBackedTableMetadata.java    |   6 +-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  10 +-
 .../index/TestBaseHoodieTableFileIndex.java        |   2 +-
 .../hudi/hadoop/HiveHoodieTableFileIndex.java      |   1 +
 .../hadoop/HoodieLatestBaseFilesPathFilter.java    |  45 +++
 .../hudi/hadoop/HoodieROTablePathFilter.java       |  15 +-
 .../hudi/hadoop/TestHoodieROTablePathFilter.java   |  16 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  12 +
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |  15 +
 .../apache/hudi/SparkHoodieTableFileIndex.scala    |  22 +-
 .../sql/hudi/common/TestROPathFilterOnRead.scala   | 352 +++++++++++++++++++++
 15 files changed, 584 insertions(+), 40 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java 
b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 71a05ebd2e0e..b1e2ff8624da 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -20,6 +20,8 @@ package org.apache.hudi;
 
 import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -27,6 +29,7 @@ import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableQueryType;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.serialization.HoodieFileSliceSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -107,8 +110,9 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
   @Getter(AccessLevel.PROTECTED)
   private final List<StoragePath> queryPaths;
 
-  private final boolean shouldIncludePendingCommits;
+  protected final boolean shouldIncludePendingCommits;
   private final boolean shouldValidateInstant;
+  protected final boolean useLatestBaseFilesPathFilterForListing;
 
   // The `shouldListLazily` variable controls how we initialize/refresh the 
TableFileIndex:
   //  - non-lazy/eager listing (shouldListLazily=false):  all partitions and 
file slices will be loaded eagerly during initialization.
@@ -138,17 +142,18 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
   private transient HoodieTableMetadata tableMetadata = null;
 
   /**
-   * @param engineContext                Hudi engine-specific context
-   * @param metaClient                   Hudi table's meta-client
-   * @param configProperties             unifying configuration (in the form 
of generic properties)
-   * @param queryType                    target query type
-   * @param queryPaths                   target DFS paths being queried
-   * @param specifiedQueryInstant        instant as of which table is being 
queried
-   * @param shouldIncludePendingCommits  flags whether file-index should 
exclude any pending operations
-   * @param shouldValidateInstant        flags to validate whether query 
instant is present in the timeline
-   * @param fileStatusCache              transient cache of fetched 
[[FileStatus]]es
-   * @param incrementalQueryStartTime          start completion time for 
incremental query (optional)
-   * @param incrementalQueryEndTime            end completion time for 
incremental query (optional)
+   * @param engineContext                                Hudi engine-specific 
context
+   * @param metaClient                                  Hudi table's 
meta-client
+   * @param configProperties                            unifying configuration 
(in the form of generic properties)
+   * @param queryType                                   target query type
+   * @param queryPaths                                  target DFS paths being 
queried
+   * @param useLatestBaseFilesPathFilterForListing      memory optimization on 
the driver while fetching read optimized results
+   * @param specifiedQueryInstant                       instant as of which 
table is being queried
+   * @param shouldIncludePendingCommits                 flags whether 
file-index should exclude any pending operations
+   * @param shouldValidateInstant                       flags to validate 
whether query instant is present in the timeline
+   * @param fileStatusCache                             transient cache of 
fetched [[FileStatus]]es
+   * @param incrementalQueryStartTime                   start completion time 
for incremental query (optional)
+   * @param incrementalQueryEndTime                     end completion time 
for incremental query (optional)
    */
   public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
                                   HoodieTableMetaClient metaClient,
@@ -156,6 +161,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
                                   HoodieTableQueryType queryType,
                                   List<StoragePath> queryPaths,
                                   Option<String> specifiedQueryInstant,
+                                  boolean 
useLatestBaseFilesPathFilterForListing,
                                   boolean shouldIncludePendingCommits,
                                   boolean shouldValidateInstant,
                                   FileStatusCache fileStatusCache,
@@ -165,14 +171,17 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
     this.partitionColumns = metaClient.getTableConfig().getPartitionFields()
         .orElseGet(() -> new String[0]);
 
+    // Disable metadata when ro_path_filter is enabled.
     this.metadataConfig = HoodieMetadataConfig.newBuilder()
         .fromProperties(configProperties)
         .enable(configProperties.getBoolean(ENABLE.key(), 
DEFAULT_METADATA_ENABLE_FOR_READERS)
-            && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient))
+            && HoodieTableMetadataUtil.isFilesPartitionAvailable(metaClient)
+            && !useLatestBaseFilesPathFilterForListing)
         .build();
 
     this.queryType = queryType;
     this.queryPaths = queryPaths;
+    this.useLatestBaseFilesPathFilterForListing = 
useLatestBaseFilesPathFilterForListing;
     this.specifiedQueryInstant = specifiedQueryInstant;
     this.shouldIncludePendingCommits = shouldIncludePendingCommits;
     this.shouldValidateInstant = shouldValidateInstant;
@@ -267,14 +276,70 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    // ROPathFilter optimization is only applicable for COW tables with 
snapshot queries
+    // For MOR tables with READ_OPTIMIZED queries, we also only need base files
+    if (useLatestBaseFilesPathFilterForListing
+        && !shouldIncludePendingCommits
+        && (metaClient.getTableConfig().getTableType() == 
HoodieTableType.COPY_ON_WRITE
+            || queryType == HoodieTableQueryType.READ_OPTIMIZED)) {
+      return generatePartitionFileSlicesPostROTablePathFilter(partitions, 
allFiles);
+    }
+    return filterFiles(partitions, activeTimeline, allFiles, queryInstant);
+  }
+
+  /**
+   * Generates FileSlices from the filtered files returned by ROPathFilter.
+   * This is a fast path that avoids constructing a full 
HoodieTableFileSystemView.
+   * Only applicable for COW tables since ROPathFilter only returns base files.
+   *
+   * @param partitions List of partitions to process
+   * @param allFiles   Files already filtered by ROPathFilter
+   * @return Map of PartitionPath to list of FileSlices
+   */
+  private Map<PartitionPath, List<FileSlice>> 
generatePartitionFileSlicesPostROTablePathFilter(
+      List<PartitionPath> partitions, List<StoragePathInfo> allFiles) {
+    // Group files by partition path, then by file group ID
+    Map<String, PartitionPath> partitionsMap = new HashMap<>();
+    partitions.forEach(p -> partitionsMap.put(p.path, p));
+    Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new 
HashMap<>();
+
+    for (StoragePathInfo pathInfo : allFiles) {
+      // Create FileSlice obj from StoragePathInfo.
+      String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, 
pathInfo.getPath().getParent());
+      HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
+      // Use relative partition path for FileSlice - consistent with 
HoodieTableFileSystemView
+      FileSlice fileSlice = new FileSlice(relPartitionPath, 
baseFile.getCommitTime(), baseFile.getFileId());
+      fileSlice.setBaseFile(baseFile);
+
+      // Add the FileSlice to partitionToFileSlices
+      PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
+      if (partitionPathObj != null) {
+        List<FileSlice> fileSlices = 
partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
+        fileSlices.add(fileSlice);
+      } else {
+        log.warn("Could not find partition path object for relative path: {}. 
Skipping file: {}",
+            relPartitionPath, pathInfo.getPath());
+      }
+    }
+    return partitionToFileSlices;
+  }
 
+  private Map<PartitionPath, List<FileSlice>> filterFiles(List<PartitionPath> 
partitions,
+                                                                            
HoodieTimeline activeTimeline,
+                                                                            
List<StoragePathInfo> allFiles,
+                                                                            
Option<String> queryInstant) {
+    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
       // NOTE: For MOR table, when the compaction is inflight, we need to not 
only fetch the
       // latest slices, but also include the base and log files of the 
second-last version of
       // the file slice in the same file group as the latest file slice that 
is under compaction.
@@ -391,7 +456,8 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
   /**
    * Load partition paths and it's files under the query table path.
    */
-  private List<StoragePathInfo> listPartitionPathFiles(List<PartitionPath> 
partitions) {
+  private List<StoragePathInfo> listPartitionPathFiles(List<PartitionPath> 
partitions,
+                                                       HoodieTimeline 
activeTimeline) {
     List<StoragePath> partitionPaths = partitions.stream()
         // NOTE: We're using [[createPathUnsafe]] to create Hadoop's [[Path]] 
objects
         //       instances more efficiently, provided that
@@ -420,7 +486,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
 
     try {
       Map<String, List<StoragePathInfo>> fetchedPartitionsMap =
-          
tableMetadata.getAllFilesInPartitions(missingPartitionPathsMap.keySet());
+          
tableMetadata.getAllFilesInPartitions(missingPartitionPathsMap.keySet(), 
getPartitionPathFilter(activeTimeline));
 
       // Ingest newly fetched partitions into cache
       fetchedPartitionsMap.forEach((absolutePath, files) -> {
@@ -440,6 +506,10 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
     }
   }
 
+  protected Option<StoragePathFilter> getPartitionPathFilter(HoodieTimeline 
activeTimeline) {
+    return Option.empty();
+  }
+
   private void doRefresh() {
     HoodieTimer timer = HoodieTimer.start();
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d1b584eca484..56c0cc3a5b46 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -481,6 +481,13 @@ public class FSUtils {
   public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage 
storage,
                                                                  StoragePath 
partitionPath)
       throws IOException {
+    return getAllDataFilesInPartitionByPathFilter(storage, partitionPath, 
Option.empty());
+  }
+
+  public static List<StoragePathInfo> 
getAllDataFilesInPartitionByPathFilter(HoodieStorage storage,
+                                                                             
StoragePath partitionPath,
+                                                                             
Option<StoragePathFilter> pathFilterOption)
+      throws IOException {
     final Set<String> validFileExtensions = 
Arrays.stream(HoodieFileFormat.values())
         
.map(HoodieFileFormat::getFileExtension).collect(Collectors.toCollection(HashSet::new));
     final String logFileExtension = 
HoodieFileFormat.HOODIE_LOG.getFileExtension();
@@ -488,7 +495,8 @@ public class FSUtils {
     try {
       return storage.listDirectEntries(partitionPath, path -> {
         String extension = FSUtils.getFileExtension(path.getName());
-        return validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension);
+        return (validFileExtensions.contains(extension) || 
path.getName().contains(logFileExtension))
+            && pathFilterOption.map(filter -> 
filter.accept(path)).orElse(true);
       }).stream().filter(StoragePathInfo::isFile).collect(Collectors.toList());
     } catch (FileNotFoundException ex) {
       // return empty FileStatus if partition does not exist already
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 0522e9ba50af..3425ca829e0f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -36,6 +36,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.RawKey;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.IOException;
@@ -75,6 +76,11 @@ class NoOpTableMetadata implements HoodieTableMetadata {
     throw new HoodieMetadataException("Unsupported operation: 
getAllFilesInPartitions!");
   }
 
+  @Override
+  public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths, 
Option<StoragePathFilter> pathFilterOption) throws IOException {
+    throw new HoodieMetadataException("Unsupported operation: 
getAllFilesInPartitions!");
+  }
+
   @Override
   public Option<BloomFilter> getBloomFilter(String partitionName, String 
fileName) throws HoodieMetadataException {
     throw new HoodieMetadataException("Unsupported operation: 
getBloomFilter!");
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index a567a8053c06..f59a257dd777 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -43,6 +43,7 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.StoragePathFilter;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
@@ -146,7 +147,8 @@ public abstract class BaseTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   @Override
-  public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitions)
+  public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitions,
+                                                                    
Option<StoragePathFilter> unused)
       throws IOException {
     ValidationUtils.checkArgument(isMetadataTableInitialized);
     if (partitions.isEmpty()) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 5b1878ccd53e..4f6b0e439efa 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -42,6 +42,7 @@ import org.apache.hudi.expression.Predicates;
 import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import java.io.FileNotFoundException;
@@ -243,7 +244,8 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
   }
 
   @Override
-  public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths)
+  public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths,
+                                                                    
Option<StoragePathFilter> pathFilterOption)
       throws IOException {
     if (partitionPaths == null || partitionPaths.isEmpty()) {
       return Collections.emptyMap();
@@ -260,7 +262,7 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
             partitionPathStr -> {
               StoragePath partitionPath = new StoragePath(partitionPathStr);
               return Pair.of(partitionPathStr,
-                  FSUtils.getAllDataFilesInPartition(getStorage(), 
partitionPath));
+                  FSUtils.getAllDataFilesInPartitionByPathFilter(getStorage(), 
partitionPath, pathFilterOption));
             }, parallelism);
     engineContext.clearJobStatus();
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 779965e808b4..1da1f77400e6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -32,6 +32,7 @@ import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.expression.Expression;
 import org.apache.hudi.internal.schema.Types;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
 import org.apache.hudi.storage.StoragePathInfo;
 
 import org.slf4j.Logger;
@@ -153,8 +154,13 @@ public interface HoodieTableMetadata extends Serializable, 
AutoCloseable {
    *
    * NOTE: Absolute partition paths are expected here
    */
-  Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths)
-      throws IOException;
+  default Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths)
+      throws IOException {
+    return getAllFilesInPartitions(partitionPaths, Option.empty());
+  }
+
+  Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths,
+                                                             
Option<StoragePathFilter> pathFilterOption) throws IOException;
 
   /**
    * Get the bloom filter for the FileID from the metadata table.
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
index a596d48bb54e..5ad1a207b39c 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/bootstrap/index/TestBaseHoodieTableFileIndex.java
@@ -74,7 +74,7 @@ class TestBaseHoodieTableFileIndex extends 
HoodieCommonTestHarness {
     public TestLocalIndex(HoodieEngineContext engineContext, 
HoodieTableMetaClient metaClient, TypedProperties configProperties, 
HoodieTableQueryType queryType,
                           List<StoragePath> queryPaths, Option<String> 
specifiedQueryInstant, boolean shouldIncludePendingCommits, boolean 
shouldValidateInstant,
                           FileStatusCache fileStatusCache, boolean 
shouldListLazily, Option<String> startCompletionTime, Option<String> 
endCompletionTime) {
-      super(engineContext, metaClient, configProperties, queryType, 
queryPaths, specifiedQueryInstant, shouldIncludePendingCommits, 
shouldValidateInstant, fileStatusCache, shouldListLazily,
+      super(engineContext, metaClient, configProperties, queryType, 
queryPaths, specifiedQueryInstant, false, shouldIncludePendingCommits, 
shouldValidateInstant, fileStatusCache, shouldListLazily,
           startCompletionTime, endCompletionTime);
     }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
index a77860e5892d..3502cc5345a8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
@@ -55,6 +55,7 @@ public class HiveHoodieTableFileIndex extends 
BaseHoodieTableFileIndex {
         queryType,
         queryPaths,
         specifiedQueryInstant,
+        false,
         shouldIncludePendingCommits,
         true,
         new NoopCache(),
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLatestBaseFilesPathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLatestBaseFilesPathFilter.java
new file mode 100644
index 000000000000..362ac8c53677
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieLatestBaseFilesPathFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathFilter;
+
+public class HoodieLatestBaseFilesPathFilter implements StoragePathFilter {
+
+  private HoodieROTablePathFilter roTablePathFilter;
+
+  public HoodieLatestBaseFilesPathFilter(HoodieROTablePathFilter 
roTablePathFilter) {
+    this.roTablePathFilter = roTablePathFilter;
+  }
+
+  public HoodieLatestBaseFilesPathFilter(StorageConfiguration conf,
+                                         HoodieTableMetaClient metaClient,
+                                         HoodieTimeline completedTimeline) {
+    roTablePathFilter = new HoodieROTablePathFilter(conf, metaClient, 
completedTimeline);
+  }
+
+  @Override
+  public boolean accept(StoragePath path) {
+    return roTablePathFilter.accept(path);
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index 99a8c7f6710b..23ec057cf18c 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -99,17 +100,17 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
 
   private transient HoodieLocalEngineContext engineContext;
 
-
   private transient HoodieStorage storage;
 
   public HoodieROTablePathFilter() {
-    this(new Configuration());
+    this(HadoopFSUtils.getStorageConf());
   }
 
-  public HoodieROTablePathFilter(Configuration conf) {
+  @VisibleForTesting
+  public HoodieROTablePathFilter(StorageConfiguration storageConf) {
     this.hoodiePathCache = new ConcurrentHashMap<>();
     this.nonHoodiePathCache = new HashSet<>();
-    this.conf = HadoopFSUtils.getStorageConfWithCopy(conf);
+    this.conf = storageConf;
     this.metaClientCache = new HashMap<>();
     this.completedTimelineCache =  new HashMap<>();
   }
@@ -117,7 +118,7 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
   /**
    * By passing metaClient and completedTimeline, we can sync the view seen 
from this class against HoodieFileIndex class
    */
-  public HoodieROTablePathFilter(Configuration conf,
+  public HoodieROTablePathFilter(StorageConfiguration conf,
                                  HoodieTableMetaClient metaClient,
                                  HoodieTimeline completedTimeline) {
     this(conf);
@@ -138,6 +139,10 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
     return null;
   }
 
+  public boolean accept(StoragePath path) {
+    return accept(new Path(path.toString()));
+  }
+
   @Override
   public boolean accept(Path path) {
 
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
index 545182ed8c59..e7d25a8b6bd5 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
@@ -25,10 +25,10 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieTimeTravelException;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -51,7 +51,7 @@ public class TestHoodieROTablePathFilter extends 
HoodieCommonTestHarness {
   @BeforeEach
   public void setUp() throws Exception {
     initMetaClient();
-    pathFilter = new 
HoodieROTablePathFilter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    pathFilter = new HoodieROTablePathFilter(metaClient.getStorageConf());
     testTable = HoodieTestTable.of(metaClient);
   }
 
@@ -159,7 +159,7 @@ public class TestHoodieROTablePathFilter extends 
HoodieCommonTestHarness {
     validateTimestampAsOf(metaClient, commit001); // Should not throw
 
     // Test 3: HoodieROTablePathFilter with Configuration-only constructor
-    Configuration confWithAsOf = new 
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    StorageConfiguration confWithAsOf = metaClient.getStorageConf();
     confWithAsOf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), commit003);
 
     HoodieROTablePathFilter pathFilterWithAsOf = new 
HoodieROTablePathFilter(confWithAsOf);
@@ -208,14 +208,14 @@ public class TestHoodieROTablePathFilter extends 
HoodieCommonTestHarness {
 
     // Test 1: HoodieROTablePathFilter without TIMESTAMP_AS_OF should work 
(normal operation)
     HoodieROTablePathFilter filterWithoutAsOf = new HoodieROTablePathFilter(
-        metaClient.getStorageConf().unwrapAs(Configuration.class), metaClient,
+        metaClient.getStorageConf(), metaClient,
         metaClient.getActiveTimeline().filterCompletedInstants());
 
     assertTrue(filterWithoutAsOf.accept(file1Path), "File from commit001 
should be accepted");
     assertTrue(filterWithoutAsOf.accept(file3Path), "File from commit003 
should be accepted");
 
     // Test 2: HoodieROTablePathFilter with TIMESTAMP_AS_OF before inflight 
should work
-    Configuration confBeforeInflight = new 
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    StorageConfiguration confBeforeInflight = metaClient.getStorageConf();
     confBeforeInflight.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), 
commit001);
 
     HoodieROTablePathFilter filterBeforeInflight = new 
HoodieROTablePathFilter(confBeforeInflight, metaClient,
@@ -224,7 +224,7 @@ public class TestHoodieROTablePathFilter extends 
HoodieCommonTestHarness {
     assertTrue(filterBeforeInflight.accept(file1Path), "File from commit001 
should be accepted with as.of.instant=001");
 
     // Test 3: HoodieROTablePathFilter with TIMESTAMP_AS_OF after inflight 
should fail during accept()
-    Configuration confAfterInflight = new 
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    StorageConfiguration confAfterInflight = metaClient.getStorageConf();
     confAfterInflight.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), commit003);
 
     HoodieROTablePathFilter filterAfterInflight = new 
HoodieROTablePathFilter(confAfterInflight, metaClient,
@@ -236,7 +236,7 @@ public class TestHoodieROTablePathFilter extends 
HoodieCommonTestHarness {
     }, "Calling accept() with as.of.instant=003 should fail due to inflight 
commit002");
 
     // Test 4: Configuration-only constructor with TIMESTAMP_AS_OF after 
inflight should fail during accept()
-    Configuration confOnlyAfterInflight = new 
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    StorageConfiguration confOnlyAfterInflight = metaClient.getStorageConf();
     confOnlyAfterInflight.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), 
commit003);
 
     HoodieROTablePathFilter filterConfOnlyAfterInflight = new 
HoodieROTablePathFilter(confOnlyAfterInflight, metaClient,
@@ -278,7 +278,7 @@ public class TestHoodieROTablePathFilter extends 
HoodieCommonTestHarness {
     // when there are inflight commits that would cause validation to fail
 
     // Test 1: HoodieROTablePathFilter should work without as.of.instant even 
when inflight commits exist
-    Configuration confWithoutAsOf = new 
Configuration(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    StorageConfiguration confWithoutAsOf = metaClient.getStorageConf();
     HoodieROTablePathFilter filterWithoutAsOf = new 
HoodieROTablePathFilter(confWithoutAsOf, metaClient,
         metaClient.getActiveTimeline().filterCompletedInstants());
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8f10823cb870..7e92b47f3321 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -227,6 +227,18 @@ object DataSourceReadOptions {
         " by carefully analyzing provided partition-column predicates and 
deducing corresponding partition-path prefix from " +
         " them (if possible).")
 
+  // Opt-in listing optimization: apply HoodieROTablePathFilter while listing files so only
+  // the latest base-file versions are materialized on the driver.
+  // NOTE: the key string below must match the constant name and the key consulted by
+  // HoodieFileIndex / set by the tests ("...list.file.statuses.using.ro.path.filter");
+  // a divergent spelling here would make enabling the option a silent no-op.
+  val FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER: ConfigProperty[Boolean] =
+    ConfigProperty.key("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Controls whether file listing is done using the HoodieROTablePathFilter. " +
+        " This is mainly necessary when the metadata table is not enabled or corrupted and the job " +
+        " is doing recursive calls to fetch the partition paths and the dataset has multiple versions" +
+        " of the same file in the same partition and it could lead to Out of Memory on the driver if" +
+        " the dataset is too large. Another important limitation is that this config should not be" +
+        " used if there are bootstrap files present in the file system. NOTE: Only works for COW tables with snapshot queries.")
+
   val INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN: ConfigProperty[String] = 
ConfigProperty
     .key("hoodie.datasource.read.incr.fallback.fulltablescan.enable")
     .defaultValue("true")
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 9658fd451ec8..34a67cfb3a5b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -527,6 +527,21 @@ object HoodieFileIndex extends Logging {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
 
+    var pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
+      
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, 
null)
+    if (pathFilterOptimizedListingEnabled != null) {
+      
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+        pathFilterOptimizedListingEnabled)
+    } else {
+      // Also allow passing in the path filter config via Spark session conf 
for convenience
+      pathFilterOptimizedListingEnabled = getConfigValue(options, sqlConf,
+        "spark." + 
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, 
null)
+      if (pathFilterOptimizedListingEnabled != null) {
+        
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+          pathFilterOptimizedListingEnabled)
+      }
+    }
+
     if (tableConfig != null) {
       properties.setProperty(RECORDKEY_FIELD.key, 
tableConfig.getRecordKeyFields.orElse(Array.empty).mkString(","))
       properties.setProperty(PARTITIONPATH_FIELD.key, 
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig).orElse(""))
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
index 567053c618a7..077e1129eb5e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
@@ -22,14 +22,16 @@ import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, 
extractEqualityPredicatesLiteralValues, generateFieldMap, 
haveProperPartitionValues, shouldListLazily, 
shouldUsePartitionPathPrefixAnalysis, shouldValidatePartitionColumns}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.config.{HoodieCommonConfig, TypedProperties}
 import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
 import 
org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.util.ReflectionUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
+import org.apache.hudi.hadoop.HoodieLatestBaseFilesPathFilter
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.internal.schema.Types.RecordType
 import org.apache.hudi.internal.schema.utils.Conversions
@@ -88,6 +90,8 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
     deduceQueryType(configProperties),
     queryPaths.asJava,
     toJavaOption(specifiedQueryInstant),
+    
configProperties.getBoolean(FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+      FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.defaultValue()),
     false,
     false,
     SparkHoodieTableFileIndex.adapt(fileStatusCache),
@@ -439,6 +443,22 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
 
   private def arePartitionPathsUrlEncoded: Boolean =
     metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
+
+  /**
+   * Returns the optional [[org.apache.hudi.storage.StoragePathFilter]] to apply while
+   * listing partition files.
+   *
+   * When latest-base-files filtering is enabled and pending commits are not requested,
+   * returns a [[HoodieLatestBaseFilesPathFilter]] built over the completed (or rewritten)
+   * timeline so that only the latest base-file versions are listed; otherwise returns an
+   * empty Option, i.e. unfiltered listing.
+   */
+  override protected def getPartitionPathFilter(activeTimeline: 
HoodieTimeline): 
org.apache.hudi.common.util.Option[org.apache.hudi.storage.StoragePathFilter] = 
{
+    if (useLatestBaseFilesPathFilterForListing && 
!shouldIncludePendingCommits) {
+      // Use getStorageConfWithCopy to avoid mutating the shared Spark session 
config
+      val conf = 
HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)
+      // Propagate a time-travel instant (if any) so the filter resolves file versions as of it.
+      if (specifiedQueryInstant.isDefined) {
+        conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), 
specifiedQueryInstant.get)
+      }
+      org.apache.hudi.common.util.Option.of(
+        new HoodieLatestBaseFilesPathFilter(conf, metaClient,
+          activeTimeline.filterCompletedInstantsOrRewriteTimeline()))
+    } else {
+      // Filter disabled, or pending commits must stay visible: list without a path filter.
+      org.apache.hudi.common.util.Option.empty()
+    }
+  }
+
 }
 
 object SparkHoodieTableFileIndex extends SparkAdapterSupport {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead.scala
new file mode 100644
index 000000000000..8327cf7a8408
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestROPathFilterOnRead.scala
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+/**
+ * Tests for ROPathFilter optimization with advanced scenarios and edge cases:
+ * empty tables, partition pruning, multi-partition inserts, mixed update/delete
+ * workloads, repeated updates producing multiple file versions, and time travel.
+ *
+ * NOTE(review): the file is named TestROPathFilterOnRead.scala while the class is
+ * TestROPathFilterAdvanced — legal in Scala, but confirm the mismatch is intended.
+ */
+class TestROPathFilterAdvanced extends HoodieSparkSqlTestBase {
+
+  // Config key that enables HoodieROTablePathFilter-based file listing.
+  // NOTE(review): keep this literal in sync with
+  // DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key —
+  // if they diverge, every "enabled" branch below silently exercises the default
+  // (disabled) listing path and the tests pass vacuously.
+  val RO_PATH_FILTER_OPT_KEY = "hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter"
+
+  test("Test ROPathFilter with empty table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+       """.stripMargin)
+
+      // Query empty table with ROPathFilter enabled
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        val result = spark.sql(s"select * from $tableName").collect()
+        assert(result.length == 0)
+      }
+    }
+  }
+
+  test("Test ROPathFilter with partition pruning") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+           | partitioned by (dt)
+       """.stripMargin)
+
+      // Query empty table with ROPathFilter enabled
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        val result = spark.sql(s"select * from $tableName").collect()
+        assert(result.length == 0)
+      }
+
+      // Insert data across multiple partitions
+      spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000, "2024-01-01")""")
+      spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000, "2024-01-02")""")
+
+      // Update data in first partition (creates a second file version there)
+      spark.sql(s"update $tableName set price = 15.0 where id = 1")
+
+      // Query single partition with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where dt = '2024-01-01'")(
+          Seq(1, "a1", 15.0, 1000, "2024-01-01")
+        )
+      }
+    }
+  }
+
+  test("Test ROPathFilter with concurrent inserts to different partitions") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  region string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+           | partitioned by (region)
+       """.stripMargin)
+
+      // Insert data to different partitions
+      spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000, "US")""")
+      spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000, "EU")""")
+      spark.sql(s"""insert into $tableName values(3, "a3", 30.0, 3000, "APAC")""")
+      spark.sql(s"""insert into $tableName values(4, "a4", 40.0, 4000, "US")""")
+
+      // Query all data with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts, region from $tableName order by id")(
+          Seq(1, "a1", 10.0, 1000, "US"),
+          Seq(2, "a2", 20.0, 2000, "EU"),
+          Seq(3, "a3", 30.0, 3000, "APAC"),
+          Seq(4, "a4", 40.0, 4000, "US")
+        )
+      }
+    }
+  }
+
+  test("Test ROPathFilter with multiple deletes and updates") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+       """.stripMargin)
+
+      // Insert initial data
+      for (i <- 1 to 10) {
+        spark.sql(s"""insert into $tableName values($i, "name$i", ${i * 10.0}, ${i * 1000})""")
+      }
+
+      // Perform mix of updates and deletes
+      spark.sql(s"update $tableName set price = price * 2 where id % 2 = 0")
+      spark.sql(s"delete from $tableName where id % 3 = 0")
+
+      // Query with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        val result = spark.sql(s"select id, name, price, ts from $tableName order by id").collect()
+        // Should have deleted records where id % 3 = 0 (3, 6, 9)
+        // Should have doubled price for even ids (2, 4, 8, 10)
+        assert(result.length == 7) // 10 - 3 deleted = 7
+
+        // Check a few specific values
+        val row2 = result.find(_.getInt(0) == 2).get
+        assert(row2.getDouble(2) == 40.0) // doubled from 20.0
+
+        val row5 = result.find(_.getInt(0) == 5).get
+        assert(row5.getDouble(2) == 50.0) // not doubled (odd)
+      }
+    }
+  }
+
+  test("Test ROPathFilter with mixed partition and non-partition columns in filter") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  category string
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+           | partitioned by (category)
+       """.stripMargin)
+
+      // Insert data
+      spark.sql(s"""insert into $tableName values(1, "a1", 10.0, 1000, "electronics")""")
+      spark.sql(s"""insert into $tableName values(2, "a2", 20.0, 2000, "electronics")""")
+      spark.sql(s"""insert into $tableName values(3, "a3", 30.0, 3000, "books")""")
+      spark.sql(s"""insert into $tableName values(4, "a4", 40.0, 4000, "books")""")
+
+      // Update some records
+      spark.sql(s"update $tableName set price = 15.0 where id = 1")
+
+      // Query with both partition and data filters
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts, category from $tableName where category = 'electronics' and price > 12.0 order by id")(
+          Seq(1, "a1", 15.0, 1000, "electronics"),
+          Seq(2, "a2", 20.0, 2000, "electronics")
+        )
+      }
+    }
+  }
+
+  test("Test ROPathFilter correctness with complex update patterns") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  version int,
+           |  data string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+       """.stripMargin)
+
+      // Insert and update the same record multiple times (multiple file versions)
+      spark.sql(s"""insert into $tableName values(1, 1, "initial", 1000)""")
+      spark.sql(s"""update $tableName set version = 2, data = "updated_v2" where id = 1""")
+      spark.sql(s"""update $tableName set version = 3, data = "updated_v3" where id = 1""")
+      spark.sql(s"""update $tableName set version = 4, data = "updated_v4" where id = 1""")
+
+      // Insert another record
+      spark.sql(s"""insert into $tableName values(2, 1, "second_record", 2000)""")
+
+      // Query with ROPathFilter should return only latest versions
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, version, data, ts from $tableName order by id")(
+          Seq(1, 4, "updated_v4", 1000),
+          Seq(2, 1, "second_record", 2000)
+        )
+      }
+
+      // Without ROPathFilter should still return same results (correct filtering)
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "false") {
+        checkAnswer(s"select id, version, data, ts from $tableName order by id")(
+          Seq(1, 4, "updated_v4", 1000),
+          Seq(2, 1, "second_record", 2000)
+        )
+      }
+    }
+  }
+
+  test("Test ROPathFilter with time travel queries") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  orderingFields = 'ts'
+           | )
+       """.stripMargin)
+
+      // Insert initial data (commit 1) - use single insert to ensure all records are in same commit
+      spark.sql(s"""insert into $tableName values
+                   |(1, "v1_name1", 10.0, 1000),
+                   |(2, "v1_name2", 20.0, 2000),
+                   |(3, "v1_name3", 30.0, 3000)""".stripMargin)
+
+      // Get first commit timestamp
+      val commit1 = spark.sql(s"select distinct(_hoodie_commit_time) from $tableName").collect()(0).getString(0)
+
+      // Update data (commit 2)
+      spark.sql(s"""update $tableName set name = "v2_name1", price = 15.0 where id = 1""")
+      val commit2 = spark.sql(s"select distinct(_hoodie_commit_time) from $tableName where id = 1").collect()(0).getString(0)
+
+      // Update data again (commit 3)
+      spark.sql(s"""update $tableName set name = "v3_name2", price = 25.0 where id = 2""")
+      val commit3 = spark.sql(s"select distinct(_hoodie_commit_time) from $tableName where id = 2").collect()(0).getString(0)
+
+      // Delete a record (commit 4)
+      spark.sql(s"""delete from $tableName where id = 3""")
+
+      // Test current state with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+          Seq(1, "v2_name1", 15.0, 1000),
+          Seq(2, "v3_name2", 25.0, 2000)
+        )
+      }
+
+      // Test time travel to commit 1 with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts from $tableName timestamp as of '$commit1' order by id")(
+          Seq(1, "v1_name1", 10.0, 1000),
+          Seq(2, "v1_name2", 20.0, 2000),
+          Seq(3, "v1_name3", 30.0, 3000)
+        )
+      }
+
+      // Test time travel to commit 2 with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts from $tableName timestamp as of '$commit2' order by id")(
+          Seq(1, "v2_name1", 15.0, 1000),
+          Seq(2, "v1_name2", 20.0, 2000),
+          Seq(3, "v1_name3", 30.0, 3000)
+        )
+      }
+
+      // Test time travel to commit 3 with ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "true") {
+        checkAnswer(s"select id, name, price, ts from $tableName timestamp as of '$commit3' order by id")(
+          Seq(1, "v2_name1", 15.0, 1000),
+          Seq(2, "v3_name2", 25.0, 2000),
+          Seq(3, "v1_name3", 30.0, 3000)
+        )
+      }
+
+      // Verify time travel returns same results with and without ROPathFilter
+      withSQLConf(s"spark.$RO_PATH_FILTER_OPT_KEY" -> "false") {
+        checkAnswer(s"select id, name, price, ts from $tableName timestamp as of '$commit1' order by id")(
+          Seq(1, "v1_name1", 10.0, 1000),
+          Seq(2, "v1_name2", 20.0, 2000),
+          Seq(3, "v1_name3", 30.0, 3000)
+        )
+      }
+    }
+  }
+}

Reply via email to