jihoonson commented on a change in pull request #10027: URL: https://github.com/apache/druid/pull/10027#discussion_r445720081
##########
File path: processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
                               );
                             }
                         )
-                    )
-            );
+            );
 
-    queryWatcher.registerQueryFuture(query, futures);
+    ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
+    queryWatcher.registerQueryFuture(query, future);
 
     try {
       return new MergeIterable<>(
           ordering.nullsFirst(),
           QueryContexts.hasTimeout(query) ?
-              futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
-              futures.get()
+              future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
+              future.get()
       ).iterator();
     }
     catch (InterruptedException e) {
       log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-      futures.cancel(true);
+      GuavaUtils.cancelAll(true, ImmutableList.<Future>builder().add(future).addAll(futures).build());

Review comment:
It seems easy to forget to cancel `future`, which makes this error-prone. How about modifying `GuavaUtils.cancelAll()` to take `future` as well?
So it would look like this:

```java
public static <F extends Future<?>> void cancelAll(
    boolean mayInterruptIfRunning,
    @Nullable ListenableFuture<?> combinedFuture,
    List<F> futures
)
{
  final List<Future> allFuturesToCancel = new ArrayList<>(futures);
  // combinedFuture is @Nullable, so add it only when it is present.
  if (combinedFuture != null) {
    allFuturesToCancel.add(combinedFuture);
  }
  if (allFuturesToCancel.isEmpty()) {
    return;
  }
  allFuturesToCancel.forEach(f -> {
    try {
      f.cancel(mayInterruptIfRunning);
    }
    catch (Throwable t) {
      // Keep going so one failure does not leave the other futures running.
      log.warn(t, "Error while cancelling future.");
    }
  });
}
```

##########
File path: processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
                               );
                             }
                         )
-                    )
-            );
+            );
 
-    queryWatcher.registerQueryFuture(query, futures);
+    ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
+    queryWatcher.registerQueryFuture(query, future);
 
     try {
       return new MergeIterable<>(
           ordering.nullsFirst(),
           QueryContexts.hasTimeout(query) ?
-              futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
-              futures.get()
+              future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
+              future.get()
       ).iterator();
     }
     catch (InterruptedException e) {
       log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-      futures.cancel(true);
+      GuavaUtils.cancelAll(true, ImmutableList.<Future>builder().add(future).addAll(futures).build());

Review comment:
Or, a more structured approach could be to add a new `CombinedFuture` like this:

```java
public static class CombinedFuture<V> implements Future<List<V>>
{
  private final List<ListenableFuture<V>> underlyingFutures;
  private final ListenableFuture<List<V>> combined;

  public CombinedFuture(List<ListenableFuture<V>> futures)
  {
    this.underlyingFutures = futures;
    this.combined = Futures.allAsList(futures);
  }

  @Override
  public boolean cancel(boolean mayInterruptIfRunning)
  {
    if (combined.isDone() || combined.isCancelled()) {
      return false;
    } else {
      // Cancel the combined future together with every underlying future.
      cancelAll(mayInterruptIfRunning, combined, underlyingFutures);
      return true;
    }
  }

  @Override
  public boolean isCancelled()
  {
    return combined.isCancelled();
  }

  @Override
  public boolean isDone()
  {
    return combined.isDone();
  }

  @Override
  public List<V> get() throws InterruptedException, ExecutionException
  {
    return combined.get();
  }

  @Override
  public List<V> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
  {
    return combined.get(timeout, unit);
  }
}
```

I'm fine with either way.

##########
File path: processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
##########
@@ -281,4 +281,41 @@ public void testInsufficientResourcesOnBroker()
       }
     }
   }
+
+  @Test(timeout = 60_000L)
+  public void testTimeoutExceptionOnQueryable()
+  {
+    expectedException.expect(QueryInterruptedException.class);
+    expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .overrideContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+          @Override
+          public String getDefaultStrategy()
+          {
+            return "v2";
+          }
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
+    QueryRunner<ResultRow> _runnnner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));

Review comment:
nit: starting a variable name with an underscore is not a Java convention. How about `mergedRunner`?
##########
File path: processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
##########
@@ -187,7 +189,7 @@ private void waitForFutureCompletion(
     }
     catch (InterruptedException e) {
       log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-      future.cancel(true);
+      GuavaUtils.cancelAll(true, futures);

Review comment:
nit: `future` should be canceled on exceptions too. This is only a nit since this class is used only by groupBy v1, which is deprecated.
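For illustration, the idea of cancelling the combined future together with its underlying futures in a single call can be sketched with only `java.util.concurrent`, so it runs without Guava. This is a rough sketch and not Druid's `GuavaUtils`: the class name `CancelAllSketch`, the `int` return value, and the plain `CompletableFuture` standing in for the combined future are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelAllSketch
{
  /**
   * Cancels every underlying future and, if present, the combined future.
   * Returns how many cancel() calls reported success (hypothetical return
   * value, added here only so the behavior is easy to observe).
   */
  public static int cancelAll(
      boolean mayInterruptIfRunning,
      Future<?> combinedFuture, // may be null
      List<? extends Future<?>> futures
  )
  {
    final List<Future<?>> all = new ArrayList<>(futures);
    if (combinedFuture != null) {
      all.add(combinedFuture);
    }
    int cancelled = 0;
    for (Future<?> f : all) {
      try {
        if (f.cancel(mayInterruptIfRunning)) {
          cancelled++;
        }
      }
      catch (RuntimeException e) {
        // One misbehaving future should not stop the rest from being cancelled.
        System.err.println("Error while cancelling future: " + e);
      }
    }
    return cancelled;
  }

  public static void main(String[] args)
  {
    CompletableFuture<String> finished = CompletableFuture.completedFuture("done");
    CompletableFuture<String> pending = new CompletableFuture<>();
    // Stand-in for the combined future; not derived from the two above.
    CompletableFuture<String> combined = new CompletableFuture<>();

    int n = cancelAll(true, combined, Arrays.asList(finished, pending));
    System.out.println("successful cancels: " + n);
    System.out.println("pending cancelled: " + pending.isCancelled());
    System.out.println("finished cancelled: " + finished.isCancelled());
  }
}
```

Because the caller passes the combined future and the list in one call, there is no separate step to forget, which is the concern raised in the first comment.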