suryaprasanna commented on code in PR #18136:
URL: https://github.com/apache/hudi/pull/18136#discussion_r2843630529


##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant 
-> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, then by file group ID
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();
+      partitions.forEach(p -> partitionsMap.put(p.path, p));
+      Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new 
HashMap<>();
+

Review Comment:
   @yihua  The PartitionPath object stores only the relative partition path 
string, not absolute paths. Which code path are you referring to?



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -143,6 +146,7 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable {
    * @param configProperties             unifying configuration (in the form 
of generic properties)
    * @param queryType                    target query type
    * @param queryPaths                   target DFS paths being queried

Review Comment:
   Added MOR table type condition.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant 
-> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());

Review Comment:
   Added table type equals COW condition.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant 
-> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, then by file group ID
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();
+      partitions.forEach(p -> partitionsMap.put(p.path, p));
+      Map<PartitionPath, List<FileSlice>> partitionToFileSlices = new 
HashMap<>();
+
+      for (StoragePathInfo pathInfo : allFiles) {
+        // Create FileSlice obj from StoragePathInfo.
+        String partitionPathStr = pathInfo.getPath().getParent().toString();
+        String relPartitionPath = FSUtils.getRelativePartitionPath(basePath, 
pathInfo.getPath().getParent());
+        HoodieBaseFile baseFile = new HoodieBaseFile(pathInfo);
+        FileSlice fileSlice = new FileSlice(partitionPathStr, 
baseFile.getCommitTime(), baseFile.getFileId());
+        fileSlice.setBaseFile(baseFile);
+
+        // Add the FileSlice to partitionToFileSlices
+        PartitionPath partitionPathObj = partitionsMap.get(relPartitionPath);
+        List<FileSlice> fileSlices = 
partitionToFileSlices.computeIfAbsent(partitionPathObj, k -> new ArrayList<>());
+        fileSlices.add(fileSlice);

Review Comment:
   Creating a file system view is costly, and we have already created an FSV 
via the RO path filter. 
   Building an FSV again here would mean doing the work twice, so we should 
avoid it at this point.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant 
-> instant).orElse("N/A"),
+        timer.endTimer(), partitions.size());
+
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      // Group files by partition path, then by file group ID
+      Map<String, PartitionPath> partitionsMap = new HashMap<>();

Review Comment:
   Done.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -146,6 +147,12 @@ public List<StoragePathInfo> 
getAllFilesInPartition(StoragePath partitionPath) t
   }
 
   @Override

Review Comment:
   Yes my bad, it is a refactoring mistake, fixed it now.



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java:
##########
@@ -99,25 +100,25 @@ public class HoodieROTablePathFilter implements 
Configurable, PathFilter, Serial
 
   private transient HoodieLocalEngineContext engineContext;
 
-
   private transient HoodieStorage storage;
 
   public HoodieROTablePathFilter() {
-    this(new Configuration());
+    this(HadoopFSUtils.getStorageConf());

Review Comment:
   @yihua  HoodieCopyOnWriteSnapshotHadoopFsRelationFactory uses 
HoodieFileIndex. To optimize the file system view calls in HoodieFileIndex we 
are using HoodieROPathFilter. I think HoodieROPathFilter was originally used at 
the Relation level, and we have now moved it down to the PathFilter level. So, 
with the current setup it should be fine, right?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -509,6 +509,7 @@ object HoodieFileIndex extends Logging {
   }
 
   def getConfigProperties(spark: SparkSession, options: Map[String, String], 
tableConfig: HoodieTableConfig): TypedProperties = {
+    logInfo("Options provided to the file index are " + options)
     val sqlConf: SQLConf = spark.sessionState.conf

Review Comment:
   Yeah, in another PR of mine I added a log statement that should cover this 
for debugging, so I am removing this one from here.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -439,6 +443,21 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
 
   private def arePartitionPathsUrlEncoded: Boolean =
     metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean
+
+  override protected def getPartitionPathFilter(activeTimeline: 
HoodieTimeline): 
org.apache.hudi.common.util.Option[org.apache.hudi.storage.StoragePathFilter] = 
{
+    if (useROPathFilterForListing && !shouldIncludePendingCommits) {
+      val conf = 
HadoopFSUtils.getStorageConf(spark.sparkContext.hadoopConfiguration)
+      if (specifiedQueryInstant.isDefined) {
+        conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key(), 
specifiedQueryInstant.get)
+      }

Review Comment:
   Good catch, made the change requested.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -227,6 +227,18 @@ object DataSourceReadOptions {
         " by carefully analyzing provided partition-column predicates and 
deducing corresponding partition-path prefix from " +
         " them (if possible).")
 
+  val FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER: 
ConfigProperty[Boolean] =
+    
ConfigProperty.key("hoodie.datasource.read.file.index.list.file.statuses.using.ro.path.filter")

Review Comment:
   Made the change.



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -481,14 +481,22 @@ public static boolean isDataFile(StoragePath path) {
   public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage 
storage,
                                                                  StoragePath 
partitionPath)
       throws IOException {
+    return getAllDataFilesInPartition(storage, partitionPath, Option.empty());
+  }
+
+  public static List<StoragePathInfo> getAllDataFilesInPartition(HoodieStorage 
storage,

Review Comment:
   Renamed it to getAllDataFilesInPartitionByPathFilter



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,6 +528,21 @@ object HoodieFileIndex extends Logging {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
 
+    var hoodieROTablePathFilterBasedFileListingEnabled = 
getConfigValue(options, sqlConf,
+      
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, 
null)
+    if (hoodieROTablePathFilterBasedFileListingEnabled != null) {
+      
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,
+        hoodieROTablePathFilterBasedFileListingEnabled)
+    } else {
+      // For 0.14 rollout we also allow passing in the HMS listing config via 
Spark itself
+      hoodieROTablePathFilterBasedFileListingEnabled = getConfigValue(options, 
sqlConf,
+        "spark." + 
DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key, 
null)
+      if (hoodieROTablePathFilterBasedFileListingEnabled != null) {
+        
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LIST_FILE_STATUSES_USING_RO_PATH_FILTER.key,

Review Comment:
   My bad, corrected the comment.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -146,6 +147,12 @@ public List<StoragePathInfo> 
getAllFilesInPartition(StoragePath partitionPath) t
   }
 
   @Override
+  public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitions,
+                                                                    
Option<StoragePathFilter> unused)
+      throws IOException {
+    return getAllFilesInPartitions(partitions);
+  }
+
   public Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitions)

Review Comment:
   Yeah, makes sense. Refactored the code accordingly.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -527,6 +528,21 @@ object HoodieFileIndex extends Logging {
       
properties.setProperty(DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key,
 listingModeOverride)
     }
 
+    var hoodieROTablePathFilterBasedFileListingEnabled = 
getConfigValue(options, sqlConf,

Review Comment:
   Yes, updated.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java:
##########
@@ -156,6 +157,12 @@ List<String> 
getPartitionPathWithPathPrefixUsingFilterExpression(List<String> re
   Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths)
       throws IOException;
 
+  default Map<String, List<StoragePathInfo>> 
getAllFilesInPartitions(Collection<String> partitionPaths,
+                                                                      
Option<StoragePathFilter> pathFilterOption)
+      throws IOException {
+    return getAllFilesInPartitions(partitionPaths);
+  }

Review Comment:
   Good idea, made the suggested code changes.



##########
hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java:
##########
@@ -267,14 +275,46 @@ private Map<PartitionPath, List<FileSlice>> 
loadFileSlicesForPartitions(List<Par
       validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
     }
 
-    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions);
     HoodieTimeline activeTimeline = getActiveTimeline();
     Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
+    Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
+    validate(activeTimeline, queryInstant);
 
-    try (HoodieTableFileSystemView fileSystemView = new 
HoodieTableFileSystemView(metaClient, activeTimeline, allFiles)) {
-      Option<String> queryInstant = specifiedQueryInstant.or(() -> 
latestInstant.map(HoodieInstant::requestedTime));
-      validate(activeTimeline, queryInstant);
+    HoodieTimer timer = HoodieTimer.start();
+    List<StoragePathInfo> allFiles = listPartitionPathFiles(partitions, 
activeTimeline);
+    log.info("On {} with query instant as {}, it took {}ms to list all files 
{} Hudi partitions",
+        metaClient.getTableConfig().getTableName(), queryInstant.map(instant 
-> instant).orElse("N/A"),

Review Comment:
   Made the suggested change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to