chenyuzhi459 commented on a change in pull request #10027:
URL: https://github.com/apache/druid/pull/10027#discussion_r446193150



##########
File path: processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
                           );
                         }
                     )
-                )
-            );
+                );
 
-            queryWatcher.registerQueryFuture(query, futures);
+            ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
+            queryWatcher.registerQueryFuture(query, future);
 
             try {
               return new MergeIterable<>(
                   ordering.nullsFirst(),
                   QueryContexts.hasTimeout(query) ?
-                      futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
-                      futures.get()
+                      future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
+                      future.get()
               ).iterator();
             }
             catch (InterruptedException e) {
               log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-              futures.cancel(true);
+              GuavaUtils.cancelAll(true, ImmutableList.<Future>builder().add(future).addAll(futures).build());

Review comment:
   > It seems easy to forget canceling `future` and so error-prone. How about
   > modifying `GuavaUtils.cancelAll()` to take `future` as well? So it would
   > be like
   > 
   > ```java
   >   public static <F extends Future<?>> void cancelAll(
   >       boolean mayInterruptIfRunning,
   >       @Nullable ListenableFuture<?> combinedFuture,
   >       List<F> futures
   >   )
   >   {
   >     final List<Future> allFuturesToCancel = new ArrayList<>(futures);
   >     allFuturesToCancel.add(combinedFuture);
   >     if (allFuturesToCancel.isEmpty()) {
   >       return;
   >     }
   >     allFuturesToCancel.forEach(f -> {
   >       try {
   >         f.cancel(mayInterruptIfRunning);
   >       }
   >       catch (Throwable t) {
   >         log.warn(t, "Error while cancelling future.");
   >       }
   >     });
   >   }
   > ```
   
   Thanks for the suggestion. I'll follow it, with one exception: it is better
to cancel `combinedFuture` first. If we cancel the first future in
`underlyingFutures` before that, it triggers the listener that
`com.google.common.util.concurrent.Futures.CombinedFuture` adds to every future
in `underlyingFutures` in its `init` method. That listener is actually a method
called `setOneValue`, which sets the status of `combinedFuture` to `CANCELLED`
rather than the `INTERRUPTED` we expect. In addition, the listener on
`combinedFuture` then sets the status of the other futures in
`underlyingFutures` to match its own (`CANCELLED` rather than the `INTERRUPTED`
we expect). I have tested this with `testQueryTimeout` in
`ChainedExecutionQueryRunnerTest`.
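
To make the ordering concrete, here is a rough sketch of what I mean. It is not the actual `GuavaUtils` code: the class name `CancelOrderSketch` is hypothetical, and it uses plain JDK `Future` / `CompletableFuture` only so that it runs without Guava. JDK futures do not reproduce the `CombinedFuture` listener behaviour described above; the sketch only shows the combined future being cancelled before the underlying ones, and a null combined future being skipped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelOrderSketch
{
  /**
   * Hypothetical helper: cancels the combined future first (if present),
   * then every underlying future. Returns the futures in the order in
   * which cancel() was attempted so the ordering can be inspected.
   */
  public static List<Future<?>> cancelAll(
      boolean mayInterruptIfRunning,
      Future<?> combinedFuture,            // may be null
      List<? extends Future<?>> futures
  )
  {
    final List<Future<?>> toCancel = new ArrayList<>();
    if (combinedFuture != null) {
      // Combined future goes first so it is not cancelled indirectly
      // by a listener on one of the underlying futures.
      toCancel.add(combinedFuture);
    }
    toCancel.addAll(futures);

    for (Future<?> f : toCancel) {
      try {
        f.cancel(mayInterruptIfRunning);
      }
      catch (RuntimeException e) {
        // Keep going: one failing cancel should not prevent the others.
        System.err.println("Error while cancelling future: " + e);
      }
    }
    return toCancel;
  }

  public static void main(String[] args)
  {
    CompletableFuture<String> a = new CompletableFuture<>();
    CompletableFuture<String> b = new CompletableFuture<>();
    // Stand-in for Futures.allAsList(futures) in this JDK-only sketch.
    CompletableFuture<Void> combined = CompletableFuture.allOf(a, b);

    List<Future<?>> order = cancelAll(true, combined, Arrays.asList(a, b));
    System.out.println("combined cancelled first: " + (order.get(0) == combined));
    System.out.println("all cancelled: " + (a.isCancelled() && b.isCancelled() && combined.isCancelled()));
  }
}
```

In the real change the combined future would be the `ListenableFuture` returned by `Futures.allAsList(futures)`, and the error would go through the logger instead of `System.err`.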






