[ https://issues.apache.org/jira/browse/HUDI-4242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581601#comment-17581601 ]
sivabalan narayanan commented on HUDI-4242:
-------------------------------------------

Mostly this is taken care of already, but we need to verify once and close.

> Follow up on getAllPartitionPaths perf enhancement
> --------------------------------------------------
>
>                 Key: HUDI-4242
>                 URL: https://issues.apache.org/jira/browse/HUDI-4242
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: reader-core
>            Reporter: sivabalan narayanan
>            Priority: Critical
>             Fix For: 0.12.1
>
>
> GetAllPartitionPaths had some perf degradation from 0.9.0 to 0.10.0 and hence
> we had reverted the change for now. But the change as such was good. So, we
> want to follow up to see if we can fix/enhance the new code. The old code does
> not leverage the Spark engine to parallelize across different folders. So, there
> could be scope for improvement, but from the perf numbers, it's not
> straightforward. So creating a follow-up ticket.
>
> Excerpt from the findings.
> For one of my test tables in S3, with an EMR cluster (10k partitions):
> # With 0.11.0: 147 secs.
> # With this patch as is (where engine context is not used for 2nd phase): 5.7 secs.
> # Latest master + adding engineContext for 2nd phase: 16 secs.
> # I also tried completely rewriting the DAG: 12 secs.
> while (!pathsToList.isEmpty()) {
>   // TODO: Get the parallelism from HoodieWriteConfig
>   int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size());
>   // List all directories in parallel
>   List<FileStatus> dirToFileListing = engineContext.flatMap(pathsToList, path -> {
>     FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
>     return Arrays.stream(fileSystem.listStatus(path));
>   }, listingParallelism);
>   pathsToList.clear();
>   // If the current directory contains PartitionMetadata, add it to the result;
>   // if it does not, add it to the queue.
>   int fileListingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, dirToFileListing.size());
>   List<Pair<Option<String>, Option<Path>>> result = engineContext.map(dirToFileListing, fileStatus -> {
>     FileSystem fileSystem = fileStatus.getPath().getFileSystem(hadoopConf.get());
>     if (fileStatus.isDirectory()) {
>       if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, fileStatus.getPath())) {
>         return Pair.of(
>             Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), fileStatus.getPath())),
>             Option.empty());
>       } else if (!fileStatus.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)) {
>         return Pair.of(Option.empty(), Option.of(fileStatus.getPath()));
>       }
>     } else if (fileStatus.getPath().getName()
>         .startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX)) {
>       String partitionName = FSUtils.getRelativePartitionPath(
>           new Path(datasetBasePath), fileStatus.getPath().getParent());
>       return Pair.of(Option.of(partitionName), Option.empty());
>     }
>     return Pair.of(Option.empty(), Option.empty());
>   }, fileListingParallelism);
>   partitionPaths.addAll(result.stream()
>       .filter(entry -> entry.getKey().isPresent())
>       .map(entry -> entry.getKey().get())
>       .collect(Collectors.toList()));
>   pathsToList.addAll(result.stream()
>       .filter(entry -> entry.getValue().isPresent())
>       .map(entry -> entry.getValue().get())
>       .collect(Collectors.toList()));
> }
>
> So, based on the above findings, I will go with what we have in this patch in its
> current state. Will address only Raymond's and Alexey's feedback and unblock
> 0.11.1.
>
> Ref patch: https://github.com/apache/hudi/pull/5829



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
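
For readers who want to follow the two-phase loop quoted above without a Hudi or Spark setup, here is a minimal sketch of the same level-by-level idea. It is not the patch's code. It assumes the file system can be modeled as an in-memory map from a directory path to its child names, uses a Java parallel stream in place of engineContext, and the class name, method names, and the two constants are hypothetical stand-ins for the Hudi identifiers in the fragment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: level-by-level partition discovery over an in-memory tree. */
final class PartitionListingSketch {
    // Assumed stand-ins for HOODIE_PARTITION_METAFILE_PREFIX and METAFOLDER_NAME.
    static final String PARTITION_MARKER = ".hoodie_partition_metadata";
    static final String META_FOLDER = ".hoodie";

    /**
     * @param tree     maps a directory path to the names of its children;
     *                 any path that is not a key is treated as a plain file
     * @param basePath root of the table
     * @return partition paths relative to basePath ("" means the root itself)
     */
    static List<String> listPartitions(Map<String, List<String>> tree, String basePath) {
        List<String> partitions = new ArrayList<>();
        List<String> pathsToList = new ArrayList<>();
        pathsToList.add(basePath);

        while (!pathsToList.isEmpty()) {
            // Phase 1: list every queued directory, in parallel.
            List<String> children = pathsToList.parallelStream()
                .flatMap(dir -> tree.getOrDefault(dir, List.of()).stream()
                    .map(name -> dir + "/" + name))
                .collect(Collectors.toList());
            pathsToList.clear();

            // Phase 2: classify each entry (done serially here).
            for (String child : children) {
                int slash = child.lastIndexOf('/');
                String name = child.substring(slash + 1);
                if (tree.containsKey(child)) {
                    if (tree.get(child).contains(PARTITION_MARKER)) {
                        // Directory holding the marker file: it is a partition.
                        partitions.add(relativize(basePath, child));
                    } else if (!name.equals(META_FOLDER)) {
                        // Ordinary directory: descend into it on the next round.
                        pathsToList.add(child);
                    }
                } else if (name.startsWith(PARTITION_MARKER)) {
                    // Marker file seen directly: its parent is a partition.
                    partitions.add(relativize(basePath, child.substring(0, slash)));
                }
            }
        }
        return partitions;
    }

    /** Strips the base path and any leading slash. */
    private static String relativize(String basePath, String path) {
        String rest = path.substring(basePath.length());
        return rest.startsWith("/") ? rest.substring(1) : rest;
    }
}
```

Phase 2 is kept serial here on purpose: the timings quoted in the ticket suggest that pushing the second phase through the engine context did not help in that test, although the sketch itself measures nothing.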