jihoonson commented on a change in pull request #10027:
URL: https://github.com/apache/druid/pull/10027#discussion_r445720081



##########
File path: 
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
                           );
                         }
                     )
-                )
-            );
+                );
 
-            queryWatcher.registerQueryFuture(query, futures);
+            ListenableFuture<List<Iterable<T>>> future = 
Futures.allAsList(futures);
+            queryWatcher.registerQueryFuture(query, future);
 
             try {
               return new MergeIterable<>(
                   ordering.nullsFirst(),
                   QueryContexts.hasTimeout(query) ?
-                      futures.get(QueryContexts.getTimeout(query), 
TimeUnit.MILLISECONDS) :
-                      futures.get()
+                      future.get(QueryContexts.getTimeout(query), 
TimeUnit.MILLISECONDS) :
+                      future.get()
               ).iterator();
             }
             catch (InterruptedException e) {
               log.noStackTrace().warn(e, "Query interrupted, cancelling 
pending results, query id [%s]", query.getId());
-              futures.cancel(true);
+              GuavaUtils.cancelAll(true, 
ImmutableList.<Future>builder().add(future).addAll(futures).build());

Review comment:
       It seems easy to forget canceling `future` and so error-prone. How about 
modifying `GuavaUtils.cancelAll()` to take `future` as well? So it would be like
   
   ```java
     public static <F extends Future<?>> void cancelAll(
         boolean mayInterruptIfRunning,
         @Nullable ListenableFuture<?> combinedFuture,
         List<F> futures
     )
     {
       final List<Future> allFuturesToCancel = new ArrayList<>(futures);
       allFuturesToCancel.add(combinedFuture);
       if (allFuturesToCancel.isEmpty()) {
         return;
       }
       allFuturesToCancel.forEach(f -> {
         try {
           f.cancel(mayInterruptIfRunning);
         }
         catch (Throwable t) {
           log.warn(t, "Error while cancelling future.");
         }
       });
     }
   ```

##########
File path: 
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
##########
@@ -141,33 +144,34 @@ public ChainedExecutionQueryRunner(
                           );
                         }
                     )
-                )
-            );
+                );
 
-            queryWatcher.registerQueryFuture(query, futures);
+            ListenableFuture<List<Iterable<T>>> future = 
Futures.allAsList(futures);
+            queryWatcher.registerQueryFuture(query, future);
 
             try {
               return new MergeIterable<>(
                   ordering.nullsFirst(),
                   QueryContexts.hasTimeout(query) ?
-                      futures.get(QueryContexts.getTimeout(query), 
TimeUnit.MILLISECONDS) :
-                      futures.get()
+                      future.get(QueryContexts.getTimeout(query), 
TimeUnit.MILLISECONDS) :
+                      future.get()
               ).iterator();
             }
             catch (InterruptedException e) {
               log.noStackTrace().warn(e, "Query interrupted, cancelling 
pending results, query id [%s]", query.getId());
-              futures.cancel(true);
+              GuavaUtils.cancelAll(true, 
ImmutableList.<Future>builder().add(future).addAll(futures).build());

Review comment:
       Or, more structured way to do could be adding a new `CombinedFuture` 
like this
   
   ```java
     public static class CombinedFuture<V> implements Future<List<V>>
     {
       private final List<ListenableFuture<V>> underlyingFutures;
       private final ListenableFuture<List<V>> combined;
       
       public CombinedFuture(List<ListenableFuture<V>> futures)
       {
         this.underlyingFutures = futures;
         this.combined = Futures.allAsList(futures);
       }
   
       @Override
       public boolean cancel(boolean mayInterruptIfRunning)
       {
         if (combined.isDone() || combined.isCancelled()) {
           return false;
         } else {
           cancelAll(mayInterruptIfRunning, combined, underlyingFutures);
           return true;
         }
       }
   
       @Override
       public boolean isCancelled()
       {
         return combined.isCancelled();
       }
   
       @Override
       public boolean isDone()
       {
         return combined.isDone();
       }
   
       @Override
       public List<V> get() throws InterruptedException, ExecutionException
       {
         return combined.get();
       }
   
       @Override
       public List<V> get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException
       {
         return combined.get(timeout, unit);
       }
     }
   ```
   
   I'm fine with either way.

##########
File path: 
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
##########
@@ -281,4 +281,41 @@ public void testInsufficientResourcesOnBroker()
       }
     }
   }
+
+  @Test(timeout = 60_000L)
+  public void testTimeoutExceptionOnQueryable()
+  {
+    expectedException.expect(QueryInterruptedException.class);
+    
expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
+
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .overrideContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+          @Override
+          public String getDefaultStrategy()
+          {
+            return "v2";
+          }
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
+    QueryRunner<ResultRow> _runnnner = 
factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));

Review comment:
       nit: the variable name starting with an underscore is not Java 
convention. How about `mergedRunner`?

##########
File path: 
processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
##########
@@ -187,7 +189,7 @@ private void waitForFutureCompletion(
     }
     catch (InterruptedException e) {
       log.warn(e, "Query interrupted, cancelling pending results, query id 
[%s]", query.getId());
-      future.cancel(true);
+      GuavaUtils.cancelAll(true, futures);

Review comment:
       nit: `future` should be canceled on exceptions too. This is nit since 
this class is used only by groupBy v1 which is deprecated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to