abhishekrb19 commented on code in PR #18148:
URL: https://github.com/apache/druid/pull/18148#discussion_r2403631582


##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -545,6 +544,32 @@ public void verifyMaxQueryTimeout(long maxQueryTimeout)
     }
   }
 
+  public long getPerSegmentTimeout()
+  {
+    return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+  }
+
+  public long getPerSegmentTimeout(long defaultTimeout)
+  {
+    final long timeout = getLong(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, defaultTimeout);
+    if (timeout >= 0) {
+      return timeout;
+    }
+
+    throw new BadQueryContextException(
+        StringUtils.format(
+            "Per-segment timeout [%s] must be a non negative value, but was %d",
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            timeout
+        )
+    );
+  }
+
+  public boolean usePerSegmentTimeout()
+  {
+    return getPerSegmentTimeout() != QueryContexts.NO_TIMEOUT;

Review Comment:
   Can this be set at the cluster level, similar to the existing `timeout` parameter, so that it broadly applies to all queries?



##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -545,6 +544,32 @@ public void verifyMaxQueryTimeout(long maxQueryTimeout)
     }
   }
 
+  public long getPerSegmentTimeout()
+  {
+    return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+  }
+
+  public long getPerSegmentTimeout(long defaultTimeout)

Review Comment:
   This method can be private-scoped.



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java:
##########
@@ -318,4 +330,258 @@ public boolean isSingleThreaded()
        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
     );
   }
+
+  @Test(timeout = 60_000L)
+  public void testTimeoutExceptionOnQueryable_multiThreaded()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(QueryContexts.TIMEOUT_KEY, 1))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(100);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        Execs.directExecutor(),
+        List.of(runner, mockRunner)
+    );
+
+    Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY,
+            300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            100
+        ))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return false;
+          }
+        }
+    );
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        processingPool,
+        List.of(runner, mockRunner)
+    );
+
+    Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY,
+            300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            100
+        ))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        processingPool,
+        List.of(runner, mockRunner)
+    );
+
+    Assert.assertThrows(
+        QueryTimeoutException.class,

Review Comment:
   Same comment here on exception validation.



##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/DummyExecutorService.java:
##########
@@ -117,4 +120,38 @@ public void execute(Runnable command)
   {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public @NotNull <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public @NotNull ScheduledFuture<?> scheduleAtFixedRate(
+      @NotNull Runnable command,
+      long initialDelay,
+      long period,
+      @NotNull TimeUnit unit
+  )
+  {
+    throw new UnsupportedOperationException();
+  }
+

Review Comment:
   Why implement `ScheduledExecutorService` only to override these methods and throw `UnsupportedOperationException`? It looks like this is only called from the `BrokerProcessingModule`; would passing null there suffice instead of this dummy one?



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java:
##########
@@ -176,6 +176,8 @@ public Sequence<ResultRow> run(final QueryPlus<ResultRow> queryPlus, final Respo
     // query processing together.
     final long queryTimeout = queryContext.getTimeout();
     final boolean hasTimeout = queryContext.hasTimeout();
+    final boolean hasPerSegmentTimeout = queryContext.usePerSegmentTimeout();

Review Comment:
   Is there a cluster-level config that an operator can set and forget for all queries (similar to `druid.server.http.defaultQueryTimeout`)?
   
   Also, is it intentional that this query context parameter is not [documented](https://druid.apache.org/docs/latest/querying/query-context-reference/#general-parameters)?



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -141,32 +157,33 @@ public Iterable<T> call()
             queryWatcher.registerQueryFuture(query, future);
 
             try {
-              final QueryContext context = query.context();
               return new MergeIterable<>(
                   context.hasTimeout() ?
                       future.get(context.getTimeout(), TimeUnit.MILLISECONDS) :
                       future.get(),
                   ordering.nullsFirst()
               ).iterator();
             }
-            catch (InterruptedException e) {
-              log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-              //Note: canceling combinedFuture first so that it can complete with INTERRUPTED as its final state. See ChainedExecutionQueryRunnerTest.testQueryTimeout()
+            catch (CancellationException | InterruptedException e) {
+              log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
               throw new QueryInterruptedException(e);
             }
-            catch (CancellationException e) {
-              throw new QueryInterruptedException(e);
-            }
-            catch (TimeoutException e) {
-              log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
+            catch (TimeoutException | QueryTimeoutException e) {
+              log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());

Review Comment:
   Would this timeout occur if a query exceeds `timeout`?



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java:
##########
@@ -176,6 +176,8 @@ public Sequence<ResultRow> run(final QueryPlus<ResultRow> queryPlus, final Respo
     // query processing together.
     final long queryTimeout = queryContext.getTimeout();
     final boolean hasTimeout = queryContext.hasTimeout();
+    final boolean hasPerSegmentTimeout = queryContext.usePerSegmentTimeout();
+    final long perSegmentTimeout = queryContext.getPerSegmentTimeout();

Review Comment:
   I think for a cluster-level config, we'd want to wire up some changes to `SetAndVerifyContextQueryRunner` so that it applies to all query runners; it would apply the query-specific context and override the cluster default.



##########
server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java:
##########
@@ -157,6 +158,7 @@ public static QueryProcessingPool createProcessingExecutorPool(
             lifecycle,
             config
         ),
+        config.getNumTimeoutThreads() > 0 ? ScheduledExecutors.fixed(config.getNumTimeoutThreads(), "PrioritizedExecutorService-Timeout-%%d") : null,

Review Comment:
   Typo?
   ```suggestion
           config.getNumTimeoutThreads() > 0 ? ScheduledExecutors.fixed(config.getNumTimeoutThreads(), "PrioritizedExecutorService-Timeout-%d") : null,
   ```



##########
server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java:
##########
@@ -84,7 +85,7 @@ public QueryProcessingPool getProcessingExecutorPool(
       DruidProcessingConfig config
   )
   {
-    return new ForwardingQueryProcessingPool(Execs.dummy());
+    return new ForwardingQueryProcessingPool(Execs.dummy(), (ScheduledExecutorService) Execs.dummy());

Review Comment:
   Any reason to not pass this as null for the Broker module rather than a dummy one?



##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -545,6 +544,32 @@ public void verifyMaxQueryTimeout(long maxQueryTimeout)
     }
   }
 
+  public long getPerSegmentTimeout()
+  {
+    return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+  }
+
+  public long getPerSegmentTimeout(long defaultTimeout)
+  {
+    final long timeout = getLong(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, defaultTimeout);
+    if (timeout >= 0) {
+      return timeout;
+    }
+
+    throw new BadQueryContextException(
+        StringUtils.format(
+            "Per-segment timeout [%s] must be a non negative value, but was %d",

Review Comment:
   ```suggestion
                "Per-segment timeout [%s] must be a non negative value, but was [%d]",
   ```



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -97,42 +101,54 @@ public Iterator<T> make()
-                            throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
                           }
 
-                          return queryProcessingPool.submitRunnerTask(
-                              new AbstractPrioritizedQueryRunnerCallable<>(priority, input)
-                              {
-                                @Override
-                                public Iterable<T> call()
-                                {
-                                  try {
-                                    Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
-                                    if (result == null) {
-                                      throw new ISE("Got a null result! Segments are missing!");
-                                    }
-
-                                    List<T> retVal = result.toList();
-                                    if (retVal == null) {
-                                      throw new ISE("Got a null list of results");
-                                    }
-
-                                    return retVal;
-                                  }
-                                  catch (QueryInterruptedException e) {
-                                    throw new RuntimeException(e);
-                                  }
-                                  catch (QueryTimeoutException e) {
-                                    throw e;
-                                  }
-                                  catch (Exception e) {
-                                    if (query.context().isDebug()) {
-                                      log.error(e, "Exception with one of the sequences!");
-                                    } else {
-                                      log.noStackTrace().error(e, "Exception with one of the sequences!");
-                                    }
-                                    Throwables.propagateIfPossible(e);
-                                    throw new RuntimeException(e);
-                                  }
+                          final AbstractPrioritizedQueryRunnerCallable<Iterable<T>, T> callable = new AbstractPrioritizedQueryRunnerCallable<>(
+                              priority,
+                              input
+                          )
+                          {
+                            @Override
+                            public Iterable<T> call()
+                            {
+                              try {
+                                Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
+                                if (result == null) {
+                                  throw new ISE("Got a null result! Segments are missing!");
+                                }
+
+                                List<T> retVal = result.toList();
+                                if (retVal == null) {
+                                  throw new ISE("Got a null list of results");
+                                }
+
+                                return retVal;
+                              }
+                              catch (QueryInterruptedException e) {
+                                throw new RuntimeException(e);
+                              }
+                              catch (QueryTimeoutException e) {
+                                throw e;
+                              }
+                              catch (Exception e) {
+                                if (query.context().isDebug()) {
+                                  log.error(e, "Exception with one of the sequences!");
+                              } else {
+                                  log.noStackTrace().error(e, "Exception with one of the sequences!");
                                 }
-                              });
+                                Throwables.throwIfUnchecked(e);

Review Comment:
   `Throwables.propagateIfPossible(e);` -> `Throwables.throwIfUnchecked(e);` because of the deprecated usage?



##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java:
##########
@@ -376,22 +387,24 @@ private void waitForFutureCompletion(
         }
       }
     }
-    catch (InterruptedException e) {
-      log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-      GuavaUtils.cancelAll(true, future, futures);
-      throw new QueryInterruptedException(e);
-    }
-    catch (CancellationException e) {
+    catch (InterruptedException | CancellationException e) {
+      log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
       GuavaUtils.cancelAll(true, future, futures);
       throw new QueryInterruptedException(e);
     }
     catch (QueryTimeoutException | TimeoutException e) {
-      log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
+      log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
       GuavaUtils.cancelAll(true, future, futures);
-      throw new QueryTimeoutException();
+      throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
     }
     catch (ExecutionException e) {
+      log.noStackTrace().warn(e, "Query error, cancelling pending results for query [%s]", query.getId());
       GuavaUtils.cancelAll(true, future, futures);
+      Throwable cause = e.getCause();
+      // Nested per-segment future timeout
+      if (cause instanceof TimeoutException) {
+        throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));

Review Comment:
   Ditto on the error message here and above.



##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -141,32 +157,33 @@ public Iterable<T> call()
             queryWatcher.registerQueryFuture(query, future);
 
             try {
-              final QueryContext context = query.context();
               return new MergeIterable<>(
                   context.hasTimeout() ?
                       future.get(context.getTimeout(), TimeUnit.MILLISECONDS) :
                       future.get(),
                   ordering.nullsFirst()
               ).iterator();
             }
-            catch (InterruptedException e) {
-              log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
-              //Note: canceling combinedFuture first so that it can complete with INTERRUPTED as its final state. See ChainedExecutionQueryRunnerTest.testQueryTimeout()
+            catch (CancellationException | InterruptedException e) {
+              log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
               throw new QueryInterruptedException(e);
             }
-            catch (CancellationException e) {
-              throw new QueryInterruptedException(e);
-            }
-            catch (TimeoutException e) {
-              log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
+            catch (TimeoutException | QueryTimeoutException e) {
+              log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
               throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
             }
             catch (ExecutionException e) {
+              log.noStackTrace().warn(e, "Query error, cancelling pending results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
-              Throwables.propagateIfPossible(e.getCause());
-              throw new RuntimeException(e.getCause());
+              Throwable cause = e.getCause();
+              // Nested per-segment future timeout
+              if (cause instanceof TimeoutException) {

Review Comment:
   Would this timeout occur specifically when `perSegmentTimeout` is exceeded? If so, could you update the error message to reflect that so a user/admin can take corrective action? Same for the timeout exception message above.



##########
processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java:
##########
@@ -344,6 +358,151 @@ public void testSubmittedTaskType()
     Assert.assertEquals(runners, actual);
   }
 
+  @Test(timeout = 10_000L)
+  public void testPerSegmentTimeout()
+  {
+    QueryRunner<Integer> slowRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(500);
+        return Sequences.of(2);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    QueryRunner<Integer> fastRunner = (queryPlus, responseContext) -> Sequences.of(1);
+
+    QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
+    watcher.registerQueryFuture(
+        EasyMock.anyObject(),
+        EasyMock.anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(watcher);
+
+    ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
+        processingPool,
+        watcher,
+        Arrays.asList(slowRunner, fastRunner)
+    );
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("test")
+                                  .intervals("2014/2015")
+                                  .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+                                  .context(
+                                      ImmutableMap.of(
+                                          QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100L,
+                                          QueryContexts.TIMEOUT_KEY, 5_000L
+                                      )
+                                  )
+                                  .build();
+    Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
+
+    List<Integer> results = null;
+    Exception thrown = null;
+    try {
+      results = seq.toList();
+    }
+    catch (Exception e) {
+      thrown = e;
+    }
+
+    Assert.assertNull("No results expected due to timeout", results);
+    Assert.assertNotNull("Exception should be thrown", thrown);
+    Assert.assertTrue(
+        "Should be QueryTimeoutException or caused by it",
+        Throwables.getRootCause(thrown) instanceof QueryTimeoutException

Review Comment:
   Since `QueryTimeoutException` can be thrown from multiple places depending on which timeout kicks in, could you also assert on the error message so we know which code path triggered it?



##########
processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java:
##########
@@ -19,12 +19,17 @@
 
 package org.apache.druid.query;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
 * Default implementation of {@link QueryProcessingPool} that just forwards operations, including query execution tasks,

Review Comment:
   Could you update the javadoc here to reflect the usage of `timeoutService` in conjunction with this pool?



##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java:
##########
@@ -318,4 +330,258 @@ public boolean isSingleThreaded()
        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
     );
   }
+
+  @Test(timeout = 60_000L)
+  public void testTimeoutExceptionOnQueryable_multiThreaded()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(QueryContexts.TIMEOUT_KEY, 1))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(100);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        Execs.directExecutor(),
+        List.of(runner, mockRunner)
+    );
+
+    Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY,
+            300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            100
+        ))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return false;
+          }
+        }
+    );
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        processingPool,
+        List.of(runner, mockRunner)
+    );
+
+    Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY,
+            300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            100
+        ))
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        processingPool,
+        List.of(runner, mockRunner)
+    );
+
+    Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+  }
+
+  @Test(timeout = 5_000L)
+  public void test_perSegmentTimeout_crossQuery() throws Exception
+  {
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return false;
+          }
+        }
+    );
+
+    final GroupByQuery slowQuery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY, 300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 1_000
+        ))
+        .queryId("slow")
+        .build();
+
+    final GroupByQuery fastQuery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY, 5_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100
+        ))
+        .queryId("fast")
+        .build();
+
+    CountDownLatch slowStart = new CountDownLatch(2);
+    CountDownLatch fastStart = new CountDownLatch(1);
+
+    QueryRunner<ResultRow> signalingSlowRunner = (queryPlus, responseContext) -> {
+      slowStart.countDown();
+      try {
+        Thread.sleep(60_000L);

Review Comment:
   Does this need to sleep for 60s? In the interest of having faster tests, I think you can just do something like 6s, since the queries below have lower timeout values.



##########
docs/configuration/index.md:
##########
@@ -1371,6 +1371,7 @@ Processing properties set on the Middle Manager are passed through to Peons.
 |`druid.processing.formatString`|Realtime and Historical processes use this format string to name their processing threads.|processing-%s|
 |`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
 |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts.|0|

Review Comment:
   Also, could you add guidance on how an operator should configure this 
property? What happens if an operator set this value too high or as low as 1?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

