This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c5863516479 IndexerDataServerQueryHandler: Use ServiceRetryPolicy. (#18928)
c5863516479 is described below

commit c5863516479c27a15294d1908956f0df9bdd47e5
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Jan 19 20:46:04 2026 -0800

    IndexerDataServerQueryHandler: Use ServiceRetryPolicy. (#18928)
    
    * IndexerDataServerQueryHandler: Use ServiceRetryPolicy.
    
    Instead of a RetryUtils loop, use ServiceRetryPolicy. In addition to
    being somewhat cleaner, this allows us to pass in a noRetries policy
    for tests, which speeds up the test. It goes from about 24 seconds to
    near-instant.
    
    * Add tests and adjust retryable exceptions.
---
 .../dart/worker/DartDataServerQueryHandler.java    |   3 +-
 .../indexing/IndexerDataServerQueryHandler.java    |  84 ++++++++++-------
 .../msq/indexing/IndexerDataServerRetryPolicy.java | 105 +++++++++++++++++++++
 .../IndexerDataServerQueryHandlerTest.java         |   5 +-
 .../indexing/IndexerDataServerRetryPolicyTest.java |  68 +++++++++++++
 .../apache/druid/discovery/DataServerClient.java   |   7 +-
 .../druid/discovery/DataServerClientTest.java      |   4 +-
 7 files changed, 237 insertions(+), 39 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
index 244a462bf83..5907846abd9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
@@ -45,6 +45,7 @@ import org.apache.druid.query.context.DefaultResponseContext;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.StandardRetryPolicy;
 
 import java.util.Collections;
 import java.util.List;
@@ -151,6 +152,6 @@ public class DartDataServerQueryHandler implements 
DataServerQueryHandler
 
   private DataServerClient makeDataServerClient(ServiceLocation 
serviceLocation)
   {
-    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper);
+    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper, StandardRetryPolicy.noRetries());
   }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
index 8d4b1c20c30..2bee9ed89ba 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
@@ -30,7 +30,6 @@ import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.discovery.DataServerClient;
 import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.guava.Yielder;
@@ -57,6 +56,7 @@ import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.rpc.RpcException;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 
 import java.util.ArrayList;
@@ -77,7 +77,6 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
 {
   private static final Logger log = new 
Logger(IndexerDataServerQueryHandler.class);
   private static final int DEFAULT_NUM_TRIES = 3;
-  private static final int PER_SERVER_QUERY_NUM_TRIES = 5;
   private final int inputNumber;
   private final String dataSourceName;
   private final ChannelCounters channelCounters;
@@ -86,6 +85,7 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
   private final ObjectMapper objectMapper;
   private final QueryToolChestWarehouse warehouse;
   private final DataServerRequestDescriptor dataServerRequestDescriptor;
+  private final ServiceRetryPolicy retryPolicy;
 
   public IndexerDataServerQueryHandler(
       int inputNumber,
@@ -97,6 +97,32 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
       QueryToolChestWarehouse warehouse,
       DataServerRequestDescriptor dataServerRequestDescriptor
   )
+  {
+    this(
+        inputNumber,
+        dataSourceName,
+        channelCounters,
+        serviceClientFactory,
+        coordinatorClient,
+        objectMapper,
+        warehouse,
+        dataServerRequestDescriptor,
+        IndexerDataServerRetryPolicy.standard()
+    );
+  }
+
+  @VisibleForTesting
+  IndexerDataServerQueryHandler(
+      int inputNumber,
+      String dataSourceName,
+      ChannelCounters channelCounters,
+      ServiceClientFactory serviceClientFactory,
+      CoordinatorClient coordinatorClient,
+      ObjectMapper objectMapper,
+      QueryToolChestWarehouse warehouse,
+      DataServerRequestDescriptor dataServerRequestDescriptor,
+      ServiceRetryPolicy retryPolicy
+  )
   {
     this.inputNumber = inputNumber;
     this.dataSourceName = dataSourceName;
@@ -106,12 +132,13 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
     this.objectMapper = objectMapper;
     this.warehouse = warehouse;
     this.dataServerRequestDescriptor = dataServerRequestDescriptor;
+    this.retryPolicy = retryPolicy;
   }
 
   @VisibleForTesting
   DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
   {
-    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper);
+    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper, retryPolicy);
   }
 
   /**
@@ -233,33 +260,25 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
                          .collect(Collectors.toList());
 
     try {
-      return RetryUtils.retry(
-          () -> {
-            final ListenableFuture<Sequence<QueryType>> queryFuture = 
dataServerClient.run(
-                Queries.withSpecificSegments(
-                    query,
-                    requestDescriptor.getSegments()
-                                     .stream()
-                                     
.map(RichSegmentDescriptor::toPlainDescriptor)
-                                     .collect(Collectors.toList())
-                ),
-                responseContext,
-                queryResultType,
-                closer
-            );
-
-            return closer.register(
-                DataServerQueryHandlerUtils.createYielder(
-                    queryFuture.get().map(preComputeManipulatorFn),
-                    mappingFunction,
-                    channelCounters
-                )
-            );
-          },
-          throwable -> !(throwable instanceof ExecutionException
-                         && throwable.getCause() instanceof 
QueryInterruptedException
-                         && throwable.getCause().getCause() instanceof 
InterruptedException),
-          PER_SERVER_QUERY_NUM_TRIES
+      final ListenableFuture<Sequence<QueryType>> queryFuture = 
dataServerClient.run(
+          Queries.withSpecificSegments(
+              query,
+              requestDescriptor.getSegments()
+                               .stream()
+                               .map(RichSegmentDescriptor::toPlainDescriptor)
+                               .collect(Collectors.toList())
+          ),
+          responseContext,
+          queryResultType,
+          closer
+      );
+
+      return closer.register(
+          DataServerQueryHandlerUtils.createYielder(
+              queryFuture.get().map(preComputeManipulatorFn),
+              mappingFunction,
+              channelCounters
+          )
       );
     }
     catch (ExecutionException e) {
@@ -274,10 +293,11 @@ public class IndexerDataServerQueryHandler implements 
DataServerQueryHandler
                             .build(e, "Exception while fetching rows for query 
from dataservers[%s]", serviceLocation);
       }
     }
-    catch (Exception e) {
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                           .ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                          .build(e, "Exception while fetching rows for query 
from dataservers[%s]", serviceLocation);
+                          .build(e, "Interrupted while fetching rows for query 
from dataservers[%s]", serviceLocation);
     }
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicy.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicy.java
new file mode 100644
index 00000000000..f7e9b969570
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicy.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.rpc.ServiceRetryPolicy;
+import org.apache.druid.utils.Throwables;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+
+/**
+ * Retry policy for {@link IndexerDataServerQueryHandler}.
+ */
+public class IndexerDataServerRetryPolicy implements ServiceRetryPolicy
+{
+  private static final long DEFAULT_MAX_ATTEMPTS = 5;
+  private static final long DEFAULT_MIN_WAIT_MILLIS = 
RetryUtils.BASE_SLEEP_MILLIS;
+  private static final long DEFAULT_MAX_WAIT_MILLIS = 
RetryUtils.MAX_SLEEP_MILLIS;
+
+  private final long maxAttempts;
+  private final long minWaitMillis;
+  private final long maxWaitMillis;
+
+  public IndexerDataServerRetryPolicy(long maxAttempts, long minWaitMillis, 
long maxWaitMillis)
+  {
+    this.maxAttempts = maxAttempts;
+    this.minWaitMillis = minWaitMillis;
+    this.maxWaitMillis = maxWaitMillis;
+  }
+
+  /**
+   * Default policy for production.
+   */
+  public static IndexerDataServerRetryPolicy standard()
+  {
+    return new IndexerDataServerRetryPolicy(DEFAULT_MAX_ATTEMPTS, 
DEFAULT_MIN_WAIT_MILLIS, DEFAULT_MAX_WAIT_MILLIS);
+  }
+
+  /**
+   * Policy that does not retry.
+   */
+  public static IndexerDataServerRetryPolicy noRetries()
+  {
+    return new IndexerDataServerRetryPolicy(1, 0, 0);
+  }
+
+  @Override
+  public long maxAttempts()
+  {
+    return maxAttempts;
+  }
+
+  @Override
+  public long minWaitMillis()
+  {
+    return minWaitMillis;
+  }
+
+  @Override
+  public long maxWaitMillis()
+  {
+    return maxWaitMillis;
+  }
+
+  @Override
+  public boolean retryHttpResponse(HttpResponse response)
+  {
+    return false;
+  }
+
+  @Override
+  public boolean retryThrowable(Throwable t)
+  {
+    // Retry on all exceptions, except when the exception chain indicates 
explicit interruption.
+    return Throwables.getCauseOfType(t, InterruptedException.class) == null;
+  }
+
+  @Override
+  public boolean retryLoggable()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean retryNotAvailable()
+  {
+    return false;
+  }
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
index d874fa452d7..5c40d3c1966 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
@@ -145,7 +145,8 @@ public class IndexerDataServerQueryHandlerTest
             coordinatorClient,
             DruidServiceTestUtils.newJsonMapper(),
             queryToolChestWarehouse,
-            new DataServerRequestDescriptor(DRUID_SERVER_1, 
ImmutableList.of(SEGMENT_1, SEGMENT_2))
+            new DataServerRequestDescriptor(DRUID_SERVER_1, 
ImmutableList.of(SEGMENT_1, SEGMENT_2)),
+            IndexerDataServerRetryPolicy.noRetries()
         )
     );
     doAnswer(invocationOnMock -> {
@@ -319,7 +320,7 @@ public class IndexerDataServerQueryHandlerTest
         )
     );
 
-    verify(dataServerClient1, times(5)).run(any(), any(), any(), any());
+    verify(dataServerClient1, times(1)).run(any(), any(), any(), any());
   }
 
   @Test
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicyTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicyTest.java
new file mode 100644
index 00000000000..6df8e009d3a
--- /dev/null
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerRetryPolicyTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.java.util.common.RetryUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class IndexerDataServerRetryPolicyTest
+{
+  @Test
+  public void testStandard()
+  {
+    final IndexerDataServerRetryPolicy policy = 
IndexerDataServerRetryPolicy.standard();
+
+    Assert.assertEquals(5, policy.maxAttempts());
+    Assert.assertEquals(RetryUtils.BASE_SLEEP_MILLIS, policy.minWaitMillis());
+    Assert.assertEquals(RetryUtils.MAX_SLEEP_MILLIS, policy.maxWaitMillis());
+  }
+
+  @Test
+  public void testNoRetries()
+  {
+    final IndexerDataServerRetryPolicy policy = 
IndexerDataServerRetryPolicy.noRetries();
+
+    Assert.assertEquals(1, policy.maxAttempts());
+    Assert.assertEquals(0, policy.minWaitMillis());
+    Assert.assertEquals(0, policy.maxWaitMillis());
+  }
+
+  @Test
+  public void testRetryThrowableWithGenericException()
+  {
+    final IndexerDataServerRetryPolicy policy = 
IndexerDataServerRetryPolicy.standard();
+
+    Assert.assertTrue(policy.retryThrowable(new IOException("test")));
+    Assert.assertTrue(policy.retryThrowable(new RuntimeException("test")));
+  }
+
+  @Test
+  public void testRetryThrowableWithInterruptedException()
+  {
+    final IndexerDataServerRetryPolicy policy = 
IndexerDataServerRetryPolicy.standard();
+
+    // Chains including InterruptedException should not be retried
+    Assert.assertFalse(policy.retryThrowable(new InterruptedException()));
+    Assert.assertFalse(policy.retryThrowable(new RuntimeException(new 
InterruptedException())));
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java 
b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
index 6a2021f8043..411548615ad 100644
--- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
@@ -41,7 +41,7 @@ import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocation;
-import org.apache.druid.rpc.StandardRetryPolicy;
+import org.apache.druid.rpc.ServiceRetryPolicy;
 import org.apache.druid.utils.CloseableUtils;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.joda.time.Duration;
@@ -64,13 +64,14 @@ public class DataServerClient
   public DataServerClient(
       ServiceClientFactory serviceClientFactory,
       ServiceLocation serviceLocation,
-      ObjectMapper objectMapper
+      ObjectMapper objectMapper,
+      ServiceRetryPolicy retryPolicy
   )
   {
     this.serviceClient = serviceClientFactory.makeClient(
         serviceLocation.getHost(),
         new FixedServiceLocator(serviceLocation),
-        StandardRetryPolicy.noRetries()
+        retryPolicy
     );
     this.serviceLocation = serviceLocation;
     this.objectMapper = objectMapper;
diff --git 
a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java 
b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
index 95ad46a580c..440c2ee11ad 100644
--- a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
@@ -39,6 +39,7 @@ import org.apache.druid.rpc.MockServiceClient;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.rpc.ServiceLocation;
+import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.server.QueryResource;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
@@ -81,7 +82,8 @@ public class DataServerClientTest
     target = new DataServerClient(
         serviceClientFactory,
         mock(ServiceLocation.class),
-        jsonMapper
+        jsonMapper,
+        StandardRetryPolicy.noRetries()
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to