abhishekrb19 commented on code in PR #18148:
URL: https://github.com/apache/druid/pull/18148#discussion_r2403631582
##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -545,6 +544,32 @@ public void verifyMaxQueryTimeout(long maxQueryTimeout)
}
}
+ public long getPerSegmentTimeout()
+ {
+ return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+ }
+
+ public long getPerSegmentTimeout(long defaultTimeout)
+ {
+ final long timeout = getLong(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, defaultTimeout);
+ if (timeout >= 0) {
+ return timeout;
+ }
+
+ throw new BadQueryContextException(
+ StringUtils.format(
+ "Per-segment timeout [%s] must be a non negative value, but was %d",
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ timeout
+ )
+ );
+ }
+
+ public boolean usePerSegmentTimeout()
+ {
+ return getPerSegmentTimeout() != QueryContexts.NO_TIMEOUT;
Review Comment:
can this be set at the cluster level, similar to the existing `timeout` parameter, so that it broadly applies to all queries?
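For reference, the existing `timeout` interacts with `druid.server.http.defaultQueryTimeout` by letting the per-query context value override the cluster default. A minimal sketch of that resolution order (class, key, and method names here are illustrative, not Druid's actual API):

```java
import java.util.Map;

public class TimeoutResolution
{
  // Hypothetical lookup: the query context wins when present, otherwise fall
  // back to the operator-configured cluster default; negative values rejected.
  public static long resolveTimeout(Map<String, Object> queryContext, String key, long clusterDefault)
  {
    final Object raw = queryContext.get(key);
    final long timeout = raw == null ? clusterDefault : ((Number) raw).longValue();
    if (timeout < 0) {
      throw new IllegalArgumentException("Timeout [" + key + "] must be non-negative, but was [" + timeout + "]");
    }
    return timeout;
  }
}
```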
##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -545,6 +544,32 @@ public void verifyMaxQueryTimeout(long maxQueryTimeout)
}
}
+ public long getPerSegmentTimeout()
+ {
+ return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+ }
+
+ public long getPerSegmentTimeout(long defaultTimeout)
Review Comment:
can be private-scoped
##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java:
##########
@@ -318,4 +330,258 @@ public boolean isSingleThreaded()
() -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
);
}
+
+ @Test(timeout = 60_000L)
+ public void testTimeoutExceptionOnQueryable_multiThreaded()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(QueryContexts.TIMEOUT_KEY, 1))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ Execs.directExecutor(),
+ List.of(runner, mockRunner)
+ );
+
+ Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY,
+ 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ 100
+ ))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return false;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ processingPool,
+ List.of(runner, mockRunner)
+ );
+
+ Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY,
+ 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ 100
+ ))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ processingPool,
+ List.of(runner, mockRunner)
+ );
+
+ Assert.assertThrows(
+ QueryTimeoutException.class,
Review Comment:
Same comment on exception validation
##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/DummyExecutorService.java:
##########
@@ -117,4 +120,38 @@ public void execute(Runnable command)
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public @NotNull ScheduledFuture<?> schedule(@NotNull Runnable command, long delay, @NotNull TimeUnit unit)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NotNull <V> ScheduledFuture<V> schedule(@NotNull Callable<V> callable, long delay, @NotNull TimeUnit unit)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NotNull ScheduledFuture<?> scheduleAtFixedRate(
+ @NotNull Runnable command,
+ long initialDelay,
+ long period,
+ @NotNull TimeUnit unit
+ )
+ {
+ throw new UnsupportedOperationException();
+ }
+
Review Comment:
Why implement `ScheduledExecutorService` to override these methods and throw
`UnsupportedOperationException()`? Looks like this is only called from the
`BrokerProcessingModule` - would passing null there suffice instead of this
dummy one?
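One null-tolerant alternative would be for the pool to hold the timeout service as nullable and skip scheduling when it is absent, rather than a dummy that throws. A rough JDK-only sketch of that shape (class and method names are illustrative, not the PR's actual classes):

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class OptionalTimeoutPool
{
  private final ScheduledExecutorService timeoutService; // null => per-segment timeouts disabled

  public OptionalTimeoutPool(ScheduledExecutorService timeoutService)
  {
    this.timeoutService = timeoutService;
  }

  // Schedules the timeout action only when a timeout service was provided;
  // a Broker-style caller can pass null and get a no-op instead of a throw.
  public boolean scheduleTimeout(Runnable onTimeout, long delayMillis)
  {
    if (timeoutService == null) {
      return false;
    }
    timeoutService.schedule(onTimeout, delayMillis, TimeUnit.MILLISECONDS);
    return true;
  }
}
```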
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java:
##########
@@ -176,6 +176,8 @@ public Sequence<ResultRow> run(final QueryPlus<ResultRow> queryPlus, final Respo
// query processing together.
final long queryTimeout = queryContext.getTimeout();
final boolean hasTimeout = queryContext.hasTimeout();
+ final boolean hasPerSegmentTimeout = queryContext.usePerSegmentTimeout();
Review Comment:
Is there a cluster-level config that an operator can set and forget for all
queries (similar to `druid.server.http.defaultQueryTimeout`)?
Also, is it intentional that we've not
[documented](https://druid.apache.org/docs/latest/querying/query-context-reference/#general-parameters)
this query context?
##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -141,32 +157,33 @@ public Iterable<T> call()
queryWatcher.registerQueryFuture(query, future);
try {
- final QueryContext context = query.context();
return new MergeIterable<>(
context.hasTimeout() ?
future.get(context.getTimeout(), TimeUnit.MILLISECONDS) :
future.get(),
ordering.nullsFirst()
).iterator();
}
- catch (InterruptedException e) {
- log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
- //Note: canceling combinedFuture first so that it can complete with INTERRUPTED as its final state. See ChainedExecutionQueryRunnerTest.testQueryTimeout()
+ catch (CancellationException | InterruptedException e) {
+ log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
- catch (CancellationException e) {
- throw new QueryInterruptedException(e);
- }
- catch (TimeoutException e) {
- log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
+ catch (TimeoutException | QueryTimeoutException e) {
+ log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
Review Comment:
Would this timeout occur if a query exceeds `timeout`?
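For what it's worth, the outer `future.get(context.getTimeout(), MILLISECONDS)` raises a plain `TimeoutException` when the overall `timeout` elapses, whereas a `QueryTimeoutException` caught here would presumably have been propagated from inside a runner. A self-contained JDK illustration of the outer case:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OuterTimeoutDemo
{
  // Returns true when the wait deadline fires before the task completes,
  // mirroring the future.get(timeout, MILLISECONDS) call in the runner.
  public static boolean timesOut(long taskMillis, long waitMillis)
  {
    final ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
      final Future<Integer> future = exec.submit(() -> {
        Thread.sleep(taskMillis);
        return 1;
      });
      future.get(waitMillis, TimeUnit.MILLISECONDS);
      return false;
    }
    catch (TimeoutException e) {
      return true;
    }
    catch (InterruptedException | ExecutionException e) {
      throw new RuntimeException(e);
    }
    finally {
      exec.shutdownNow();
    }
  }
}
```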
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java:
##########
@@ -176,6 +176,8 @@ public Sequence<ResultRow> run(final QueryPlus<ResultRow> queryPlus, final Respo
// query processing together.
final long queryTimeout = queryContext.getTimeout();
final boolean hasTimeout = queryContext.hasTimeout();
+ final boolean hasPerSegmentTimeout = queryContext.usePerSegmentTimeout();
+ final long perSegmentTimeout = queryContext.getPerSegmentTimeout();
Review Comment:
I think for a cluster-level config, we’d want to wire up some changes to
`SetAndVerifyContextQueryRunner` so it applies to all query runners; it'd apply
the query-specific context and override the cluster default
##########
server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java:
##########
@@ -157,6 +158,7 @@ public static QueryProcessingPool createProcessingExecutorPool(
lifecycle,
config
),
+ config.getNumTimeoutThreads() > 0 ? ScheduledExecutors.fixed(config.getNumTimeoutThreads(), "PrioritizedExecutorService-Timeout-%%d") : null,
Review Comment:
Typo?
```suggestion
config.getNumTimeoutThreads() > 0 ? ScheduledExecutors.fixed(config.getNumTimeoutThreads(), "PrioritizedExecutorService-Timeout-%d") : null,
```
##########
server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java:
##########
@@ -84,7 +85,7 @@ public QueryProcessingPool getProcessingExecutorPool(
DruidProcessingConfig config
)
{
- return new ForwardingQueryProcessingPool(Execs.dummy());
+ return new ForwardingQueryProcessingPool(Execs.dummy(), (ScheduledExecutorService) Execs.dummy());
Review Comment:
Any reason to not pass this as null for the Broker module rather than a
dummy one?
##########
processing/src/main/java/org/apache/druid/query/QueryContext.java:
##########
@@ -545,6 +544,32 @@ public void verifyMaxQueryTimeout(long maxQueryTimeout)
}
}
+ public long getPerSegmentTimeout()
+ {
+ return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+ }
+
+ public long getPerSegmentTimeout(long defaultTimeout)
+ {
+ final long timeout = getLong(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, defaultTimeout);
+ if (timeout >= 0) {
+ return timeout;
+ }
+
+ throw new BadQueryContextException(
+ StringUtils.format(
+ "Per-segment timeout [%s] must be a non negative value, but was %d",
Review Comment:
```suggestion
"Per-segment timeout [%s] must be a non negative value, but was [%d]",
```
##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -97,42 +101,54 @@ public Iterator<T> make()
throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
}
- return queryProcessingPool.submitRunnerTask(
- new AbstractPrioritizedQueryRunnerCallable<>(priority, input)
- {
- @Override
- public Iterable<T> call()
- {
- try {
- Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
- if (result == null) {
- throw new ISE("Got a null result! Segments are missing!");
- }
-
- List<T> retVal = result.toList();
- if (retVal == null) {
- throw new ISE("Got a null list of results");
- }
-
- return retVal;
- }
- catch (QueryInterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (QueryTimeoutException e) {
- throw e;
- }
- catch (Exception e) {
- if (query.context().isDebug()) {
- log.error(e, "Exception with one of the sequences!");
- } else {
- log.noStackTrace().error(e, "Exception with one of the sequences!");
- }
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
- }
+ final AbstractPrioritizedQueryRunnerCallable<Iterable<T>, T> callable = new AbstractPrioritizedQueryRunnerCallable<>(
+ priority,
+ input
+ )
+ {
+ @Override
+ public Iterable<T> call()
+ {
+ try {
+ Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
+ if (result == null) {
+ throw new ISE("Got a null result! Segments are missing!");
+ }
+
+ List<T> retVal = result.toList();
+ if (retVal == null) {
+ throw new ISE("Got a null list of results");
+ }
+
+ return retVal;
+ }
+ catch (QueryInterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (QueryTimeoutException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (query.context().isDebug()) {
+ log.error(e, "Exception with one of the sequences!");
+ } else {
+ log.noStackTrace().error(e, "Exception with one of the sequences!");
}
- });
+ Throwables.throwIfUnchecked(e);
Review Comment:
`Throwables.propagateIfPossible(e);` -> `Throwables.throwIfUnchecked(e);` because of the deprecated usage?
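For reference, `propagateIfPossible(Throwable)` is deprecated in newer Guava and `throwIfUnchecked` is the documented replacement; one behavioral difference, if I remember right, is that the replacement rejects a null argument while the deprecated method silently accepted it. Its behavior can be sketched in plain Java:

```java
public class UncheckedRethrow
{
  // Plain-Java equivalent of Guava's Throwables.throwIfUnchecked: rethrow
  // RuntimeExceptions and Errors as-is, fall through for checked throwables.
  public static void throwIfUnchecked(Throwable throwable)
  {
    if (throwable == null) {
      throw new NullPointerException("throwable");
    }
    if (throwable instanceof RuntimeException) {
      throw (RuntimeException) throwable;
    }
    if (throwable instanceof Error) {
      throw (Error) throwable;
    }
  }
}
```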
##########
processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java:
##########
@@ -376,22 +387,24 @@ private void waitForFutureCompletion(
}
}
}
- catch (InterruptedException e) {
- log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
- GuavaUtils.cancelAll(true, future, futures);
- throw new QueryInterruptedException(e);
- }
- catch (CancellationException e) {
+ catch (InterruptedException | CancellationException e) {
+ log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (QueryTimeoutException | TimeoutException e) {
- log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
+ log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- throw new QueryTimeoutException();
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
+ log.noStackTrace().warn(e, "Query error, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
+ Throwable cause = e.getCause();
+ // Nested per-segment future timeout
+ if (cause instanceof TimeoutException) {
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
Review Comment:
Ditto on the error message here and above
##########
processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java:
##########
@@ -141,32 +157,33 @@ public Iterable<T> call()
queryWatcher.registerQueryFuture(query, future);
try {
- final QueryContext context = query.context();
return new MergeIterable<>(
context.hasTimeout() ?
future.get(context.getTimeout(), TimeUnit.MILLISECONDS) :
future.get(),
ordering.nullsFirst()
).iterator();
}
- catch (InterruptedException e) {
- log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
- //Note: canceling combinedFuture first so that it can complete with INTERRUPTED as its final state. See ChainedExecutionQueryRunnerTest.testQueryTimeout()
+ catch (CancellationException | InterruptedException e) {
+ log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
- catch (CancellationException e) {
- throw new QueryInterruptedException(e);
- }
- catch (TimeoutException e) {
- log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
+ catch (TimeoutException | QueryTimeoutException e) {
+ log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
+ log.noStackTrace().warn(e, "Query error, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- Throwables.propagateIfPossible(e.getCause());
- throw new RuntimeException(e.getCause());
+ Throwable cause = e.getCause();
+ // Nested per-segment future timeout
+ if (cause instanceof TimeoutException) {
Review Comment:
Would this timeout occur if it exceeds `perSegmentTimeout` specifically? If so, could you update the error message to reflect that so a user/admin can take corrective action? Same for the above timeout exception message.
##########
processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java:
##########
@@ -344,6 +358,151 @@ public void testSubmittedTaskType()
Assert.assertEquals(runners, actual);
}
+ @Test(timeout = 10_000L)
+ public void testPerSegmentTimeout()
+ {
+ QueryRunner<Integer> slowRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(500);
+ return Sequences.of(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ QueryRunner<Integer> fastRunner = (queryPlus, responseContext) -> Sequences.of(1);
+
+ QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
+ watcher.registerQueryFuture(
+ EasyMock.anyObject(),
+ EasyMock.anyObject()
+ );
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(watcher);
+
+ ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
+ processingPool,
+ watcher,
+ Arrays.asList(slowRunner, fastRunner)
+ );
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .intervals("2014/2015")
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+ .context(
+ ImmutableMap.of(
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100L,
+ QueryContexts.TIMEOUT_KEY, 5_000L
+ )
+ )
+ .build();
+ Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
+
+ List<Integer> results = null;
+ Exception thrown = null;
+ try {
+ results = seq.toList();
+ }
+ catch (Exception e) {
+ thrown = e;
+ }
+
+ Assert.assertNull("No results expected due to timeout", results);
+ Assert.assertNotNull("Exception should be thrown", thrown);
+ Assert.assertTrue(
+ "Should be QueryTimeoutException or caused by it",
+ Throwables.getRootCause(thrown) instanceof QueryTimeoutException
Review Comment:
Since `QueryTimeoutException` can be thrown from multiple places depending
on what timeout kicks in, could you also assert on the error message so we know
which code path triggered it?
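For instance, an assertion along these lines would pin down the path; the substring here is a placeholder, since the final message wording is up to this PR:

```java
public class TimeoutMessageCheck
{
  // Illustrative helper: check not just the exception type but that its
  // message identifies which timeout fired (the substring is hypothetical).
  public static boolean mentionsPerSegmentTimeout(Throwable thrown)
  {
    final String message = thrown.getMessage();
    return message != null && message.contains("per-segment");
  }
}
```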
##########
processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java:
##########
@@ -19,12 +19,17 @@
package org.apache.druid.query;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import javax.annotation.Nullable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Default implementation of {@link QueryProcessingPool} that just forwards operations, including query execution tasks,
Review Comment:
Could you update the javadoc here to reflect the usage of `timeoutService`
in conjunction to this pool?
##########
processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java:
##########
@@ -318,4 +330,258 @@ public boolean isSingleThreaded()
() -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
);
}
+
+ @Test(timeout = 60_000L)
+ public void testTimeoutExceptionOnQueryable_multiThreaded()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(QueryContexts.TIMEOUT_KEY, 1))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ Execs.directExecutor(),
+ List.of(runner, mockRunner)
+ );
+
+ Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY,
+ 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ 100
+ ))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return false;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ processingPool,
+ List.of(runner, mockRunner)
+ );
+
+ Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY,
+ 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ 100
+ ))
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ processingPool,
+ List.of(runner, mockRunner)
+ );
+
+ Assert.assertThrows(
+ QueryTimeoutException.class,
+ () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ }
+
+ @Test(timeout = 5_000L)
+ public void test_perSegmentTimeout_crossQuery() throws Exception
+ {
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return false;
+ }
+ }
+ );
+
+ final GroupByQuery slowQuery = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY, 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 1_000
+ ))
+ .queryId("slow")
+ .build();
+
+ final GroupByQuery fastQuery = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY, 5_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100
+ ))
+ .queryId("fast")
+ .build();
+
+ CountDownLatch slowStart = new CountDownLatch(2);
+ CountDownLatch fastStart = new CountDownLatch(1);
+
+ QueryRunner<ResultRow> signalingSlowRunner = (queryPlus, responseContext) -> {
+ slowStart.countDown();
+ try {
+ Thread.sleep(60_000L);
Review Comment:
Does this need to sleep for 60s? In the interest of having faster tests, I
think you can just do something like 6s since the queries below have lower
timeout values
##########
docs/configuration/index.md:
##########
@@ -1371,6 +1371,7 @@ Processing properties set on the Middle Manager are passed through to Peons.
|`druid.processing.formatString`|Realtime and Historical processes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of processing threads to have available for handling per-segment query timeouts.|0|
Review Comment:
Also, could you add guidance on how an operator should configure this property? What happens if an operator sets this value too high or as low as 1?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]