[ https://issues.apache.org/jira/browse/HUDI-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sagar Sumit updated HUDI-7420:
------------------------------
    Sprint: Sprint 2024-03-25, Sprint 2024-04-26, 2024/06/03-16  (was: Sprint 2024-03-25, Sprint 2024-04-26, 2024/06/17-30, 2024/06/03-16)

> Parallelize the process of constructing `logFilesMarkerPath` in CommitMetadatautils#reconcileMetadataForMissingFiles
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-7420
>                 URL: https://issues.apache.org/jira/browse/HUDI-7420
>             Project: Apache Hudi
>          Issue Type: Task
>            Reporter: Sagar Sumit
>            Assignee: Sagar Sumit
>            Priority: Major
>             Fix For: 0.16.0, 1.0.0
>
>
> This is related to HUDI-1517.
> Current logic is:
> {code:java}
> Set<String> logFilesMarkerPath = new HashSet<>();
> allLogFilesMarkerPath.stream().filter(logFilePath -> !logFilePath.endsWith("cdc")).forEach(logFilesMarkerPath::add);
> // remove valid log files
> // TODO: refactor based on HoodieData
> for (Map.Entry<String, List<HoodieWriteStat>> partitionAndWriteStats : commitMetadata.getPartitionToWriteStats().entrySet()) {
>   for (HoodieWriteStat hoodieWriteStat : partitionAndWriteStats.getValue()) {
>     logFilesMarkerPath.remove(hoodieWriteStat.getPath());
>   }
> }
> {code}
> The for loop can be achieved via context.parallelize as below, but need to check for thread-safety.
> {code:java}
> Set<String> logFilesMarkerPath = new HashSet<>();
> allLogFilesMarkerPath.stream().filter(logFilePath -> !logFilePath.endsWith("cdc")).forEach(logFilesMarkerPath::add);
> // Convert the partition and write stats to a list of log file paths to be removed
> List<String> validLogFilePaths = context.parallelize(new ArrayList<>(commitMetadata.getPartitionToWriteStats().entrySet()))
>     .flatMapToPair((SerializablePairFunction<Map.Entry<String, List<HoodieWriteStat>>, String, Void>) entry -> {
>       List<Pair<String, Void>> pathsToRemove = new ArrayList<>();
>       entry.getValue().forEach(hoodieWriteStat -> pathsToRemove.add(Pair.of(hoodieWriteStat.getPath(), null)));
>       return pathsToRemove.iterator();
>     })
>     .map(t -> t.getLeft())
>     .collect();
> // Remove the valid log file paths from logFilesMarkerPath in a parallel manner
> // Depending on the specifics of your environment and HoodieEngineContext, this might need to be adapted.
> // For a straightforward approach without parallelization of the remove operation:
> validLogFilePaths.forEach(logFilesMarkerPath::remove);
> {code}
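
On the thread-safety question raised above, one possible shape is sketched below using only the JDK, so it can be run without a Hudi dependency. It is an illustration under stated assumptions, not the change proposed in the issue: `MarkerReconcileSketch` and `reconcile` are hypothetical names, a plain `Map<String, List<String>>` stands in for `commitMetadata.getPartitionToWriteStats()`, and a parallel stream stands in for `context.parallelize`. The idea is to keep the parallel step free of side effects (it only collects the valid paths) and to mutate the plain `HashSet` from a single thread afterwards, so no concurrent set is required.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Hudi: a JDK-only sketch of the reconcile step.
final class MarkerReconcileSketch {

  // partitionToWrittenPaths is an assumed stand-in for
  // commitMetadata.getPartitionToWriteStats(), reduced to partition -> file paths.
  static Set<String> reconcile(List<String> allLogFilesMarkerPath,
                               Map<String, List<String>> partitionToWrittenPaths) {
    // Parallel step: flatten all written paths. The lambdas touch no shared
    // mutable state, and the collector merges per-thread partial results.
    Set<String> validLogFilePaths = partitionToWrittenPaths.values().parallelStream()
        .flatMap(List::stream)
        .collect(Collectors.toSet());

    // Sequential step: build the marker set (skipping paths that end with "cdc")
    // and remove the valid paths from it on the calling thread only.
    Set<String> logFilesMarkerPath = allLogFilesMarkerPath.stream()
        .filter(logFilePath -> !logFilePath.endsWith("cdc"))
        .collect(Collectors.toCollection(HashSet::new));
    logFilesMarkerPath.removeAll(validLogFilePaths);
    return logFilesMarkerPath;
  }
}
```

What remains in the returned set are marker paths with no matching write stat. If the removal itself had to run in parallel, the `HashSet` would need to be replaced with a concurrent set such as `ConcurrentHashMap.newKeySet()`. Whether that is worthwhile depends on the size of the marker set and is not established by this sketch.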