alexeykudinkin commented on a change in pull request #4178:
URL: https://github.com/apache/hudi/pull/4178#discussion_r765092480

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########
@@ -95,8 +95,7 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
         .map(inputGroup -> runClusteringForGroupAsync(inputGroup,
             clusteringPlan.getStrategy().getStrategyParams(),
             Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false),
-            instantTime))
-        .map(CompletableFuture::join);
+            instantTime)).collect(Collectors.toList()).stream().map(CompletableFuture::join);

Review comment:

How does this guarantee that the jobs will run in parallel? We simply dereference the stream into a list, but then still join the futures sequentially. Instead we should use the following util:

```java
public static <T> CompletableFuture<List<T>> allOf(@Nonnull List<CompletableFuture<T>> futures) {
  return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
      .thenApply(aVoid -> futures.stream()
          // NOTE: This join wouldn't block, since all the
          //       futures are completed at this point
          .map(CompletableFuture::join)
          .collect(Collectors.toList())
      );
}
```

And then invoke it like the following:

```java
allOf(
    clusteringPlan.getInputGroups()
        .stream()
        .map(...) // returns `CompletableFuture`
        .collect(Collectors.toList())
)
    .join();
```

This would guarantee parallel execution for each individual clustering group.
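
For illustration only, here is a minimal, self-contained sketch of the `allOf` pattern described above. It is not the Hudi code: the class name `AllOfSketch` and the sleeping tasks are stand-ins for `runClusteringForGroupAsync`, and it assumes enough threads are available in the common `ForkJoinPool` for the tasks to overlap.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AllOfSketch {

  // Same shape as the helper proposed above (without the @Nonnull annotation,
  // to keep the sketch dependency-free).
  public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(aVoid -> futures.stream()
            // This join does not block: every future is already complete here.
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
  }

  public static void main(String[] args) {
    // Stand-in for runClusteringForGroupAsync: each "clustering group" sleeps ~1s.
    List<CompletableFuture<Integer>> futures = IntStream.range(0, 4)
        .mapToObj(group -> CompletableFuture.supplyAsync(() -> {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          return group;
        }))
        .collect(Collectors.toList());

    long start = System.nanoTime();
    List<Integer> results = allOf(futures).join();
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;

    // With the groups running concurrently, the elapsed time is roughly the
    // duration of the slowest group rather than the sum of all of them.
    System.out.println("results=" + results + ", elapsedMs=" + elapsedMs);
  }
}
```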
########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java ########## @@ -95,8 +95,7 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext .map(inputGroup -> runClusteringForGroupAsync(inputGroup, clusteringPlan.getStrategy().getStrategyParams(), Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(false), - instantTime)) - .map(CompletableFuture::join); + instantTime)).collect(Collectors.toList()).stream().map(CompletableFuture::join); Review comment: How does this guarantee jobs will run in parallel? We simply dereference stream into list, but then still join the Futures sequentially. Instead we should use following util ``` public static <T> CompletableFuture<List<T>> allOf(@Nonnull List<CompletableFuture<T>> futures) { return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(aVoid -> futures.stream() // NOTE: This join wouldn't block, since all the // futures are completed at this point .map(CompletableFuture::join) .collect(Collectors.toList()) ); } ``` And then invoke it like following ``` allOf( clusteringPlan.getInputGroups() .stream() .map(...) // returns `CompletableFuture` .collect(Collectors.toList()) ) .join(); ``` This would guarantee parallel execution for each individual clustering group -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org