This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c5863516479 IndexerDataServerQueryHandler: Use ServiceRetryPolicy. (#18928)
c5863516479 is described below
commit c5863516479c27a15294d1908956f0df9bdd47e5
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Jan 19 20:46:04 2026 -0800
IndexerDataServerQueryHandler: Use ServiceRetryPolicy. (#18928)
* IndexerDataServerQueryHandler: Use ServiceRetryPolicy.
Instead of a RetryUtils loop, use ServiceRetryPolicy. In addition to
being somewhat cleaner, this allows us to pass in a noRetries policy
for tests, which speeds up the test. It goes from about 24 seconds to
near-instant.
* Add tests and adjust retryable exceptions.
---
.../dart/worker/DartDataServerQueryHandler.java | 3 +-
.../indexing/IndexerDataServerQueryHandler.java | 84 ++++++++++-------
.../msq/indexing/IndexerDataServerRetryPolicy.java | 105 +++++++++++++++++++++
.../IndexerDataServerQueryHandlerTest.java | 5 +-
.../indexing/IndexerDataServerRetryPolicyTest.java | 68 +++++++++++++
.../apache/druid/discovery/DataServerClient.java | 7 +-
.../druid/discovery/DataServerClientTest.java | 4 +-
7 files changed, 237 insertions(+), 39 deletions(-)
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
index 244a462bf83..5907846abd9 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
@@ -45,6 +45,7 @@ import org.apache.druid.query.context.DefaultResponseContext;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.StandardRetryPolicy;
import java.util.Collections;
import java.util.List;
@@ -151,6 +152,6 @@ public class DartDataServerQueryHandler implements
DataServerQueryHandler
private DataServerClient makeDataServerClient(ServiceLocation
serviceLocation)
{
- return new DataServerClient(serviceClientFactory, serviceLocation,
objectMapper);
+ return new DataServerClient(serviceClientFactory, serviceLocation,
objectMapper, StandardRetryPolicy.noRetries());
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
index 8d4b1c20c30..2bee9ed89ba 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
@@ -30,7 +30,6 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.DataServerClient;
import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
@@ -57,6 +56,7 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.rpc.RpcException;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.server.coordination.DruidServerMetadata;
import java.util.ArrayList;
@@ -77,7 +77,6 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
{
private static final Logger log = new
Logger(IndexerDataServerQueryHandler.class);
private static final int DEFAULT_NUM_TRIES = 3;
- private static final int PER_SERVER_QUERY_NUM_TRIES = 5;
private final int inputNumber;
private final String dataSourceName;
private final ChannelCounters channelCounters;
@@ -86,6 +85,7 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
private final ObjectMapper objectMapper;
private final QueryToolChestWarehouse warehouse;
private final DataServerRequestDescriptor dataServerRequestDescriptor;
+ private final ServiceRetryPolicy retryPolicy;
public IndexerDataServerQueryHandler(
int inputNumber,
@@ -97,6 +97,32 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
QueryToolChestWarehouse warehouse,
DataServerRequestDescriptor dataServerRequestDescriptor
)
+ {
+ this(
+ inputNumber,
+ dataSourceName,
+ channelCounters,
+ serviceClientFactory,
+ coordinatorClient,
+ objectMapper,
+ warehouse,
+ dataServerRequestDescriptor,
+ IndexerDataServerRetryPolicy.standard()
+ );
+ }
+
+ @VisibleForTesting
+ IndexerDataServerQueryHandler(
+ int inputNumber,
+ String dataSourceName,
+ ChannelCounters channelCounters,
+ ServiceClientFactory serviceClientFactory,
+ CoordinatorClient coordinatorClient,
+ ObjectMapper objectMapper,
+ QueryToolChestWarehouse warehouse,
+ DataServerRequestDescriptor dataServerRequestDescriptor,
+ ServiceRetryPolicy retryPolicy
+ )
{
this.inputNumber = inputNumber;
this.dataSourceName = dataSourceName;
@@ -106,12 +132,13 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
this.objectMapper = objectMapper;
this.warehouse = warehouse;
this.dataServerRequestDescriptor = dataServerRequestDescriptor;
+ this.retryPolicy = retryPolicy;
}
@VisibleForTesting
DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
{
- return new DataServerClient(serviceClientFactory, serviceLocation,
objectMapper);
+ return new DataServerClient(serviceClientFactory, serviceLocation,
objectMapper, retryPolicy);
}
/**
@@ -233,33 +260,25 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
.collect(Collectors.toList());
try {
- return RetryUtils.retry(
- () -> {
- final ListenableFuture<Sequence<QueryType>> queryFuture =
dataServerClient.run(
- Queries.withSpecificSegments(
- query,
- requestDescriptor.getSegments()
- .stream()
-
.map(RichSegmentDescriptor::toPlainDescriptor)
- .collect(Collectors.toList())
- ),
- responseContext,
- queryResultType,
- closer
- );
-
- return closer.register(
- DataServerQueryHandlerUtils.createYielder(
- queryFuture.get().map(preComputeManipulatorFn),
- mappingFunction,
- channelCounters
- )
- );
- },
- throwable -> !(throwable instanceof ExecutionException
- && throwable.getCause() instanceof
QueryInterruptedException
- && throwable.getCause().getCause() instanceof
InterruptedException),
- PER_SERVER_QUERY_NUM_TRIES
+ final ListenableFuture<Sequence<QueryType>> queryFuture =
dataServerClient.run(
+ Queries.withSpecificSegments(
+ query,
+ requestDescriptor.getSegments()
+ .stream()
+ .map(RichSegmentDescriptor::toPlainDescriptor)
+ .collect(Collectors.toList())
+ ),
+ responseContext,
+ queryResultType,
+ closer
+ );
+
+ return closer.register(
+ DataServerQueryHandlerUtils.createYielder(
+ queryFuture.get().map(preComputeManipulatorFn),
+ mappingFunction,
+ channelCounters
+ )
);
}
catch (ExecutionException e) {
@@ -274,10 +293,11 @@ public class IndexerDataServerQueryHandler implements
DataServerQueryHandler
.build(e, "Exception while fetching rows for query
from dataservers[%s]", serviceLocation);
}
}
- catch (Exception e) {
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw DruidException.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception while fetching rows for query
from dataservers[%s]", serviceLocation);
+ .build(e, "Interrupted while fetching rows for query
from dataservers[%s]", serviceLocation);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicy.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicy.java
new file mode 100644
index 00000000000..f7e9b969570
--- /dev/null
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.utils.Throwables;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+
+/**
+ * Retry policy for {@link IndexerDataServerQueryHandler}.
+ */
+public class IndexerDataServerRetryPolicy implements ServiceRetryPolicy
+{
+ private static final long DEFAULT_MAX_ATTEMPTS = 5;
+ private static final long DEFAULT_MIN_WAIT_MILLIS =
RetryUtils.BASE_SLEEP_MILLIS;
+ private static final long DEFAULT_MAX_WAIT_MILLIS =
RetryUtils.MAX_SLEEP_MILLIS;
+
+ private final long maxAttempts;
+ private final long minWaitMillis;
+ private final long maxWaitMillis;
+
+ public IndexerDataServerRetryPolicy(long maxAttempts, long minWaitMillis,
long maxWaitMillis)
+ {
+ this.maxAttempts = maxAttempts;
+ this.minWaitMillis = minWaitMillis;
+ this.maxWaitMillis = maxWaitMillis;
+ }
+
+ /**
+ * Default policy for production.
+ */
+ public static IndexerDataServerRetryPolicy standard()
+ {
+ return new IndexerDataServerRetryPolicy(DEFAULT_MAX_ATTEMPTS,
DEFAULT_MIN_WAIT_MILLIS, DEFAULT_MAX_WAIT_MILLIS);
+ }
+
+ /**
+ * Policy that does not retry.
+ */
+ public static IndexerDataServerRetryPolicy noRetries()
+ {
+ return new IndexerDataServerRetryPolicy(1, 0, 0);
+ }
+
+ @Override
+ public long maxAttempts()
+ {
+ return maxAttempts;
+ }
+
+ @Override
+ public long minWaitMillis()
+ {
+ return minWaitMillis;
+ }
+
+ @Override
+ public long maxWaitMillis()
+ {
+ return maxWaitMillis;
+ }
+
+ @Override
+ public boolean retryHttpResponse(HttpResponse response)
+ {
+ return false;
+ }
+
+ @Override
+ public boolean retryThrowable(Throwable t)
+ {
+ // Retry on all exceptions, except when the exception chain indicates explicit interruption.
+ return Throwables.getCauseOfType(t, InterruptedException.class) == null;
+ }
+
+ @Override
+ public boolean retryLoggable()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean retryNotAvailable()
+ {
+ return false;
+ }
+}
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
index d874fa452d7..5c40d3c1966 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
@@ -145,7 +145,8 @@ public class IndexerDataServerQueryHandlerTest
coordinatorClient,
DruidServiceTestUtils.newJsonMapper(),
queryToolChestWarehouse,
- new DataServerRequestDescriptor(DRUID_SERVER_1,
ImmutableList.of(SEGMENT_1, SEGMENT_2))
+ new DataServerRequestDescriptor(DRUID_SERVER_1,
ImmutableList.of(SEGMENT_1, SEGMENT_2)),
+ IndexerDataServerRetryPolicy.noRetries()
)
);
doAnswer(invocationOnMock -> {
@@ -319,7 +320,7 @@ public class IndexerDataServerQueryHandlerTest
)
);
- verify(dataServerClient1, times(5)).run(any(), any(), any(), any());
+ verify(dataServerClient1, times(1)).run(any(), any(), any(), any());
}
@Test
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicyTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicyTest.java
new file mode 100644
index 00000000000..6df8e009d3a
--- /dev/null
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicyTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.java.util.common.RetryUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class IndexerDataServerRetryPolicyTest
+{
+ @Test
+ public void testStandard()
+ {
+ final IndexerDataServerRetryPolicy policy =
IndexerDataServerRetryPolicy.standard();
+
+ Assert.assertEquals(5, policy.maxAttempts());
+ Assert.assertEquals(RetryUtils.BASE_SLEEP_MILLIS, policy.minWaitMillis());
+ Assert.assertEquals(RetryUtils.MAX_SLEEP_MILLIS, policy.maxWaitMillis());
+ }
+
+ @Test
+ public void testNoRetries()
+ {
+ final IndexerDataServerRetryPolicy policy =
IndexerDataServerRetryPolicy.noRetries();
+
+ Assert.assertEquals(1, policy.maxAttempts());
+ Assert.assertEquals(0, policy.minWaitMillis());
+ Assert.assertEquals(0, policy.maxWaitMillis());
+ }
+
+ @Test
+ public void testRetryThrowableWithGenericException()
+ {
+ final IndexerDataServerRetryPolicy policy =
IndexerDataServerRetryPolicy.standard();
+
+ Assert.assertTrue(policy.retryThrowable(new IOException("test")));
+ Assert.assertTrue(policy.retryThrowable(new RuntimeException("test")));
+ }
+
+ @Test
+ public void testRetryThrowableWithInterruptedException()
+ {
+ final IndexerDataServerRetryPolicy policy =
IndexerDataServerRetryPolicy.standard();
+
+ // Chains including InterruptedException should not be retried
+ Assert.assertFalse(policy.retryThrowable(new InterruptedException()));
+ Assert.assertFalse(policy.retryThrowable(new RuntimeException(new
InterruptedException())));
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
index 6a2021f8043..411548615ad 100644
--- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
@@ -41,7 +41,7 @@ import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
-import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.utils.CloseableUtils;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;
@@ -64,13 +64,14 @@ public class DataServerClient
public DataServerClient(
ServiceClientFactory serviceClientFactory,
ServiceLocation serviceLocation,
- ObjectMapper objectMapper
+ ObjectMapper objectMapper,
+ ServiceRetryPolicy retryPolicy
)
{
this.serviceClient = serviceClientFactory.makeClient(
serviceLocation.getHost(),
new FixedServiceLocator(serviceLocation),
- StandardRetryPolicy.noRetries()
+ retryPolicy
);
this.serviceLocation = serviceLocation;
this.objectMapper = objectMapper;
diff --git
a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
index 95ad46a580c..440c2ee11ad 100644
--- a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.rpc.MockServiceClient;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.server.QueryResource;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -81,7 +82,8 @@ public class DataServerClientTest
target = new DataServerClient(
serviceClientFactory,
mock(ServiceLocation.class),
- jsonMapper
+ jsonMapper,
+ StandardRetryPolicy.noRetries()
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]