vinothchandar commented on a change in pull request #1924:
URL: https://github.com/apache/hudi/pull/1924#discussion_r466217168



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
##########
@@ -41,37 +48,87 @@
    * Returns leaf folders with files under a path.
    * @param fs  File System
    * @param basePathStr Base Path to look for leaf folders
-   * @param filePathFilter  Filters to skip directories/paths
+   * @param jsc Java spark context
    * @return list of partition paths with files under them.
    * @throws IOException
    */
   public static List<Pair<String, List<HoodieFileStatus>>> 
getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr,
-                                                                               
       PathFilter filePathFilter) throws IOException {
+      JavaSparkContext jsc) throws IOException {
     final Path basePath = new Path(basePathStr);
     final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
     final Map<String, List<HoodieFileStatus>> partitionToFiles = new 
HashMap<>();
-    FSUtils.processFiles(fs, basePathStr, (status) -> {
-      if (status.isFile() && filePathFilter.accept(status.getPath())) {
-        String relativePath = FSUtils.getRelativePartitionPath(basePath, 
status.getPath().getParent());
-        List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath);
-        if (null == statusList) {
-          Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
-          List<String> dirs = levelToPartitions.get(level);
-          if (null == dirs) {
-            dirs = new ArrayList<>();
-            levelToPartitions.put(level, dirs);
+    PathFilter filePathFilter = getFilePathFilter();
+    PathFilter metaPathFilter = getExcludeMetaPathFilter();
+
+    FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr));
+    List<String> subDirectories = new ArrayList<>();
+
+    List<Pair<HoodieFileStatus, Pair<Integer, String>>> result = new 
ArrayList<>();
+
+    for (FileStatus topLevelStatus: topLevelStatuses) {
+      if (topLevelStatus.isFile() && 
filePathFilter.accept(topLevelStatus.getPath())) {
+        String relativePath = FSUtils.getRelativePartitionPath(basePath, 
topLevelStatus.getPath().getParent());
+        Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
+        HoodieFileStatus hoodieFileStatus = 
FileStatusUtils.fromFileStatus(topLevelStatus);
+        result.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
+      } else if (metaPathFilter.accept(topLevelStatus.getPath())) {
+        subDirectories.add(topLevelStatus.getPath().toString());
+      }
+    }
+
+    if (subDirectories.size() > 0) {
+      result.addAll(jsc.parallelize(subDirectories, 
subDirectories.size()).flatMap(directory -> {
+        PathFilter pathFilter = getFilePathFilter();
+        Path path = new Path(directory);
+        FileSystem fileSystem = path.getFileSystem(new Configuration());
+        RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, 
true);
+        List<Pair<HoodieFileStatus, Pair<Integer, String>>> res = new 
ArrayList<>();
+        while (itr.hasNext()) {
+          FileStatus status = itr.next();
+          if (pathFilter.accept(status.getPath())) {
+            String relativePath = FSUtils.getRelativePartitionPath(new 
Path(basePathStr), status.getPath().getParent());
+            Integer level = (int) relativePath.chars().filter(ch -> ch == 
'/').count();
+            HoodieFileStatus hoodieFileStatus = 
FileStatusUtils.fromFileStatus(status);
+            res.add(Pair.of(hoodieFileStatus, Pair.of(level, relativePath)));
           }
-          dirs.add(relativePath);
-          statusList = new ArrayList<>();
-          partitionToFiles.put(relativePath, statusList);
         }
-        statusList.add(FileStatusUtils.fromFileStatus(status));
+        return res.iterator();
+      }).collect());
+    }
+
+    result.forEach(val -> {
+      String relativePath = val.getRight().getRight();
+      List<HoodieFileStatus> statusList = partitionToFiles.get(relativePath);
+      if (null == statusList) {
+        Integer level = val.getRight().getLeft();
+        List<String> dirs = levelToPartitions.get(level);
+        if (null == dirs) {
+          dirs = new ArrayList<>();
+          levelToPartitions.put(level, dirs);
+        }
+        dirs.add(relativePath);
+        statusList = new ArrayList<>();
+        partitionToFiles.put(relativePath, statusList);
       }
-      return true;
-    }, true);
+      statusList.add(val.getLeft());
+    });
+
     OptionalInt maxLevelOpt = levelToPartitions.keySet().stream().mapToInt(x 
-> x).max();
     int maxLevel = maxLevelOpt.orElse(-1);
     return maxLevel >= 0 ? levelToPartitions.get(maxLevel).stream()
-        .map(d -> Pair.of(d, 
partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
+            .map(d -> Pair.of(d, 
partitionToFiles.get(d))).collect(Collectors.toList()) : new ArrayList<>();
+  }
+
+  private static PathFilter getFilePathFilter() {
+    return (path) -> {
+      // TODO: Needs to be abstracted out when supporting different formats
+      // TODO: Remove hoodieFilter
+      return 
path.getName().endsWith(HoodieFileFormat.PARQUET.getFileExtension());

Review comment:
       can we just use the table's base file format here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to