FrankChen021 commented on code in PR #19418:
URL: https://github.com/apache/druid/pull/19418#discussion_r3201245374


##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java:
##########
@@ -153,39 +150,31 @@ public static Collection<Path> getPaths(List<String> 
inputPaths, Configuration c
       return Collections.emptySet();
     }
 
-    // Use FileInputFormat to read splits. To do this, we need to make a fake 
Job.
-    Job job = Job.getInstance(configuration);
-
-    // Add paths to the fake JobContext.
-    for (String inputPath : inputPaths) {
-      FileInputFormat.addInputPaths(job, inputPath);
+    final Set<Path> paths = new LinkedHashSet<>();
+    for (final String inputPath : inputPaths) {
+      final Path p = new Path(inputPath);

Review Comment:
   Thanks, this looks addressed in the current code: `getPaths()` now uses 
Hadoop `StringUtils.split()` before constructing each `Path`, and the added 
comma-separated test covers the regression case.



##########
extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java:
##########
@@ -153,39 +150,31 @@ public static Collection<Path> getPaths(List<String> 
inputPaths, Configuration c
       return Collections.emptySet();
     }
 
-    // Use FileInputFormat to read splits. To do this, we need to make a fake 
Job.
-    Job job = Job.getInstance(configuration);
-
-    // Add paths to the fake JobContext.
-    for (String inputPath : inputPaths) {
-      FileInputFormat.addInputPaths(job, inputPath);
+    final Set<Path> paths = new LinkedHashSet<>();
+    for (final String inputPath : inputPaths) {
+      final Path p = new Path(inputPath);
+      final FileSystem fs = p.getFileSystem(configuration);
+      final FileStatus[] statuses = fs.globStatus(p);
+      if (statuses != null) {
+        for (final FileStatus status : statuses) {
+          addFilesRecursively(fs, status, paths);
+        }
+      }
     }
-
-    return new HdfsFileInputFormat().getSplits(job)
-                                    .stream()
-                                    .filter(split -> ((FileSplit) 
split).getLength() > 0)
-                                    .map(split -> ((FileSplit) 
split).getPath())
-                                    .collect(Collectors.toSet());
+    return paths;
   }
 
-  /**
-   * Helper for leveraging hadoop code to interpret HDFS paths with globs
-   */
-  private static class HdfsFileInputFormat extends FileInputFormat<Object, 
Object>
+  private static void addFilesRecursively(FileSystem fs, FileStatus status, 
Set<Path> paths) throws IOException
   {
-    @Override
-    public RecordReader<Object, Object> createRecordReader(
-        org.apache.hadoop.mapreduce.InputSplit inputSplit,
-        TaskAttemptContext taskAttemptContext
-    )
-    {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    protected boolean isSplitable(JobContext context, Path filename)
-    {
-      return false;  // prevent generating extra paths
+    if (status.isDirectory()) {
+      final FileStatus[] children = fs.listStatus(status.getPath());

Review Comment:
   Thanks, this addresses my concern: hidden names are filtered, directory 
inputs are listed non-recursively by default, and the docs now point users to 
explicit glob patterns for nested ingestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to