[ https://issues.apache.org/jira/browse/HUDI-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sagar Sumit updated HUDI-7420:
------------------------------
    Sprint: Sprint 2024-03-25, Sprint 2024-04-26, 2024/06/03-16  (was: Sprint 
2024-03-25, Sprint 2024-04-26, 2024/06/17-30, 2024/06/03-16)

> Parallelize the process of constructing `logFilesMarkerPath` in 
> CommitMetadataUtils#reconcileMetadataForMissingFiles
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7420
>                 URL: https://issues.apache.org/jira/browse/HUDI-7420
>             Project: Apache Hudi
>          Issue Type: Task
>            Reporter: Sagar Sumit
>            Assignee: Sagar Sumit
>            Priority: Major
>             Fix For: 0.16.0, 1.0.0
>
>
> This is related to HUDI-1517.
> The current logic is:
> {code:java}
> Set<String> logFilesMarkerPath = new HashSet<>();
> allLogFilesMarkerPath.stream()
>     .filter(logFilePath -> !logFilePath.endsWith("cdc"))
>     .forEach(logFilesMarkerPath::add);
> // remove valid log files
> // TODO: refactor based on HoodieData
> for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats :
>     commitMetadata.getPartitionToWriteStats().entrySet()) {
>   for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) {
>     logFilesMarkerPath.remove(hoodieWriteStat.getPath());
>   }
> } {code}
> The for loop can be parallelized via context.parallelize as shown below, but
> thread-safety needs to be checked (see the sketch after the snippet).
> {code:java}
> Set<String> logFilesMarkerPath = new HashSet<>();
> allLogFilesMarkerPath.stream()
>     .filter(logFilePath -> !logFilePath.endsWith("cdc"))
>     .forEach(logFilesMarkerPath::add);
> // Convert the partition and write stats to a list of log file paths to be removed
> List<String> validLogFilePaths = context.parallelize(new ArrayList<>(commitMetadata.getPartitionToWriteStats().entrySet()))
>     .flatMapToPair((SerializablePairFunction<Map.Entry<String, List<HoodieWriteStat>>, String, Void>) entry -> {
>         List<Pair<String, Void>> pathsToRemove = new ArrayList<>();
>         entry.getValue().forEach(hoodieWriteStat -> pathsToRemove.add(Pair.of(hoodieWriteStat.getPath(), null)));
>         return pathsToRemove.iterator();
>     })
>     .map(t -> t.getLeft())
>     .collect();
> // Remove the valid log file paths from logFilesMarkerPath.
> // Depending on the specifics of the environment and HoodieEngineContext, this might need to be adapted.
> // A straightforward approach without parallelizing the remove operation:
> validLogFilePaths.forEach(logFilesMarkerPath::remove); {code}
>  
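> A minimal, JDK-only sketch of the same idea (using plain parallel streams instead of HoodieEngineContext; the WriteStat class and reconcile method below are hypothetical stand-ins for illustration): the valid paths are extracted in parallel into a separate collection, and the HashSet is only mutated afterwards on a single thread via removeAll, which sidesteps the thread-safety concern.
> {code:java}
> import java.util.HashSet;
> import java.util.List;
> import java.util.Map;
> import java.util.Set;
> import java.util.stream.Collectors;
>
> public class ReconcileSketch {
>   // Hypothetical stand-in for HoodieWriteStat; only the path matters here.
>   static class WriteStat {
>     private final String path;
>     WriteStat(String path) { this.path = path; }
>     String getPath() { return path; }
>   }
>
>   static Set<String> reconcile(Set<String> allLogFilesMarkerPath,
>                                Map<String, List<WriteStat>> partitionToWriteStats) {
>     // Same filtering as the current logic: drop CDC log files.
>     Set<String> logFilesMarkerPath = allLogFilesMarkerPath.stream()
>         .filter(p -> !p.endsWith("cdc"))
>         .collect(Collectors.toCollection(HashSet::new));
>
>     // Extract valid log file paths in parallel; nothing touches logFilesMarkerPath here.
>     Set<String> validLogFilePaths = partitionToWriteStats.values().parallelStream()
>         .flatMap(List::stream)
>         .map(WriteStat::getPath)
>         .collect(Collectors.toSet());
>
>     // Single-threaded bulk removal, so the HashSet is never mutated concurrently.
>     logFilesMarkerPath.removeAll(validLogFilePaths);
>     return logFilesMarkerPath;
>   }
> } {code}
> The engine-context version would follow the same shape: distribute the path extraction, collect the result back to the driver, and apply a single removeAll there.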



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
