Copilot commented on code in PR #6526:
URL: https://github.com/apache/hive/pull/6526#discussion_r3369270640
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +76,90 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
+
+ long start = System.currentTimeMillis();
+ // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+ ExecutorService pool = getThreadPool(numThreads);
+ CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
Review Comment:
The parallel listing path forces a minimum of 2 threads (`Math.max(2,
...)`), so setting `hive.compute.splits.num.threads` to 0/1 cannot disable
parallel directory listing. This differs from other split/listing codepaths
that only create a pool when the configured value is > 1 (e.g.
`HiveInputFormat.processPathsForMmRead`). Consider honoring the configured
value and falling back to `super.listStatus(job)` when <= 1, and also cap the
thread count to the number of input dirs to avoid creating an oversized pool.
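A minimal sketch of the suggested policy, assuming a hypothetical static helper `ListingThreads.effectiveThreads` (not an existing Hive API): a configured value of 0 or 1 means "use the serial default", and the requested pool size never exceeds the number of input dirs.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of Hive: decides how many listing threads to use,
// or signals that the caller should fall back to the serial super.listStatus(job).
final class ListingThreads {
  private ListingThreads() {}

  /**
   * @param configuredThreads value of hive.compute.splits.num.threads
   * @param numDirs number of input directories to list
   * @return empty to mean "use the serial default"; otherwise the pool size to request
   */
  static OptionalInt effectiveThreads(int configuredThreads, int numDirs) {
    // Treat 0/1 as "parallel listing disabled", following the "> 1" convention the
    // comment cites for HiveInputFormat.processPathsForMmRead.
    if (configuredThreads <= 1 || numDirs <= 1) {
      return OptionalInt.empty();
    }
    // Never ask for more threads than there are directories to list.
    return OptionalInt.of(Math.min(configuredThreads, numDirs));
  }
}
```

In `listStatus`, the caller would return `super.listStatus(job)` when the result is empty and otherwise pass `getAsInt()` to `getThreadPool`. Because the pool in this PR is a shared static that is sized only on creation, the per-call cap takes effect only on the call that creates the pool, so it is worth deciding together with how that pool is sized.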
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +76,90 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
Review Comment:
`listStatus(JobConf)` now contains fairly complex branching and concurrent
execution (blob FS + recursive + multi-dir), but the existing unit test for
this class only covers constructors. Since this is user-visible planning
behavior, it would be good to add unit tests that validate (a) the non-parallel
fallbacks (non-recursive / single dir / non-blob) delegate to
`super.listStatus`, and (b) the parallel path aggregates results from multiple
dirs. Static mocking is already used in this repo (e.g. `mockStatic(...)`), so
you can mock `BlobStorageUtils.isBlobStorageFileSystem` and
`FileUtils.listStatusRecursively` to keep the test hermetic.
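As an alternative to static mocking (a different technique from the one suggested above), the per-directory listing call could be passed in as a function so a test can supply a fake. The sketch below uses hypothetical names (`DirLister`, `ParallelListing.listAll`) and mirrors the PR's completion-service loop and exception unwrapping; the generic types stand in for `Path` and `FileStatus` so it runs without Hadoop on the classpath.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical seam: lists one directory. Tests pass a fake instead of a real FileSystem call. */
@FunctionalInterface
interface DirLister<D, F> {
  List<F> list(D dir) throws IOException, InterruptedException;
}

/** Hypothetical helper mirroring the PR's parallel loop; D stands in for Path, F for FileStatus. */
final class ParallelListing {
  private ParallelListing() {}

  static <D, F> List<F> listAll(List<D> dirs, DirLister<D, F> lister, ExecutorService pool)
      throws IOException {
    CompletionService<List<F>> completionService = new ExecutorCompletionService<>(pool);
    List<Future<List<F>>> futures = new ArrayList<>(dirs.size());
    List<F> files = new ArrayList<>();
    try {
      // Submit one listing task per directory.
      for (D dir : dirs) {
        futures.add(completionService.submit(() -> lister.list(dir)));
      }
      // Collect results in completion order, not submission order.
      for (int resultsLeft = dirs.size(); resultsLeft > 0; resultsLeft--) {
        files.addAll(completionService.take().get());
      }
    } catch (InterruptedException e) {
      cancelAll(futures);
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while listing input directories", e);
    } catch (ExecutionException e) {
      cancelAll(futures);
      Throwable cause = e.getCause();
      if (cause instanceof InterruptedException) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while listing input directories", cause);
      }
      if (cause instanceof IOException) {
        throw (IOException) cause; // surface the lister's own IOException unchanged
      }
      throw new IOException("Failed to list input directories", cause);
    }
    return files;
  }

  private static <T> void cancelAll(List<Future<T>> futures) {
    for (Future<T> f : futures) {
      f.cancel(true);
    }
  }
}
```

In production, `listStatus` could call `ParallelListing.listAll(Arrays.asList(dirs), dir -> listDirectory(job, dir, ugi), pool)`, and a unit test can assert aggregation (b) with an in-memory fake. The fallback checks in (a) would still need either static mocking or a similarly extracted predicate.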
##########
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java:
##########
@@ -72,6 +76,90 @@ protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> input
vectorizedSelf = new VectorizedParquetInputFormat();
}
+ /**
+ * On blob storage with multiple recursive input directories, list them in parallel instead of the
+ * default serial per-directory listing that dominates split generation. Listed files flow through
+ * the inherited {@link FileInputFormat#getSplits} unchanged; all other cases defer to the default.
+ */
+ @Override
+ protected FileStatus[] listStatus(JobConf job) throws IOException {
+ Path[] dirs = getInputPaths(job);
+ // Only the recursive case (the Tez default) takes the parallel path; non-recursive listing has
+ // subtler sub-directory semantics, so defer to the default.
+ if (dirs.length <= 1
+ || !job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false)
+ || !BlobStorageUtils.isBlobStorageFileSystem(job, dirs[0].getFileSystem(job))) {
+ return super.listStatus(job);
+ }
+
+ long start = System.currentTimeMillis();
+ // List as the caller's end-user, not the pool threads' login user; FileSystem.get is UGI-keyed.
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+ int numThreads = Math.max(2, HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_COMPUTE_SPLITS_NUM_THREADS));
+ ExecutorService pool = getThreadPool(numThreads);
+ CompletionService<List<FileStatus>> completionService = new ExecutorCompletionService<>(pool);
+
+ List<Future<List<FileStatus>>> pathFutures = new ArrayList<>(dirs.length);
+ List<FileStatus> files = new ArrayList<>();
+ try {
+ for (Path dir : dirs) {
+ pathFutures.add(completionService.submit(() -> listDirectory(job, dir, ugi)));
+ }
+ for (int resultsLeft = dirs.length; resultsLeft > 0; resultsLeft--) {
+ files.addAll(completionService.take().get());
+ }
+ } catch (InterruptedException e) {
+ cancelFutures(pathFutures);
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while listing input directories", e);
+
+ } catch (ExecutionException e) {
+ cancelFutures(pathFutures);
+ Throwable cause = e.getCause();
+
+ if (cause instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while listing input directories", cause);
+ }
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ }
+ throw new IOException("Failed to list input directories", cause);
+ }
+
+ LOG.info("Parquet parallel listStatus: {} files from {} dirs in {} ms ({} threads)",
+ files.size(), dirs.length, System.currentTimeMillis() - start, numThreads);
+ return files.toArray(new FileStatus[0]);
+ }
+
+ private static List<FileStatus> listDirectory(JobConf job, Path dir, UserGroupInformation ugi)
+ throws IOException, InterruptedException {
+ return ugi.doAs((PrivilegedExceptionAction<List<FileStatus>>) () -> {
+ FileSystem dirFs = dir.getFileSystem(job);
+ List<FileStatus> dirFiles = new ArrayList<>();
+ FileUtils.listStatusRecursively(dirFs, new FileStatus(0, true, 0, 0, 0, dir), dirFiles);
+ return dirFiles;
+ });
+ }
+
+ private static synchronized ExecutorService getThreadPool(int numThreads) {
+ if (threadPool == null) {
+ threadPool = Executors.newFixedThreadPool(numThreads,
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("PARQUET_GET_SPLITS #%d")
+ .build());
+ }
+ return threadPool;
+ }
Review Comment:
`getThreadPool(int numThreads)` initializes the static pool only once and
never adjusts its size. Whatever size the first caller requests (for example,
a session with a low `hive.compute.splits.num.threads`, or a small size if the
count is ever capped to the number of input dirs) is kept for the life of the
process, so later calls with a higher `hive.compute.splits.num.threads` silently
reuse the smaller pool, limiting the intended parallelism. Consider growing the
existing `ThreadPoolExecutor` when a larger `numThreads` is requested (or
documenting that the first call wins).
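A minimal sketch of the "grow upward" option, assuming a hypothetical holder class `GrowableListingPool` in place of the PR's static field. It builds the `ThreadPoolExecutor` directly (the same shape `Executors.newFixedThreadPool` uses) so the pool can be resized later, and it uses a plain `ThreadFactory` instead of Guava's `ThreadFactoryBuilder` to stay self-contained.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the PR's static pool holder; not Hive code. */
final class GrowableListingPool {
  private static ThreadPoolExecutor threadPool;

  private GrowableListingPool() {}

  /** numThreads must be >= 1; the PR's caller already guarantees >= 2. */
  static synchronized ExecutorService getThreadPool(int numThreads) {
    if (threadPool == null) {
      AtomicInteger threadIndex = new AtomicInteger();
      // Daemon threads named like the PR's "PARQUET_GET_SPLITS #%d" format.
      ThreadFactory factory = r -> {
        Thread t = new Thread(r, "PARQUET_GET_SPLITS #" + threadIndex.getAndIncrement());
        t.setDaemon(true);
        return t;
      };
      // Fixed-size pool, typed as ThreadPoolExecutor so it can be resized later.
      threadPool = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(), factory);
    } else if (numThreads > threadPool.getMaximumPoolSize()) {
      // Grow only. Raise the maximum first: on JDK 9+ setCorePoolSize throws
      // IllegalArgumentException if the new core size exceeds the current maximum.
      threadPool.setMaximumPoolSize(numThreads);
      threadPool.setCorePoolSize(numThreads);
    }
    return threadPool;
  }
}
```

One trade-off: the pool never shrinks, and because core threads do not time out by default, a grown pool keeps its idle threads for the life of the process. Enabling `allowCoreThreadTimeOut(true)` together with a non-zero keep-alive would release them.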
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.