yihua commented on code in PR #11936:
URL: https://github.com/apache/hudi/pull/11936#discussion_r1757363601


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java:
##########
@@ -151,32 +155,47 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
       }
 
       // read the files out.
-      String pathStr = filteredFiles.stream().map(f -> f.getPath().toString()).collect(Collectors.joining(","));
+      String pathStr = filteredFiles.stream()
+          .map(f -> f.getPath().toString()).collect(Collectors.joining(","));
 
       return new ImmutablePair<>(Option.ofNullable(pathStr), String.valueOf(newCheckpointTime));
     } catch (IOException ioe) {
       throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
     }
   }
 
+  protected List<StoragePathInfo> listEligibleFiles(HoodieStorage storage,
+                                               StoragePath path,
+                                               long lastCheckpointTime) throws IOException {
+    return listEligibleFiles(storage, path, lastCheckpointTime, new HashSet<>());
+  }
+
   /**
    * List files recursively, filter out illegible files/directories while doing so.
    */
-  protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
+  protected List<StoragePathInfo> listEligibleFiles(HoodieStorage storage,
+                                               StoragePath path,
+                                               long lastCheckpointTime,
+                                               Set<StoragePathInfo> visited) throws IOException {
     // skip files/dirs whose names start with (_, ., etc)
-    FileStatus[] statuses = fs.listStatus(path, file ->
+    List<StoragePathInfo> pathInfos = storage.listDirectEntries(path, file ->
         IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> file.getName().startsWith(pfx)));
-    List<FileStatus> res = new ArrayList<>();
-    for (FileStatus status : statuses) {
-      if (status.isDirectory()) {
-        // avoid infinite loop
-        if (!status.isSymlink()) {

Review Comment:
   Since the source relies on Spark, we can keep these classes on the Hadoop `FileSystem` API. Specifically, the symlink-handling logic needs to be kept, or we handle it inside `HoodieHadoopStorage`. Is there any other place that needs to be unified in the original PR?
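
The symlink guard discussed above can be illustrated with a minimal sketch. It is not Hudi code: it uses `java.nio.file` as a stand-in for Hadoop `FileSystem` / `HoodieStorage` so it runs without Hudi on the classpath, and the class `EligibleFileLister` and the ignore prefixes `_` and `.` are hypothetical. It mirrors the structure visible in the removed lines of the diff: filter out ignored names, recurse into a directory only when it is not a symlink, and keep files newer than the checkpoint.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the listing logic in DFSPathSelector,
// using java.nio.file instead of Hadoop FileSystem / HoodieStorage.
class EligibleFileLister {

  // Assumed ignore prefixes; the real IGNORE_FILEPREFIX_LIST may differ.
  static final List<String> IGNORE_FILE_PREFIXES = List.of("_", ".");

  // Recursively collects regular files modified after lastCheckpointMillis,
  // skipping ignored names and never descending into symlinked directories.
  static List<Path> listEligibleFiles(Path dir, long lastCheckpointMillis) throws IOException {
    List<Path> result = new ArrayList<>();
    DirectoryStream.Filter<Path> notIgnored = p -> IGNORE_FILE_PREFIXES.stream()
        .noneMatch(pfx -> p.getFileName().toString().startsWith(pfx));
    try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir, notIgnored)) {
      for (Path entry : entries) {
        // Files.isDirectory follows links, so a link to a directory reports true here.
        if (Files.isDirectory(entry)) {
          // Avoid an unbounded walk: a symlink may point back at an ancestor.
          if (!Files.isSymbolicLink(entry)) {
            result.addAll(listEligibleFiles(entry, lastCheckpointMillis));
          }
        } else if (Files.isRegularFile(entry)
            && Files.getLastModifiedTime(entry).toMillis() > lastCheckpointMillis) {
          // isRegularFile is false for dangling links, so getLastModifiedTime is safe here.
          result.add(entry);
        }
      }
    }
    return result;
  }
}
```

With a link `sub/loop -> root`, the `isSymbolicLink` check keeps the walk finite; without it, the walk re-enters `root` through `sub/loop` again and again. Note that if a `visited` set is keyed on the listed path, it would not catch this cycle by itself, because each pass through the link yields a new path (`sub/loop/sub/loop/...`).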

