[ 
https://issues.apache.org/jira/browse/HUDI-3864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Sumit updated HUDI-3864:
------------------------------
    Fix Version/s: 0.12.1
                       (was: 0.12.0)

> Avoid fetching all files for all partitions on the read/query path for flink
> ----------------------------------------------------------------------------
>
>                 Key: HUDI-3864
>                 URL: https://issues.apache.org/jira/browse/HUDI-3864
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: flink
>            Reporter: sivabalan narayanan
>            Assignee: Danny Chen
>            Priority: Major
>             Fix For: 0.12.1
>
>
> Fetching all files across all partitions should be avoided on the hot path,
> especially on the query side. We should fetch files only for the partitions
> a query actually touches.
> I inspected HoodieFileIndex for Spark and things look OK: we load files only
> for the partitions involved in the query.
>  
> {code:java}
> public BaseHoodieTableFileIndex(HoodieEngineContext engineContext,
>                                 HoodieTableMetaClient metaClient,
>                                 TypedProperties configProperties,
>                                 HoodieTableQueryType queryType,
>                                 List<Path> queryPaths, 
> {code}
> The queryPaths argument above contains only the partitions involved in the
> split.
> Later, when we load the files, we load them only for the matched partitions.
> {code:java}
> private Map<PartitionPath, FileStatus[]> loadPartitionPathFiles() {
>   // List files in all partition paths
>   List<PartitionPath> pathToFetch = new ArrayList<>();
>   Map<PartitionPath, FileStatus[]> cachedPartitionToFiles = new HashMap<>();
>   // Fetch from the FileStatusCache
>   List<PartitionPath> partitionPaths = getAllQueryPartitionPaths();
>   partitionPaths.forEach(partitionPath -> {
>     Option<FileStatus[]> filesInPartition =
>         fileStatusCache.get(partitionPath.fullPartitionPath(basePath));
>     if (filesInPartition.isPresent()) {
>       cachedPartitionToFiles.put(partitionPath, filesInPartition.get());
>     } else {
>       pathToFetch.add(partitionPath);
>     }
>   });
>   Map<PartitionPath, FileStatus[]> fetchedPartitionToFiles;
>   if (pathToFetch.isEmpty()) {
>     fetchedPartitionToFiles = Collections.emptyMap();
>   } else {
>     Map<String, PartitionPath> fullPartitionPathsMapToFetch =
>         pathToFetch.stream()
>             .collect(Collectors.toMap(
>                 partitionPath -> partitionPath.fullPartitionPath(basePath).toString(),
>                 Function.identity()));
>     fetchedPartitionToFiles =
>         FSUtils.getFilesInPartitions(
>                 engineContext,
>                 metadataConfig,
>                 basePath,
>                 fullPartitionPathsMapToFetch.keySet().toArray(new String[0]),
>                 fileSystemStorageConfig.getSpillableDir())
>             .entrySet()
>             .stream()
>             .collect(Collectors.toMap(
>                 e -> fullPartitionPathsMapToFetch.get(e.getKey()),
>                 e -> e.getValue()));
>   }
>   // Update the fileStatusCache
>   fetchedPartitionToFiles.forEach((partitionPath, filesInPartition) -> {
>     fileStatusCache.put(partitionPath.fullPartitionPath(basePath), filesInPartition);
>   });
>   return CollectionUtils.combine(cachedPartitionToFiles, fetchedPartitionToFiles);
> }
> {code}
>  
> I also inspected Flink, and maybe we are loading all files across all
> partitions.
>  
> IncrementalInputSplits [L180|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L180]:
> fileStatuses = fileIndex.getFilesInPartitions();
>  
> HoodieTableSource [L298|https://github.com/apache/hudi/blob/d16740976e3aa89f2d934b0f1c48208dfe40bc5f/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java#L298]:
> FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
>  
> I do see that we pass in the required partition paths in both places, but I
> will leave it to the Flink experts to inspect the code and close out the
> ticket if no action is required.
>  
>  
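The behaviour the description asks for (list files only for the partitions a
query touches, and reuse cached listings) can be illustrated with a minimal,
engine-agnostic sketch. Everything here is an assumption made for
illustration: PartitionScopedFileIndex, filesFor, listCalls, and the lister
function are hypothetical names, not part of Hudi's or Flink's API, and
plain strings stand in for PartitionPath and FileStatus.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical sketch, not Hudi code: lists files only for the partitions a
 * query touches and caches each partition's listing.
 */
final class PartitionScopedFileIndex {
  // Stand-in for the file system (or metadata table) listing call.
  private final Function<String, List<String>> lister;
  // Plays the role of the FileStatusCache in the snippet quoted above.
  private final Map<String, List<String>> cache = new HashMap<>();
  // Counts real listing calls so the effect of pruning and caching is visible.
  private int listCalls = 0;

  PartitionScopedFileIndex(Function<String, List<String>> lister) {
    this.lister = lister;
  }

  /** Returns files for the given partitions only; each uncached partition is listed once. */
  Map<String, List<String>> filesFor(List<String> queryPartitions) {
    Map<String, List<String>> result = new HashMap<>();
    for (String partition : queryPartitions) {
      List<String> files = cache.get(partition);
      if (files == null) {
        // Cache miss: list this single partition, never the whole table.
        files = new ArrayList<>(lister.apply(partition));
        cache.put(partition, files);
        listCalls++;
      }
      result.put(partition, files);
    }
    return result;
  }

  /** Number of listing calls actually issued so far. */
  int listCalls() {
    return listCalls;
  }
}
```

With a table of three partitions, calling filesFor with a single partition
issues one listing call, and repeating the same call issues none. Whether the
two Flink call sites already behave this way is exactly what the ticket asks
the Flink maintainers to confirm.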


