vinothchandar commented on a change in pull request #1924: URL: https://github.com/apache/hudi/pull/1924#discussion_r466217168
########## File path: hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java ########## @@ -41,37 +48,87 @@ * Returns leaf folders with files under a path. * @param fs File System * @param basePathStr Base Path to look for leaf folders - * @param filePathFilter Filters to skip directories/paths + * @param jsc Java spark context * @return list of partition paths with files under them. * @throws IOException */ public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr, - PathFilter filePathFilter) throws IOException { + JavaSparkContext jsc) throws IOException { final Path basePath = new Path(basePathStr); final Map<Integer, List<String>> levelToPartitions = new HashMap<>(); final Map<String, List<HoodieFileStatus>> partitionToFiles = new HashMap<>(); - FSUtils.processFiles(fs, basePathStr, (status) -> { - if (status.isFile() && filePathFilter.accept(status.getPath())) { - String relativePath = FSUtils.getRelativePartitionPath(basePath, status.getPath().getParent()); - List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath); - if (null == statusList) { - Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); - List<String> dirs = levelToPartitions.get(level); - if (null == dirs) { - dirs = new ArrayList<>(); - levelToPartitions.put(level, dirs); + PathFilter filePathFilter = getFilePathFilter(); + PathFilter metaPathFilter = getExcludeMetaPathFilter(); + + FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr)); + List<String> subDirectories = new ArrayList<>(); + + List<Pair<HoodieFileStatus, Pair<Integer, String>>> result = new ArrayList<>(); + + for (FileStatus topLevelStatus: topLevelStatuses) { + if (topLevelStatus.isFile() && filePathFilter.accept(topLevelStatus.getPath())) { + String relativePath = FSUtils.getRelativePartitionPath(basePath, topLevelStatus.getPath().getParent()); + Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); + HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(topLevelStatus); + result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath))); + } else if (metaPathFilter.accept(topLevelStatus.getPath())) { + subDirectories.add(topLevelStatus.getPath().toString()); + } + } + + if (subDirectories.size() > 0) { + result.addAll(jsc.parallelize(subDirectories, subDirectories.size()).flatMap(directory -> { + PathFilter pathFilter = getFilePathFilter(); + Path path = new Path(directory); + FileSystem fileSystem = path.getFileSystem(new Configuration()); + RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true); + List<Pair<HoodieFileStatus, Pair<Integer, String>>> res = new ArrayList<>(); + while (itr.hasNext()) { + FileStatus status = itr.next(); + if (pathFilter.accept(status.getPath())) { + String relativePath = FSUtils.getRelativePartitionPath(new Path(basePathStr), status.getPath().getParent()); + Integer level = (int) relativePath.chars().filter(ch -> ch == '/').count(); + HoodieFileStatus hoodieFileStatus = FileStatusUtils.fromFileStatus(status); + res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath))); } - dirs.add(relativePath); - statusList = new ArrayList<>(); - partitionToFiles.put(relativePath, statusList); } - statusList.add(FileStatusUtils.fromFileStatus(status)); + return res.iterator(); + }).collect()); + } + + result.forEach(val -> { + String relativePath = val.getRight().getRight(); + List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath); + if (null == statusList) { + Integer level = val.getRight().getLeft(); + List<String> dirs = levelToPartitions.get(level); + if (null == dirs) { + dirs = new ArrayList<>(); + levelToPartitions.put(level, dirs); + } + dirs.add(relativePath); + statusList = new ArrayList<>(); + partitionToFiles.put(relativePath, statusList); } - return true; - }, true); + statusList.add(val.getLeft()); + }); + OptionalInt maxLevelOpt = levelToPartitions.keySet().stream().mapToInt(x -> x).max(); int maxLevel = maxLevelOpt.orElse(-1); return maxLevel >= 0 ? levelToPartitions.get(maxLevel).stream() - .map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>(); + .map(d -> Pair.of(d, partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>(); + } + + private static PathFilter getFilePathFilter() { + return (path) -> { + // TODO: Needs to be abstracted out when supporting different formats + // TODO: Remove hoodieFilter + return path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension()); Review comment: can we just use the table's base file format here? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org