This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch 34.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/34.0.0 by this push:
     new cd89f7062f0 Dart: Async queries to realtime servers. (#18241) (#18278)
cd89f7062f0 is described below

commit cd89f7062f0c7a8d2a425edbae75acab4306ac32
Author: Lucas Capistrant <[email protected]>
AuthorDate: Thu Jul 17 20:00:12 2025 -0500

    Dart: Async queries to realtime servers. (#18241) (#18278)
    
    * Dart: Async queries to realtime servers.
    
    Prior to this patch, queries from MSQ workers to data servers would
    be initiated in the processing pool, and would block the processing
    pool until results started coming in. This patch addresses it with
    the strategy:
    
    1) Update DataServerClient to return a future that resolves when the
       response starts being written.
    
    2) Split DataServerQueryHandler into DartDataServerQueryHandler and
       IndexerDataServerQueryHandler. The Dart version doesn't do retries
       and doesn't follow segments to other data servers. It just returns
       the async future from DataServerClient. The Indexer (tasks) version
       retains the prior logic and isn't really async. I didn't attempt to
       asyncify its retry logic in this patch.
    
    3) Add ReturnOrAwait.awaitAllFutures, which allows processors to wait
       for a future to resolve.
    
    4) Update ScanQueryFrameProcessor and GroupByPreShuffleFrameProcessor
       to give up the processing thread when waiting for a data server query
       to come back.
    
    Additionally, to simplify DataServerClient, cancellations are now
    issued without using a scheduled executor. There should be no need for
    this, because the service client is async.
    
    * Fix tests and checkstyle.
    
    * Fix exception checking.
    
    Co-authored-by: Gian Merlino <[email protected]>
---
 .../druid/msq/dart/guice/DartWorkerModule.java     |  20 ++
 .../dart/worker/DartDataServerQueryHandler.java    | 153 +++++++++
 .../worker/DartDataServerQueryHandlerFactory.java  |  65 ++++
 .../dart/worker/DartWorkerContextFactoryImpl.java  |  19 +-
 .../druid/msq/exec/DataServerQueryHandler.java     | 354 ++-------------------
 .../msq/exec/DataServerQueryHandlerFactory.java    |  68 +---
 .../msq/exec/DataServerQueryHandlerUtils.java      |  94 ++++++
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |   1 -
 .../IndexerDataServerQueryHandler.java}            | 172 +++++-----
 .../IndexerDataServerQueryHandlerFactory.java}     |  47 +--
 .../druid/msq/indexing/IndexerFrameContext.java    |   4 +-
 .../druid/msq/indexing/IndexerWorkerContext.java   |   6 +-
 .../msq/input/table/RichSegmentDescriptor.java     |   8 +
 .../groupby/GroupByPreShuffleFrameProcessor.java   |  25 +-
 .../msq/querykit/scan/ScanQueryFrameProcessor.java |  26 +-
 .../druid/msq/exec/MSQLoadedSegmentTests.java      |  93 +++---
 .../IndexerDataServerQueryHandlerTest.java}        |  87 ++---
 .../frame/processor/FrameProcessorExecutor.java    |  30 +-
 .../druid/frame/processor/ReturnOrAwait.java       | 113 ++++---
 .../processor/FrameProcessorExecutorTest.java      |  65 ++++
 .../druid/frame/processor/ReturnOrAwaitTest.java   |  19 +-
 .../processor/test/FutureWaitingProcessor.java     | 104 ++++++
 .../apache/druid/discovery/DataServerClient.java   | 132 ++++----
 .../java/org/apache/druid/rpc/ServiceLocation.java |  12 +
 .../druid/discovery/DataServerClientTest.java      |  11 +-
 .../testing/embedded/EmbeddedDruidCluster.java     |   1 +
 26 files changed, 983 insertions(+), 746 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index a1f4e06e948..5e0a3898e7f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.dart.guice;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -35,8 +36,10 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.ManageLifecycleAnnouncements;
+import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.LoadScope;
 import org.apache.druid.guice.annotations.Self;
+import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
@@ -50,6 +53,7 @@ import 
org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
 import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
 import org.apache.druid.msq.dart.worker.DartDataSegmentProvider;
+import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
 import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
 import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
 import org.apache.druid.msq.dart.worker.DartWorkerRunner;
@@ -58,6 +62,8 @@ import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
 import org.apache.druid.msq.rpc.ResourcePermissionMapper;
 import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.security.AuthorizerMapper;
 
@@ -156,6 +162,20 @@ public class DartWorkerModule implements DruidModule
     {
       return new OutboxImpl<>();
     }
+
+    @Provides
+    public DartDataServerQueryHandlerFactory 
createDataServerQueryHandlerFactory(
+        @EscalatedGlobal ServiceClientFactory serviceClientFactory,
+        @Smile ObjectMapper smileMapper,
+        QueryToolChestWarehouse queryToolChestWarehouse
+    )
+    {
+      return new DartDataServerQueryHandlerFactory(
+          serviceClientFactory,
+          smileMapper,
+          queryToolChestWarehouse
+      );
+    }
   }
 
   @Override
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
new file mode 100644
index 00000000000..4a6b67da669
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.discovery.DataServerClient;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
+import org.apache.druid.msq.exec.DataServerQueryHandlerUtils;
+import org.apache.druid.msq.exec.DataServerQueryResult;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+import org.apache.druid.msq.input.table.RichSegmentDescriptor;
+import org.apache.druid.query.Queries;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.aggregation.MetricManipulatorFns;
+import org.apache.druid.query.context.DefaultResponseContext;
+import org.apache.druid.query.context.ResponseContext;
+import org.apache.druid.rpc.ServiceClientFactory;
+import org.apache.druid.rpc.ServiceLocation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Dart implementation of {@link DataServerQueryHandler}. Issues queries 
asynchronously, with no retries.
+ */
+public class DartDataServerQueryHandler implements DataServerQueryHandler
+{
+  private final String dataSource;
+  private final ChannelCounters channelCounters;
+  private final ServiceClientFactory serviceClientFactory;
+  private final ObjectMapper objectMapper;
+  private final QueryToolChestWarehouse warehouse;
+  private final DataServerRequestDescriptor requestDescriptor;
+
+  public DartDataServerQueryHandler(
+      String dataSource,
+      ChannelCounters channelCounters,
+      ServiceClientFactory serviceClientFactory,
+      ObjectMapper objectMapper,
+      QueryToolChestWarehouse warehouse,
+      DataServerRequestDescriptor requestDescriptor
+  )
+  {
+    this.dataSource = dataSource;
+    this.channelCounters = channelCounters;
+    this.serviceClientFactory = serviceClientFactory;
+    this.objectMapper = objectMapper;
+    this.warehouse = warehouse;
+    this.requestDescriptor = requestDescriptor;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * This method returns immediately. The returned future resolves when the 
server has started sending back
+   * its response.
+   *
+   * Queries are issued once, without retries.
+   */
+  @Override
+  public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> 
fetchRowsFromDataServer(
+      Query<QueryType> query,
+      Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
+      Closer closer
+  )
+  {
+    final Query<QueryType> preparedQuery =
+        Queries.withSpecificSegments(
+            DataServerQueryHandlerUtils.prepareQuery(query, dataSource),
+            requestDescriptor.getSegments()
+                             .stream()
+                             .map(RichSegmentDescriptor::toPlainDescriptor)
+                             .collect(Collectors.toList())
+        );
+
+    final ServiceLocation serviceLocation =
+        
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
+    final DataServerClient dataServerClient = 
makeDataServerClient(serviceLocation);
+    final QueryToolChest<QueryType, Query<QueryType>> toolChest = 
warehouse.getToolChest(query);
+    final Function<QueryType, QueryType> preComputeManipulatorFn =
+        toolChest.makePreComputeManipulatorFn(query, 
MetricManipulatorFns.deserializing());
+    final JavaType queryResultType = toolChest.getBaseResultType();
+    final ResponseContext responseContext = new DefaultResponseContext();
+
+    return FutureUtils.transform(
+        dataServerClient.run(preparedQuery, responseContext, queryResultType, 
closer),
+        resultSequence -> {
+          final Yielder<RowType> yielder = 
DataServerQueryHandlerUtils.createYielder(
+              resultSequence.map(preComputeManipulatorFn),
+              mappingFunction,
+              channelCounters
+          );
+
+          final List<SegmentDescriptor> missingSegments =
+              DataServerQueryHandlerUtils.getMissingSegments(responseContext);
+
+          if (!missingSegments.isEmpty()) {
+            throw DruidException
+                .forPersona(DruidException.Persona.USER)
+                .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                .build(
+                    "Segment[%s]%s not found on server[%s]. Please retry your 
query.",
+                    missingSegments.get(0),
+                    missingSegments.size() > 1 ? StringUtils.format(" and[%d] 
others", missingSegments.size() - 1) : "",
+                    serviceLocation.getHostAndPort()
+                );
+          }
+
+          return new DataServerQueryResult<>(
+              Collections.singletonList(yielder),
+              Collections.emptyList(),
+              dataSource
+          );
+        }
+    );
+  }
+
+  private DataServerClient makeDataServerClient(ServiceLocation 
serviceLocation)
+  {
+    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
new file mode 100644
index 00000000000..a7fc2004f7b
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartDataServerQueryHandlerFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
+import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
+import org.apache.druid.query.QueryToolChestWarehouse;
+import org.apache.druid.rpc.ServiceClientFactory;
+
+/**
+ * Factory for {@link DartDataServerQueryHandler}.
+ */
+public class DartDataServerQueryHandlerFactory implements 
DataServerQueryHandlerFactory
+{
+  private final ServiceClientFactory serviceClientFactory;
+  private final ObjectMapper objectMapper;
+  private final QueryToolChestWarehouse warehouse;
+
+  public DartDataServerQueryHandlerFactory(
+      ServiceClientFactory serviceClientFactory,
+      ObjectMapper objectMapper,
+      QueryToolChestWarehouse warehouse
+  )
+  {
+    this.serviceClientFactory = serviceClientFactory;
+    this.objectMapper = objectMapper;
+    this.warehouse = warehouse;
+  }
+
+  @Override
+  public DartDataServerQueryHandler createDataServerQueryHandler(
+      String dataSource,
+      ChannelCounters channelCounters,
+      DataServerRequestDescriptor requestDescriptor
+  )
+  {
+    return new DartDataServerQueryHandler(
+        dataSource,
+        channelCounters,
+        serviceClientFactory,
+        objectMapper,
+        warehouse,
+        requestDescriptor
+    );
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index b1e2fb92594..03d17b1bbd3 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -22,7 +22,6 @@ package org.apache.druid.msq.dart.worker;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
-import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Self;
@@ -31,14 +30,12 @@ import 
org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.messages.server.Outbox;
 import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
-import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.ProcessingBuffersProvider;
 import org.apache.druid.msq.exec.WorkerContext;
 import org.apache.druid.msq.querykit.DataSegmentProvider;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryContext;
-import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.rpc.ServiceClientFactory;
@@ -65,8 +62,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
   private final MemoryIntrospector memoryIntrospector;
   private final ProcessingBuffersProvider processingBuffersProvider;
   private final Outbox<ControllerMessage> outbox;
-  private final CoordinatorClient coordinatorClient;
-  private final QueryToolChestWarehouse warehouse;
+  private final DartDataServerQueryHandlerFactory 
dataServerQueryHandlerFactory;
   private final ServiceEmitter emitter;
 
   @Inject
@@ -84,8 +80,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
       MemoryIntrospector memoryIntrospector,
       @Dart ProcessingBuffersProvider processingBuffersProvider,
       Outbox<ControllerMessage> outbox,
-      CoordinatorClient coordinatorClient,
-      QueryToolChestWarehouse warehouse,
+      DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
       ServiceEmitter emitter
   )
   {
@@ -102,8 +97,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
     this.memoryIntrospector = memoryIntrospector;
     this.processingBuffersProvider = processingBuffersProvider;
     this.outbox = outbox;
-    this.coordinatorClient = coordinatorClient;
-    this.warehouse = warehouse;
+    this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
     this.emitter = emitter;
   }
 
@@ -132,12 +126,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
         outbox,
         tempDir,
         queryContext,
-        new DataServerQueryHandlerFactory(
-            coordinatorClient,
-            serviceClientFactory,
-            jsonMapper,
-            warehouse
-        ),
+        dataServerQueryHandlerFactory,
         emitter
     );
   }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
index 04458d70170..f9c8c14ad14 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
@@ -19,359 +19,43 @@
 
 package org.apache.druid.msq.exec;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.client.ImmutableSegmentLoadInfo;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.common.guava.FutureUtils;
-import org.apache.druid.discovery.DataServerClient;
-import org.apache.druid.error.DruidException;
-import org.apache.druid.java.util.common.RetryUtils;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.java.util.common.guava.Sequence;
-import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.guava.Yielder;
-import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.io.Closer;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
-import org.apache.druid.msq.input.table.DataServerSelector;
-import org.apache.druid.msq.input.table.RichSegmentDescriptor;
-import org.apache.druid.query.Queries;
 import org.apache.druid.query.Query;
-import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryToolChest;
-import org.apache.druid.query.QueryToolChestWarehouse;
-import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.aggregation.MetricManipulationFn;
-import org.apache.druid.query.aggregation.MetricManipulatorFns;
-import org.apache.druid.query.context.DefaultResponseContext;
-import org.apache.druid.query.context.ResponseContext;
-import org.apache.druid.rpc.RpcException;
-import org.apache.druid.rpc.ServiceClientFactory;
-import org.apache.druid.rpc.ServiceLocation;
-import org.apache.druid.server.coordination.DruidServerMetadata;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
- * Class responsible for querying dataservers and retriving results for a 
given query. Also queries the coordinator
- * to check if a segment has been handed off.
+ * Object for issuing native queries to data servers. This is created by a 
{@link DataServerQueryHandlerFactory},
+ * and is used when MSQ is querying realtime data. It is required because 
realtime tasks are not currently able to
+ * execute MSQ logic themselves.
  */
-public class DataServerQueryHandler
+public interface DataServerQueryHandler
 {
-  private static final Logger log = new Logger(DataServerQueryHandler.class);
-  private static final int DEFAULT_NUM_TRIES = 3;
-  private static final int PER_SERVER_QUERY_NUM_TRIES = 5;
-  private final String dataSource;
-  private final ChannelCounters channelCounters;
-  private final ServiceClientFactory serviceClientFactory;
-  private final CoordinatorClient coordinatorClient;
-  private final ObjectMapper objectMapper;
-  private final QueryToolChestWarehouse warehouse;
-  private final ScheduledExecutorService queryCancellationExecutor;
-  private final DataServerRequestDescriptor dataServerRequestDescriptor;
-
-  public DataServerQueryHandler(
-      String dataSource,
-      ChannelCounters channelCounters,
-      ServiceClientFactory serviceClientFactory,
-      CoordinatorClient coordinatorClient,
-      ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse,
-      ScheduledExecutorService queryCancellationExecutor,
-      DataServerRequestDescriptor dataServerRequestDescriptor
-  )
-  {
-    this.dataSource = dataSource;
-    this.channelCounters = channelCounters;
-    this.serviceClientFactory = serviceClientFactory;
-    this.coordinatorClient = coordinatorClient;
-    this.objectMapper = objectMapper;
-    this.warehouse = warehouse;
-    this.queryCancellationExecutor = queryCancellationExecutor;
-    this.dataServerRequestDescriptor = dataServerRequestDescriptor;
-  }
-
-  @VisibleForTesting
-  DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
-  {
-    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper, queryCancellationExecutor);
-  }
-
   /**
-   * Performs some necessary transforms to the query, so that the dataserver 
is able to understand it first.
-   * - Changing the datasource to a {@link TableDataSource}
-   * - Limiting the query to the required segments with {@link 
Queries#withSpecificSegments(Query, List)}
-   * <br>
-   * Then queries a data server and returns a {@link Yielder} for the results, 
retrying if needed. If a dataserver
-   * indicates that some segments were not found, checks with the coordinator 
to see if the segment was handed off.
-   * - If all the segments were handed off, returns a {@link 
DataServerQueryResult} with the yielder and list of handed
-   * off segments.
-   * - If some segments were not handed off, checks with the coordinator fetch 
an updated list of servers. This step is
-   * repeated up to {@link #DEFAULT_NUM_TRIES} times.
-   * - If the servers could not be found, checks if the segment was 
handed-off. If it was, returns a
-   * {@link DataServerQueryResult} with the yielder and list of handed off 
segments. Otherwise, throws an exception.
-   * <br>
+   * Issues a query to the server and segments that were specified by the 
{@link DataServerRequestDescriptor}
+   * originally passed to {@link 
DataServerQueryHandlerFactory#createDataServerQueryHandler} when this instance
+   * was created.
+   *
+   * The query datasource is updated to refer to the specific segments from
+   * {@link DataServerRequestDescriptor#getSegments()}.
+   *
    * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, 
MetricManipulationFn)} and reports channel
    * metrics on the returned results.
    *
-   * @param <QueryType> result return type for the query from the data server
-   * @param <RowType> type of the result rows after parsing from QueryType 
object
+   * @param query           query to run
+   * @param mappingFunction function to apply to results
+   * @param closer          will register query canceler with this closer
+   * @param <QueryType>     result return type for the query from the data 
server
+   * @param <RowType>       type of the result rows after parsing from 
QueryType object
    */
-  public <RowType, QueryType> DataServerQueryResult<RowType> 
fetchRowsFromDataServer(
+  <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> 
fetchRowsFromDataServer(
       Query<QueryType> query,
       Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
       Closer closer
-  )
-  {
-    // MSQ changes the datasource to a number datasource. This needs to be 
changed back for data servers to understand.
-    final Query<QueryType> preparedQuery = query.withDataSource(new 
TableDataSource(dataSource));
-    final List<Yielder<RowType>> yielders = new ArrayList<>();
-    final List<RichSegmentDescriptor> handedOffSegments = new ArrayList<>();
-
-    List<DataServerRequestDescriptor> pendingRequests = 
ImmutableList.of(dataServerRequestDescriptor);
-
-    final int maxRetries = 
preparedQuery.context().getNumRetriesOnMissingSegments(DEFAULT_NUM_TRIES);
-    int retryCount = 0;
-
-    while (!pendingRequests.isEmpty()) {
-      final ResponseContext responseContext = new DefaultResponseContext();
-      final Set<RichSegmentDescriptor> processedSegments = new HashSet<>();
-      for (DataServerRequestDescriptor descriptor : pendingRequests) {
-        log.info("Querying server [%s] for segments[%s]", 
descriptor.getServerMetadata(), descriptor.getSegments());
-        processedSegments.addAll(descriptor.getSegments());
-        Yielder<RowType> yielder = fetchRowsFromDataServerInternal(descriptor, 
responseContext, closer, preparedQuery, mappingFunction);
-
-        // Add results
-        if (yielder != null && !yielder.isDone()) {
-          yielders.add(yielder);
-        }
-      }
-
-      // Check for missing segments
-      List<SegmentDescriptor> missingSegments = 
getMissingSegments(responseContext);
-      if (missingSegments.isEmpty()) {
-        // No segments remaining.
-        break;
-      }
-
-      final List<SegmentDescriptor> handedOffSegmentDescriptors = 
checkSegmentHandoff(missingSegments);
-
-      Set<RichSegmentDescriptor> missingRichSegmentDescriptors = new 
HashSet<>();
-      for (RichSegmentDescriptor richSegmentDescriptor : processedSegments) {
-        SegmentDescriptor segmentDescriptor = 
toSegmentDescriptorWithFullInterval(richSegmentDescriptor);
-        if (missingSegments.contains(segmentDescriptor)) {
-          if (handedOffSegmentDescriptors.contains(segmentDescriptor)) {
-            handedOffSegments.add(richSegmentDescriptor);
-          } else {
-            missingRichSegmentDescriptors.add(richSegmentDescriptor);
-          }
-        }
-      }
-
-      pendingRequests = createNextPendingRequests(
-          missingRichSegmentDescriptors
-      );
-
-      if (!pendingRequests.isEmpty()) {
-        retryCount++;
-        if (retryCount > maxRetries) {
-          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                              
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                              .build("Unable to fetch results from dataservers 
in [%d] retries.", retryCount);
-        }
-      }
-    }
-
-    return new DataServerQueryResult<>(yielders, handedOffSegments, 
dataSource);
-  }
-
-  private <QueryType, RowType> Yielder<RowType> 
fetchRowsFromDataServerInternal(
-      final DataServerRequestDescriptor requestDescriptor,
-      final ResponseContext responseContext,
-      final Closer closer,
-      final Query<QueryType> query,
-      final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
-  )
-  {
-    final ServiceLocation serviceLocation = 
ServiceLocation.fromDruidServerMetadata(requestDescriptor.getServerMetadata());
-    final DataServerClient dataServerClient = 
makeDataServerClient(serviceLocation);
-    final QueryToolChest<QueryType, Query<QueryType>> toolChest = 
warehouse.getToolChest(query);
-    final Function<QueryType, QueryType> preComputeManipulatorFn =
-        toolChest.makePreComputeManipulatorFn(query, 
MetricManipulatorFns.deserializing());
-    final JavaType queryResultType = toolChest.getBaseResultType();
-    final List<SegmentDescriptor> segmentDescriptors = 
requestDescriptor.getSegments()
-                                                                        
.stream()
-                                                                        
.map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
-                                                                        
.collect(Collectors.toList());
-
-    try {
-      return RetryUtils.retry(
-          () -> closer.register(createYielder(
-              dataServerClient.run(
-                  Queries.withSpecificSegments(
-                      query,
-                      requestDescriptor.getSegments()
-                                       .stream()
-                                       
.map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
-                                       .collect(Collectors.toList())
-                  ), responseContext, queryResultType, 
closer).map(preComputeManipulatorFn), mappingFunction)),
-          throwable -> !(throwable instanceof QueryInterruptedException
-                         && throwable.getCause() instanceof 
InterruptedException),
-          PER_SERVER_QUERY_NUM_TRIES
-      );
-    }
-    catch (QueryInterruptedException e) {
-      if (e.getCause() instanceof RpcException) {
-        // In the case that all the realtime servers for a segment are gone 
(for example, if they were scaled down),
-        // we would also be unable to fetch the segment.
-        responseContext.addMissingSegments(segmentDescriptors);
-        return Yielders.each(Sequences.empty());
-      } else {
-        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                            
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                            .build(e, "Exception while fetching rows for query 
from dataservers[%s]", serviceLocation);
-      }
-    }
-    catch (Exception e) {
-      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                          .build(e, "Exception while fetching rows for query 
from dataservers[%s]", serviceLocation);
-    }
-  }
-
-  private <RowType, QueryType> Yielder<RowType> createYielder(
-      final Sequence<QueryType> sequence,
-      final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
-  )
-  {
-    return Yielders.each(
-        mappingFunction.apply(sequence)
-                       .map(row -> {
-                         channelCounters.incrementRowCount();
-                         return row;
-                       })
-    );
-  }
-
-  private List<DataServerRequestDescriptor> createNextPendingRequests(
-      final Set<RichSegmentDescriptor> richSegmentDescriptors
-  )
-  {
-    final Map<DruidServerMetadata, Set<RichSegmentDescriptor>> 
serverVsSegmentsMap = new HashMap<>();
-
-    Iterable<ImmutableSegmentLoadInfo> immutableSegmentLoadInfos =
-        coordinatorClient.fetchServerViewSegments(
-            dataSource,
-            
richSegmentDescriptors.stream().map(RichSegmentDescriptor::getFullInterval).collect(Collectors.toList())
-        );
-
-    Map<SegmentDescriptor, ImmutableSegmentLoadInfo> segmentVsServerMap = new 
HashMap<>();
-    immutableSegmentLoadInfos.forEach(immutableSegmentLoadInfo -> {
-      
segmentVsServerMap.put(immutableSegmentLoadInfo.getSegment().toDescriptor(), 
immutableSegmentLoadInfo);
-    });
-
-    for (RichSegmentDescriptor richSegmentDescriptor : richSegmentDescriptors) 
{
-      SegmentDescriptor segmentDescriptorWithFullInterval = 
toSegmentDescriptorWithFullInterval(richSegmentDescriptor);
-      if (!segmentVsServerMap.containsKey(segmentDescriptorWithFullInterval)) {
-        throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                            
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                            .build("Could not find a server for segment[%s]", 
richSegmentDescriptor);
-      }
-
-      ImmutableSegmentLoadInfo segmentLoadInfo = 
segmentVsServerMap.get(segmentDescriptorWithFullInterval);
-      if 
(segmentLoadInfo.getSegment().toDescriptor().equals(segmentDescriptorWithFullInterval))
 {
-        Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
-                                                          .stream()
-                                                          
.filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes()
-                                                                               
                                .contains(druidServerMetadata.getType()))
-                                                          
.collect(Collectors.toSet());
-        if (servers.isEmpty()) {
-          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                              
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                              .build("Could not find a server matching 
includeSegmentSource[%s] for segment[%s]. Only found servers [%s]",
-                                     SegmentSource.REALTIME, 
richSegmentDescriptor, servers);
-        }
-
-        DruidServerMetadata druidServerMetadata = 
DataServerSelector.RANDOM.getSelectServerFunction().apply(servers);
-        serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> 
new HashSet<>());
-        SegmentDescriptor descriptor = 
segmentLoadInfo.getSegment().toDescriptor();
-        serverVsSegmentsMap.get(druidServerMetadata)
-                           .add(new 
RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), 
richSegmentDescriptor.getInterval(), descriptor.getVersion(), 
descriptor.getPartitionNumber()));
-      }
-    }
-
-    final List<DataServerRequestDescriptor> requestDescriptors = new 
ArrayList<>();
-    for (Map.Entry<DruidServerMetadata, Set<RichSegmentDescriptor>> 
druidServerMetadataSetEntry : serverVsSegmentsMap.entrySet()) {
-      DataServerRequestDescriptor dataServerRequest = new 
DataServerRequestDescriptor(
-          druidServerMetadataSetEntry.getKey(),
-          ImmutableList.copyOf(druidServerMetadataSetEntry.getValue())
-      );
-      requestDescriptors.add(dataServerRequest);
-    }
-
-    return requestDescriptors;
-  }
-
-  /**
-   * Retreives the list of missing segments from the response context.
-   */
-  private static List<SegmentDescriptor> getMissingSegments(final 
ResponseContext responseContext)
-  {
-    List<SegmentDescriptor> missingSegments = 
responseContext.getMissingSegments();
-    if (missingSegments == null) {
-      return ImmutableList.of();
-    }
-    return missingSegments;
-  }
-
-  /**
-   * Queries the coordinator to check if a list of segments has been handed 
off.
-   * Returns a list of segments which have been handed off.
-   * <br>
-   * See {@link  
org.apache.druid.server.http.DataSourcesResource#isHandOffComplete(String, 
String, int, String)}
-   */
-  private List<SegmentDescriptor> checkSegmentHandoff(List<SegmentDescriptor> 
segmentDescriptors)
-  {
-    try {
-      List<SegmentDescriptor> handedOffSegments = new ArrayList<>();
-
-      for (SegmentDescriptor segmentDescriptor : segmentDescriptors) {
-        Boolean wasHandedOff = FutureUtils.get(
-            coordinatorClient.isHandoffComplete(dataSource, segmentDescriptor),
-            true
-        );
-        if (Boolean.TRUE.equals(wasHandedOff)) {
-          handedOffSegments.add(segmentDescriptor);
-        }
-      }
-      return handedOffSegments;
-    }
-    catch (Exception e) {
-      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                          .build(e, "Could not contact coordinator");
-    }
-  }
-
-  static SegmentDescriptor 
toSegmentDescriptorWithFullInterval(RichSegmentDescriptor richSegmentDescriptor)
-  {
-    return new SegmentDescriptor(
-        richSegmentDescriptor.getFullInterval(),
-        richSegmentDescriptor.getVersion(),
-        richSegmentDescriptor.getPartitionNumber()
-    );
-  }
+  );
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
index 1caed919ef0..245c078a1c8 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
@@ -19,79 +19,17 @@
 
 package org.apache.druid.msq.exec;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.java.util.common.RE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
-import org.apache.druid.query.QueryToolChestWarehouse;
-import org.apache.druid.rpc.ServiceClientFactory;
-
-import java.io.Closeable;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Creates new instances of {@link DataServerQueryHandler} and manages the 
cancellation threadpool.
  */
-public class DataServerQueryHandlerFactory implements Closeable
+public interface DataServerQueryHandlerFactory
 {
-  private static final Logger log = new 
Logger(DataServerQueryHandlerFactory.class);
-  private static final int DEFAULT_THREAD_COUNT = 4;
-  private final CoordinatorClient coordinatorClient;
-  private final ServiceClientFactory serviceClientFactory;
-  private final ObjectMapper objectMapper;
-  private final QueryToolChestWarehouse warehouse;
-  private final ScheduledExecutorService queryCancellationExecutor;
-
-  public DataServerQueryHandlerFactory(
-      CoordinatorClient coordinatorClient,
-      ServiceClientFactory serviceClientFactory,
-      ObjectMapper objectMapper,
-      QueryToolChestWarehouse warehouse
-  )
-  {
-    this.coordinatorClient = coordinatorClient;
-    this.serviceClientFactory = serviceClientFactory;
-    this.objectMapper = objectMapper;
-    this.warehouse = warehouse;
-    this.queryCancellationExecutor = 
ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor");
-  }
-
-  public DataServerQueryHandler createDataServerQueryHandler(
+  DataServerQueryHandler createDataServerQueryHandler(
       String dataSource,
       ChannelCounters channelCounters,
       DataServerRequestDescriptor dataServerRequestDescriptor
-  )
-  {
-    return new DataServerQueryHandler(
-        dataSource,
-        channelCounters,
-        serviceClientFactory,
-        coordinatorClient,
-        objectMapper,
-        warehouse,
-        queryCancellationExecutor,
-        dataServerRequestDescriptor
-    );
-  }
-
-  @Override
-  public void close()
-  {
-    // Wait for all query cancellations to be complete.
-    log.info("Waiting for any data server queries to be canceled.");
-    queryCancellationExecutor.shutdown();
-    try {
-      if (!queryCancellationExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
-        log.error("Unable to cancel all ongoing queries.");
-      }
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RE(e);
-    }
-  }
+  );
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java
new file mode 100644
index 00000000000..ce63f8100e1
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerUtils.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.discovery.DataServerClient;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.query.Queries;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.SegmentDescriptor;
+import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.context.ResponseContext;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Static utility functions for {@link DataServerQueryHandler} implementations.
+ */
+public class DataServerQueryHandlerUtils
+{
+  private DataServerQueryHandlerUtils()
+  {
+    // No instantiation.
+  }
+
+  /**
+   * Performs necessary transforms to a query destined for data servers. Does 
not update the list of segments; callers
+   * should do this themselves using {@link 
Queries#withSpecificSegments(Query, List)}.
+   *
+   * @param query      the query
+   * @param dataSource datasource name
+   */
+  public static <R, T extends Query<R>> Query<R> prepareQuery(final T query, 
final String dataSource)
+  {
+    // MSQ changes the datasource to an inputNumber datasource. This needs to 
be changed back for data servers
+    // to understand.
+
+    // BUG: This transformation is incorrect; see 
https://github.com/apache/druid/issues/18198. It loses decorations
+    // such as join, unnest, etc.
+    return query.withDataSource(new TableDataSource(dataSource));
+  }
+
+  /**
+   * Given results from {@link DataServerClient#run}, returns a {@link 
Yielder} that applies the provided
+   * mapping function and increments the row count on the provided {@link 
ChannelCounters}.
+   */
+  public static <RowType, QueryType> Yielder<RowType> createYielder(
+      final Sequence<QueryType> sequence,
+      final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
+      final ChannelCounters channelCounters
+  )
+  {
+    return Yielders.each(
+        mappingFunction.apply(sequence)
+                       .map(row -> {
+                         channelCounters.incrementRowCount();
+                         return row;
+                       })
+    );
+  }
+
+  /**
+   * Retrieves the list of missing segments from the response context.
+   */
+  public static List<SegmentDescriptor> getMissingSegments(final 
ResponseContext responseContext)
+  {
+    List<SegmentDescriptor> missingSegments = 
responseContext.getMissingSegments();
+    if (missingSegments == null) {
+      return Collections.emptyList();
+    }
+    return missingSegments;
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index e7131411556..66d7fb1cc23 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -253,7 +253,6 @@ public class WorkerImpl implements Worker
       throws Exception
   {
     context.registerWorker(this, workerCloser);
-    workerCloser.register(context.dataServerQueryHandlerFactory());
     this.workerClient = workerCloser.register(new 
ExceptionWrappingWorkerClient(context.makeWorkerClient()));
     final FrameProcessorExecutor workerExec = new 
FrameProcessorExecutor(makeProcessingPool());
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
similarity index 72%
copy from 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
copy to 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
index 04458d70170..b8b2afd5137 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandler.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandler.java
@@ -17,12 +17,14 @@
  * under the License.
  */
 
-package org.apache.druid.msq.exec;
+package org.apache.druid.msq.indexing;
 
 import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.common.guava.FutureUtils;
@@ -36,6 +38,10 @@ import org.apache.druid.java.util.common.guava.Yielders;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.exec.DataServerQueryHandler;
+import org.apache.druid.msq.exec.DataServerQueryHandlerUtils;
+import org.apache.druid.msq.exec.DataServerQueryResult;
+import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
 import org.apache.druid.msq.input.table.DataServerSelector;
 import org.apache.druid.msq.input.table.RichSegmentDescriptor;
@@ -45,8 +51,6 @@ import org.apache.druid.query.QueryInterruptedException;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.query.SegmentDescriptor;
-import org.apache.druid.query.TableDataSource;
-import org.apache.druid.query.aggregation.MetricManipulationFn;
 import org.apache.druid.query.aggregation.MetricManipulatorFns;
 import org.apache.druid.query.context.DefaultResponseContext;
 import org.apache.druid.query.context.ResponseContext;
@@ -61,17 +65,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
- * Class responsible for querying dataservers and retriving results for a 
given query. Also queries the coordinator
- * to check if a segment has been handed off.
+ * Task implementation of {@link DataServerQueryHandler}. Implements retry 
logic as described in
+ * {@link #fetchRowsFromDataServer(Query, Function, Closer)}.
  */
-public class DataServerQueryHandler
+public class IndexerDataServerQueryHandler implements DataServerQueryHandler
 {
-  private static final Logger log = new Logger(DataServerQueryHandler.class);
+  private static final Logger log = new 
Logger(IndexerDataServerQueryHandler.class);
   private static final int DEFAULT_NUM_TRIES = 3;
   private static final int PER_SERVER_QUERY_NUM_TRIES = 5;
   private final String dataSource;
@@ -80,17 +84,15 @@ public class DataServerQueryHandler
   private final CoordinatorClient coordinatorClient;
   private final ObjectMapper objectMapper;
   private final QueryToolChestWarehouse warehouse;
-  private final ScheduledExecutorService queryCancellationExecutor;
   private final DataServerRequestDescriptor dataServerRequestDescriptor;
 
-  public DataServerQueryHandler(
+  public IndexerDataServerQueryHandler(
       String dataSource,
       ChannelCounters channelCounters,
       ServiceClientFactory serviceClientFactory,
       CoordinatorClient coordinatorClient,
       ObjectMapper objectMapper,
       QueryToolChestWarehouse warehouse,
-      ScheduledExecutorService queryCancellationExecutor,
       DataServerRequestDescriptor dataServerRequestDescriptor
   )
   {
@@ -100,44 +102,45 @@ public class DataServerQueryHandler
     this.coordinatorClient = coordinatorClient;
     this.objectMapper = objectMapper;
     this.warehouse = warehouse;
-    this.queryCancellationExecutor = queryCancellationExecutor;
     this.dataServerRequestDescriptor = dataServerRequestDescriptor;
   }
 
   @VisibleForTesting
   DataServerClient makeDataServerClient(ServiceLocation serviceLocation)
   {
-    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper, queryCancellationExecutor);
+    return new DataServerClient(serviceClientFactory, serviceLocation, 
objectMapper);
   }
 
   /**
-   * Performs some necessary transforms to the query, so that the dataserver 
is able to understand it first.
-   * - Changing the datasource to a {@link TableDataSource}
-   * - Limiting the query to the required segments with {@link 
Queries#withSpecificSegments(Query, List)}
-   * <br>
-   * Then queries a data server and returns a {@link Yielder} for the results, 
retrying if needed. If a dataserver
-   * indicates that some segments were not found, checks with the coordinator 
to see if the segment was handed off.
-   * - If all the segments were handed off, returns a {@link 
DataServerQueryResult} with the yielder and list of handed
-   * off segments.
-   * - If some segments were not handed off, checks with the coordinator fetch 
an updated list of servers. This step is
-   * repeated up to {@link #DEFAULT_NUM_TRIES} times.
-   * - If the servers could not be found, checks if the segment was 
handed-off. If it was, returns a
-   * {@link DataServerQueryResult} with the yielder and list of handed off 
segments. Otherwise, throws an exception.
-   * <br>
-   * Also applies {@link QueryToolChest#makePreComputeManipulatorFn(Query, 
MetricManipulationFn)} and reports channel
-   * metrics on the returned results.
+   * {@inheritDoc}
+   *
+   * This method blocks until servers start sending their results back. When 
the method returns, the future is
+   * immediately resolved. This means that on tasks, queries to realtime 
servers block the processing pool.
+   * In principle, however, it should be possible to make the logic in this 
class asynchronous.
+   *
+   * Queries are retried if needed. If a data server indicates that some 
segments were not found, this function checks
+   * with the coordinator to see if the segment was handed off.
+   * <ul>
+   * <li>If all the segments were handed off, returns a {@link 
DataServerQueryResult} with the yielder and list of
+   * handed-off segments.</li>
+   * <li>If some segments were not handed off, checks with the coordinator to 
fetch an updated list of servers.
+   * This step is repeated up to {@link #DEFAULT_NUM_TRIES} times.</li>
+   * <li>If the servers could not be found, checks if the segment was 
handed-off. If it was, returns a
+   * {@link DataServerQueryResult} with the yielder and list of handed off 
segments. Otherwise,
+   * throws an exception.</li>
+   * </ul>
    *
    * @param <QueryType> result return type for the query from the data server
-   * @param <RowType> type of the result rows after parsing from QueryType 
object
+   * @param <RowType>   type of the result rows after parsing from QueryType 
object
    */
-  public <RowType, QueryType> DataServerQueryResult<RowType> 
fetchRowsFromDataServer(
+  @Override
+  public <RowType, QueryType> ListenableFuture<DataServerQueryResult<RowType>> 
fetchRowsFromDataServer(
       Query<QueryType> query,
       Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction,
       Closer closer
   )
   {
-    // MSQ changes the datasource to a number datasource. This needs to be 
changed back for data servers to understand.
-    final Query<QueryType> preparedQuery = query.withDataSource(new 
TableDataSource(dataSource));
+    final Query<QueryType> preparedQuery = 
DataServerQueryHandlerUtils.prepareQuery(query, dataSource);
     final List<Yielder<RowType>> yielders = new ArrayList<>();
     final List<RichSegmentDescriptor> handedOffSegments = new ArrayList<>();
 
@@ -152,7 +155,13 @@ public class DataServerQueryHandler
       for (DataServerRequestDescriptor descriptor : pendingRequests) {
         log.info("Querying server [%s] for segments[%s]", 
descriptor.getServerMetadata(), descriptor.getSegments());
         processedSegments.addAll(descriptor.getSegments());
-        Yielder<RowType> yielder = fetchRowsFromDataServerInternal(descriptor, 
responseContext, closer, preparedQuery, mappingFunction);
+        Yielder<RowType> yielder = fetchRowsFromDataServerInternal(
+            descriptor,
+            responseContext,
+            closer,
+            preparedQuery,
+            mappingFunction
+        );
 
         // Add results
         if (yielder != null && !yielder.isDone()) {
@@ -161,7 +170,7 @@ public class DataServerQueryHandler
       }
 
       // Check for missing segments
-      List<SegmentDescriptor> missingSegments = 
getMissingSegments(responseContext);
+      List<SegmentDescriptor> missingSegments = 
DataServerQueryHandlerUtils.getMissingSegments(responseContext);
       if (missingSegments.isEmpty()) {
         // No segments remaining.
         break;
@@ -195,7 +204,9 @@ public class DataServerQueryHandler
       }
     }
 
-    return new DataServerQueryResult<>(yielders, handedOffSegments, 
dataSource);
+    // Not actually async. The retry logic above is written in synchronous 
fashion. Just return an immediate-future
+    // when we actually have all queries issued and all yielders set up.
+    return Futures.immediateFuture(new DataServerQueryResult<>(yielders, 
handedOffSegments, dataSource));
   }
 
   private <QueryType, RowType> Yielder<RowType> 
fetchRowsFromDataServerInternal(
@@ -212,29 +223,44 @@ public class DataServerQueryHandler
     final Function<QueryType, QueryType> preComputeManipulatorFn =
         toolChest.makePreComputeManipulatorFn(query, 
MetricManipulatorFns.deserializing());
     final JavaType queryResultType = toolChest.getBaseResultType();
-    final List<SegmentDescriptor> segmentDescriptors = 
requestDescriptor.getSegments()
-                                                                        
.stream()
-                                                                        
.map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
-                                                                        
.collect(Collectors.toList());
+    final List<SegmentDescriptor> segmentDescriptors =
+        requestDescriptor.getSegments()
+                         .stream()
+                         
.map(IndexerDataServerQueryHandler::toSegmentDescriptorWithFullInterval)
+                         .collect(Collectors.toList());
 
     try {
       return RetryUtils.retry(
-          () -> closer.register(createYielder(
-              dataServerClient.run(
-                  Queries.withSpecificSegments(
-                      query,
-                      requestDescriptor.getSegments()
-                                       .stream()
-                                       
.map(DataServerQueryHandler::toSegmentDescriptorWithFullInterval)
-                                       .collect(Collectors.toList())
-                  ), responseContext, queryResultType, 
closer).map(preComputeManipulatorFn), mappingFunction)),
-          throwable -> !(throwable instanceof QueryInterruptedException
-                         && throwable.getCause() instanceof 
InterruptedException),
+          () -> {
+            final ListenableFuture<Sequence<QueryType>> queryFuture = 
dataServerClient.run(
+                Queries.withSpecificSegments(
+                    query,
+                    requestDescriptor.getSegments()
+                                     .stream()
+                                     
.map(RichSegmentDescriptor::toPlainDescriptor)
+                                     .collect(Collectors.toList())
+                ),
+                responseContext,
+                queryResultType,
+                closer
+            );
+
+            return closer.register(
+                DataServerQueryHandlerUtils.createYielder(
+                    queryFuture.get().map(preComputeManipulatorFn),
+                    mappingFunction,
+                    channelCounters
+                )
+            );
+          },
+          throwable -> !(throwable instanceof ExecutionException
+                         && throwable.getCause() instanceof 
QueryInterruptedException
+                         && throwable.getCause().getCause() instanceof 
InterruptedException),
           PER_SERVER_QUERY_NUM_TRIES
       );
     }
-    catch (QueryInterruptedException e) {
-      if (e.getCause() instanceof RpcException) {
+    catch (ExecutionException e) {
+      if (e.getCause() instanceof QueryInterruptedException && 
e.getCause().getCause() instanceof RpcException) {
         // In the case that all the realtime servers for a segment are gone 
(for example, if they were scaled down),
         // we would also be unable to fetch the segment.
         responseContext.addMissingSegments(segmentDescriptors);
@@ -252,20 +278,6 @@ public class DataServerQueryHandler
     }
   }
 
-  private <RowType, QueryType> Yielder<RowType> createYielder(
-      final Sequence<QueryType> sequence,
-      final Function<Sequence<QueryType>, Sequence<RowType>> mappingFunction
-  )
-  {
-    return Yielders.each(
-        mappingFunction.apply(sequence)
-                       .map(row -> {
-                         channelCounters.incrementRowCount();
-                         return row;
-                       })
-    );
-  }
-
   private List<DataServerRequestDescriptor> createNextPendingRequests(
       final Set<RichSegmentDescriptor> richSegmentDescriptors
   )
@@ -296,20 +308,28 @@ public class DataServerQueryHandler
         Set<DruidServerMetadata> servers = segmentLoadInfo.getServers()
                                                           .stream()
                                                           
.filter(druidServerMetadata -> SegmentSource.REALTIME.getUsedServerTypes()
-                                                                               
                                .contains(druidServerMetadata.getType()))
+                                                                               
                                .contains(
+                                                                               
                                    druidServerMetadata.getType()))
                                                           
.collect(Collectors.toSet());
         if (servers.isEmpty()) {
           throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                               
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                              .build("Could not find a server matching 
includeSegmentSource[%s] for segment[%s]. Only found servers [%s]",
-                                     SegmentSource.REALTIME, 
richSegmentDescriptor, servers);
+                              .build(
+                                  "Could not find a server matching 
includeSegmentSource[%s] for segment[%s]. Only found servers [%s]",
+                                  SegmentSource.REALTIME, 
richSegmentDescriptor, servers
+                              );
         }
 
         DruidServerMetadata druidServerMetadata = 
DataServerSelector.RANDOM.getSelectServerFunction().apply(servers);
         serverVsSegmentsMap.computeIfAbsent(druidServerMetadata, ignored -> 
new HashSet<>());
         SegmentDescriptor descriptor = 
segmentLoadInfo.getSegment().toDescriptor();
         serverVsSegmentsMap.get(druidServerMetadata)
-                           .add(new 
RichSegmentDescriptor(richSegmentDescriptor.getFullInterval(), 
richSegmentDescriptor.getInterval(), descriptor.getVersion(), 
descriptor.getPartitionNumber()));
+                           .add(new RichSegmentDescriptor(
+                               richSegmentDescriptor.getFullInterval(),
+                               richSegmentDescriptor.getInterval(),
+                               descriptor.getVersion(),
+                               descriptor.getPartitionNumber()
+                           ));
       }
     }
 
@@ -325,18 +345,6 @@ public class DataServerQueryHandler
     return requestDescriptors;
   }
 
-  /**
-   * Retreives the list of missing segments from the response context.
-   */
-  private static List<SegmentDescriptor> getMissingSegments(final 
ResponseContext responseContext)
-  {
-    List<SegmentDescriptor> missingSegments = 
responseContext.getMissingSegments();
-    if (missingSegments == null) {
-      return ImmutableList.of();
-    }
-    return missingSegments;
-  }
-
   /**
    * Queries the coordinator to check if a list of segments has been handed 
off.
    * Returns a list of segments which have been handed off.
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
similarity index 55%
copy from 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
copy to 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
index 1caed919ef0..46e06a5484b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/DataServerQueryHandlerFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerFactory.java
@@ -17,36 +17,27 @@
  * under the License.
  */
 
-package org.apache.druid.msq.exec;
+package org.apache.druid.msq.indexing;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.java.util.common.RE;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
 import org.apache.druid.query.QueryToolChestWarehouse;
 import org.apache.druid.rpc.ServiceClientFactory;
 
-import java.io.Closeable;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 /**
- * Creates new instances of {@link DataServerQueryHandler} and manages the 
cancellation threadpool.
+ * Creates new instances of {@link IndexerDataServerQueryHandler}.
  */
-public class DataServerQueryHandlerFactory implements Closeable
+public class IndexerDataServerQueryHandlerFactory implements 
DataServerQueryHandlerFactory
 {
-  private static final Logger log = new 
Logger(DataServerQueryHandlerFactory.class);
-  private static final int DEFAULT_THREAD_COUNT = 4;
   private final CoordinatorClient coordinatorClient;
   private final ServiceClientFactory serviceClientFactory;
   private final ObjectMapper objectMapper;
   private final QueryToolChestWarehouse warehouse;
-  private final ScheduledExecutorService queryCancellationExecutor;
 
-  public DataServerQueryHandlerFactory(
+  public IndexerDataServerQueryHandlerFactory(
       CoordinatorClient coordinatorClient,
       ServiceClientFactory serviceClientFactory,
       ObjectMapper objectMapper,
@@ -57,41 +48,23 @@ public class DataServerQueryHandlerFactory implements 
Closeable
     this.serviceClientFactory = serviceClientFactory;
     this.objectMapper = objectMapper;
     this.warehouse = warehouse;
-    this.queryCancellationExecutor = 
ScheduledExecutors.fixed(DEFAULT_THREAD_COUNT, "query-cancellation-executor");
   }
 
-  public DataServerQueryHandler createDataServerQueryHandler(
+  @Override
+  public IndexerDataServerQueryHandler createDataServerQueryHandler(
       String dataSource,
       ChannelCounters channelCounters,
-      DataServerRequestDescriptor dataServerRequestDescriptor
+      DataServerRequestDescriptor requestDescriptor
   )
   {
-    return new DataServerQueryHandler(
+    return new IndexerDataServerQueryHandler(
         dataSource,
         channelCounters,
         serviceClientFactory,
         coordinatorClient,
         objectMapper,
         warehouse,
-        queryCancellationExecutor,
-        dataServerRequestDescriptor
+        requestDescriptor
     );
   }
-
-  @Override
-  public void close()
-  {
-    // Wait for all query cancellations to be complete.
-    log.info("Waiting for any data server queries to be canceled.");
-    queryCancellationExecutor.shutdown();
-    try {
-      if (!queryCancellationExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
-        log.error("Unable to cancel all ongoing queries.");
-      }
-    }
-    catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RE(e);
-    }
-  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
index 9e2c9d41074..368709b7eff 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
@@ -50,7 +50,7 @@ public class IndexerFrameContext implements FrameContext
   private final ResourceHolder<ProcessingBuffers> processingBuffers;
   private final WorkerMemoryParameters memoryParameters;
   private final WorkerStorageParameters storageParameters;
-  private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
+  private final IndexerDataServerQueryHandlerFactory 
dataServerQueryHandlerFactory;
 
   public IndexerFrameContext(
       StageId stageId,
@@ -59,7 +59,7 @@ public class IndexerFrameContext implements FrameContext
       IndexIO indexIO,
       DataSegmentProvider dataSegmentProvider,
       ResourceHolder<ProcessingBuffers> processingBuffers,
-      DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
+      IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
       WorkerMemoryParameters memoryParameters,
       WorkerStorageParameters storageParameters
   )
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 035544401f1..6c55a6add95 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -77,7 +77,7 @@ public class IndexerWorkerContext implements WorkerContext
   private final ServiceLocator controllerLocator;
   private final IndexIO indexIO;
   private final TaskDataSegmentProvider dataSegmentProvider;
-  private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
+  private final IndexerDataServerQueryHandlerFactory 
dataServerQueryHandlerFactory;
   private final ServiceClientFactory clientFactory;
   private final MemoryIntrospector memoryIntrospector;
   private final ProcessingBuffersProvider processingBuffersProvider;
@@ -98,7 +98,7 @@ public class IndexerWorkerContext implements WorkerContext
       final ServiceClientFactory clientFactory,
       final MemoryIntrospector memoryIntrospector,
       final ProcessingBuffersProvider processingBuffersProvider,
-      final DataServerQueryHandlerFactory dataServerQueryHandlerFactory
+      final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory
   )
   {
     this.task = task;
@@ -158,7 +158,7 @@ public class IndexerWorkerContext implements WorkerContext
         serviceClientFactory,
         memoryIntrospector,
         processingBuffersProvider,
-        new DataServerQueryHandlerFactory(
+        new IndexerDataServerQueryHandlerFactory(
             toolbox.getCoordinatorClient(),
             serviceClientFactory,
             smileMapper,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
index 27f5202b6ce..869464df04d 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/RichSegmentDescriptor.java
@@ -93,6 +93,14 @@ public class RichSegmentDescriptor extends SegmentDescriptor
     return fullInterval;
   }
 
+  /**
+   * Returns a plain descriptor, with the {@link #fullInterval} field dropped.
+   */
+  public SegmentDescriptor toPlainDescriptor()
+  {
+    return new SegmentDescriptor(getInterval(), getVersion(), 
getPartitionNumber());
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
index db1259f21e5..879386e8707 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java
@@ -20,8 +20,10 @@
 package org.apache.druid.msq.querykit.groupby;
 
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.FrameWithPartition;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -63,6 +65,7 @@ import org.apache.druid.segment.column.RowSignature;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
@@ -85,6 +88,7 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
   private SegmentsInputSlice handedOffSegments = null;
   private Yielder<Yielder<ResultRow>> currentResultsYielder;
+  private ListenableFuture<DataServerQueryResult<ResultRow>> 
dataServerQueryResultFuture;
 
   public GroupByPreShuffleFrameProcessor(
       final GroupByQuery query,
@@ -117,12 +121,23 @@ public class GroupByPreShuffleFrameProcessor extends 
BaseLeafFrameProcessor
   {
     if (resultYielder == null || resultYielder.isDone()) {
       if (currentResultsYielder == null) {
+        if (dataServerQueryResultFuture == null) {
+          dataServerQueryResultFuture =
+              dataServerQueryHandler.fetchRowsFromDataServer(
+                  groupingEngine.prepareGroupByQuery(query),
+                  Function.identity(),
+                  closer
+              );
+
+          // Give up the processing thread while we wait for the query to 
finish. This is only really asynchronous
+          // with Dart. On tasks, the IndexerDataServerQueryHandler does not 
return from fetchRowsFromDataServer until
+          // the response has started to come back.
+          return 
ReturnOrAwait.awaitAllFutures(Collections.singletonList(dataServerQueryResultFuture));
+        }
+
         final DataServerQueryResult<ResultRow> dataServerQueryResult =
-            dataServerQueryHandler.fetchRowsFromDataServer(
-                groupingEngine.prepareGroupByQuery(query),
-                Function.identity(),
-                closer
-            );
+            FutureUtils.getUncheckedImmediately(dataServerQueryResultFuture);
+        dataServerQueryResultFuture = null;
         handedOffSegments = dataServerQueryResult.getHandedOffSegments();
         if (!handedOffSegments.getDescriptors().isEmpty()) {
           log.info(
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
index c90aec81af1..73c7917ed62 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java
@@ -23,8 +23,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ListenableFuture;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.FrameWithPartition;
@@ -85,6 +87,7 @@ import javax.validation.constraints.NotNull;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -104,6 +107,7 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   private final Closer closer = Closer.create();
 
   private Cursor cursor;
+  private ListenableFuture<DataServerQueryResult<Object[]>> 
dataServerQueryResultFuture;
   private Closeable cursorCloser;
   private Segment segment;
   private final SimpleSettableOffset cursorOffset = new 
SimpleAscendingOffset(Integer.MAX_VALUE);
@@ -196,12 +200,24 @@ public class ScanQueryFrameProcessor extends 
BaseLeafFrameProcessor
   {
     if (cursor == null) {
       ScanQuery preparedQuery = prepareScanQueryForDataServer(query);
+
+      if (dataServerQueryResultFuture == null) {
+        dataServerQueryResultFuture =
+            dataServerQueryHandler.fetchRowsFromDataServer(
+                preparedQuery,
+                ScanQueryFrameProcessor::mappingFunction,
+                closer
+            );
+
+        // Give up the processing thread while we wait for the query to 
finish. This is only really asynchronous
+        // with Dart. On tasks, the IndexerDataServerQueryHandler does not 
return from fetchRowsFromDataServer until
+        // the response has started to come back.
+        return 
ReturnOrAwait.awaitAllFutures(Collections.singletonList(dataServerQueryResultFuture));
+      }
+
       final DataServerQueryResult<Object[]> dataServerQueryResult =
-          dataServerQueryHandler.fetchRowsFromDataServer(
-              preparedQuery,
-              ScanQueryFrameProcessor::mappingFunction,
-              closer
-          );
+          FutureUtils.getUncheckedImmediately(dataServerQueryResultFuture);
+      dataServerQueryResultFuture = null;
       handedOffSegments = dataServerQueryResult.getHandedOffSegments();
       if (!handedOffSegments.getDescriptors().isEmpty()) {
         log.info(
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
index 07e3854ca06..cb4a75d7b1c 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQLoadedSegmentTests.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.exec;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
 import org.apache.druid.client.ImmutableSegmentLoadInfo;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -103,18 +104,20 @@ public class MSQLoadedSegmentTests extends MSQTestBase
                                                .build();
 
     doReturn(
-        new DataServerQueryResult<>(
-            ImmutableList.of(
-                Yielders.each(
-                    Sequences.simple(
-                        ImmutableList.of(
-                            new Object[]{1L, "qwe"},
-                            new Object[]{1L, "tyu"}
+        Futures.immediateFuture(
+            new DataServerQueryResult<>(
+                ImmutableList.of(
+                    Yielders.each(
+                        Sequences.simple(
+                            ImmutableList.of(
+                                new Object[]{1L, "qwe"},
+                                new Object[]{1L, "tyu"}
+                            )
                         )
-                    )
-                )),
-            ImmutableList.of(),
-            "foo"
+                    )),
+                ImmutableList.of(),
+                "foo"
+            )
         )).when(dataServerQueryHandler)
           .fetchRowsFromDataServer(any(), any(), any());
 
@@ -164,18 +167,20 @@ public class MSQLoadedSegmentTests extends MSQTestBase
           ScanQuery query = invocationOnMock.getArgument(0);
           ScanQuery.verifyOrderByForNativeExecution(query);
           Assert.assertEquals(Long.MAX_VALUE, query.getScanRowsLimit());
-          return new DataServerQueryResult<>(
-              ImmutableList.of(
-                  Yielders.each(
-                      Sequences.simple(
-                          ImmutableList.of(
-                              new Object[]{1L, "qwe"},
-                              new Object[]{1L, "tyu"}
+          return Futures.immediateFuture(
+              new DataServerQueryResult<>(
+                  ImmutableList.of(
+                      Yielders.each(
+                          Sequences.simple(
+                              ImmutableList.of(
+                                  new Object[]{1L, "qwe"},
+                                  new Object[]{1L, "tyu"}
+                              )
                           )
-                      )
-                  )),
-              ImmutableList.of(),
-              "foo"
+                      )),
+                  ImmutableList.of(),
+                  "foo"
+              )
           );
         }
     )
@@ -225,17 +230,19 @@ public class MSQLoadedSegmentTests extends MSQTestBase
                                             .build();
 
     doReturn(
-        new DataServerQueryResult<>(
-            ImmutableList.of(
-                Yielders.each(
-                    Sequences.simple(
-                        ImmutableList.of(
-                            ResultRow.of(1L, 2L)
+        Futures.immediateFuture(
+            new DataServerQueryResult<>(
+                ImmutableList.of(
+                    Yielders.each(
+                        Sequences.simple(
+                            ImmutableList.of(
+                                ResultRow.of(1L, 2L)
+                            )
                         )
-                    )
-                )),
-            ImmutableList.of(),
-            "foo"
+                    )),
+                ImmutableList.of(),
+                "foo"
+            )
         )
     )
         .when(dataServerQueryHandler)
@@ -285,17 +292,19 @@ public class MSQLoadedSegmentTests extends MSQTestBase
                                             .build();
 
     doReturn(
-        new DataServerQueryResult<>(
-            ImmutableList.of(
-                Yielders.each(
-                    Sequences.simple(
-                        ImmutableList.of(
-                            ResultRow.of(1L, 2L)
+        Futures.immediateFuture(
+            new DataServerQueryResult<>(
+                ImmutableList.of(
+                    Yielders.each(
+                        Sequences.simple(
+                            ImmutableList.of(
+                                ResultRow.of(1L, 2L)
+                            )
                         )
-                    )
-                )),
-            ImmutableList.of(),
-            "foo"
+                    )),
+                ImmutableList.of(),
+                "foo"
+            )
         )
     )
         .when(dataServerQueryHandler)
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
similarity index 78%
rename from 
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java
rename to 
extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
index f04a65f89a8..2285e49b7fc 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/DataServerQueryHandlerTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/IndexerDataServerQueryHandlerTest.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.druid.msq.exec;
+package org.apache.druid.msq.indexing;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -29,11 +29,12 @@ import org.apache.druid.discovery.DataServerClient;
 import org.apache.druid.discovery.DruidServiceTestUtils;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.msq.counters.ChannelCounters;
+import org.apache.druid.msq.exec.DataServerQueryResult;
+import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.input.table.DataServerRequestDescriptor;
 import org.apache.druid.msq.input.table.RichSegmentDescriptor;
 import org.apache.druid.msq.querykit.InputNumberDataSource;
@@ -65,20 +66,19 @@ import org.junit.runner.RunWith;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
-import static 
org.apache.druid.msq.exec.DataServerQueryHandler.toSegmentDescriptorWithFullInterval;
 import static org.apache.druid.query.Druids.newScanQueryBuilder;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 @RunWith(MockitoJUnitRunner.class)
-public class DataServerQueryHandlerTest
+public class IndexerDataServerQueryHandlerTest
 {
   private static final String DATASOURCE1 = "dataSource1";
   private static final DruidServerMetadata DRUID_SERVER_1 = new 
DruidServerMetadata(
@@ -115,7 +115,7 @@ public class DataServerQueryHandlerTest
   private DataServerClient dataServerClient2;
   private CoordinatorClient coordinatorClient;
   private ScanQuery query;
-  private DataServerQueryHandler target;
+  private IndexerDataServerQueryHandler target;
 
   @Before
   public void setUp()
@@ -136,14 +136,13 @@ public class DataServerQueryHandlerTest
                     .build()
     );
     target = spy(
-        new DataServerQueryHandler(
+        new IndexerDataServerQueryHandler(
             DATASOURCE1,
             new ChannelCounters(),
             mock(ServiceClientFactory.class),
             coordinatorClient,
             DruidServiceTestUtils.newJsonMapper(),
             queryToolChestWarehouse,
-            Execs.scheduledSingleThreaded("query-cancellation-executor"),
             new DataServerRequestDescriptor(DRUID_SERVER_1, 
ImmutableList.of(SEGMENT_1, SEGMENT_2))
         )
     );
@@ -160,7 +159,7 @@ public class DataServerQueryHandlerTest
   }
 
   @Test
-  public void testFetchRowsFromServer()
+  public void testFetchRowsFromServer() throws ExecutionException, 
InterruptedException
   {
     ScanResultValue scanResultValue = new ScanResultValue(
         null,
@@ -172,13 +171,14 @@ public class DataServerQueryHandlerTest
         )
     );
 
-    
doReturn(Sequences.simple(ImmutableList.of(scanResultValue))).when(dataServerClient1).run(any(),
 any(), any(), any());
+    
doReturn(Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue))))
+        .when(dataServerClient1).run(any(), any(), any(), any());
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
-    );
+    ).get();
 
     
Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty());
     List<List<Object>> events = (List<List<Object>>) 
scanResultValue.getEvents();
@@ -192,7 +192,7 @@ public class DataServerQueryHandlerTest
   }
 
   @Test
-  public void testOneSegmentRelocated()
+  public void testOneSegmentRelocated() throws ExecutionException, 
InterruptedException
   {
     ScanResultValue scanResultValue1 = new ScanResultValue(
         null,
@@ -207,10 +207,10 @@ public class DataServerQueryHandlerTest
       ResponseContext responseContext = invocation.getArgument(1);
       responseContext.addMissingSegments(
           ImmutableList.of(
-              toSegmentDescriptorWithFullInterval(SEGMENT_2)
+              
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)
           )
       );
-      return Sequences.simple(ImmutableList.of(scanResultValue1));
+      return 
Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue1)));
     }).when(dataServerClient1).run(any(), any(), any(), any());
 
     ScanResultValue scanResultValue2 = new ScanResultValue(
@@ -222,9 +222,12 @@ public class DataServerQueryHandlerTest
         )
     );
 
-    
doReturn(Sequences.simple(ImmutableList.of(scanResultValue2))).when(dataServerClient2).run(any(),
 any(), any(), any());
+    
doReturn(Futures.immediateFuture(Sequences.simple(ImmutableList.of(scanResultValue2))))
+        .when(dataServerClient2).run(any(), any(), any(), any());
 
-    
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 toSegmentDescriptorWithFullInterval(SEGMENT_2));
+    doReturn(Futures.immediateFuture(Boolean.FALSE))
+        .when(coordinatorClient)
+        .isHandoffComplete(DATASOURCE1, 
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2));
     doReturn(ImmutableList.of(
         new ImmutableSegmentLoadInfo(
             DataSegment.builder()
@@ -241,7 +244,7 @@ public class DataServerQueryHandlerTest
         query,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
-    );
+    ).get();
 
     
Assert.assertTrue(dataServerQueryResult.getHandedOffSegments().getDescriptors().isEmpty());
 
@@ -263,26 +266,30 @@ public class DataServerQueryHandlerTest
   }
 
   @Test
-  public void testHandoff()
+  public void testHandoff() throws ExecutionException, InterruptedException
   {
     doAnswer(invocation -> {
       ResponseContext responseContext = invocation.getArgument(1);
       responseContext.addMissingSegments(
           ImmutableList.of(
-              toSegmentDescriptorWithFullInterval(SEGMENT_1),
-              toSegmentDescriptorWithFullInterval(SEGMENT_2)
+              
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1),
+              
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2)
           )
       );
-      return Sequences.empty();
+      return Futures.immediateFuture(Sequences.empty());
     }).when(dataServerClient1).run(any(), any(), any(), any());
-    
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 toSegmentDescriptorWithFullInterval(SEGMENT_1));
-    
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 toSegmentDescriptorWithFullInterval(SEGMENT_2));
+    doReturn(Futures.immediateFuture(Boolean.TRUE))
+        .when(coordinatorClient)
+        .isHandoffComplete(DATASOURCE1, 
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1));
+    doReturn(Futures.immediateFuture(Boolean.TRUE))
+        .when(coordinatorClient)
+        .isHandoffComplete(DATASOURCE1, 
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2));
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
-    );
+    ).get();
 
     Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), 
dataServerQueryResult.getHandedOffSegments().getDescriptors());
     Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty());
@@ -291,13 +298,16 @@ public class DataServerQueryHandlerTest
   @Test
   public void testServerNotFoundWithoutHandoffShouldThrowException()
   {
-    doThrow(
-        new QueryInterruptedException(new RpcException("Could not connect to 
server"))
+    doReturn(
+        Futures.immediateFailedFuture(new QueryInterruptedException(new 
RpcException("Could not connect to server")))
     ).when(dataServerClient1).run(any(), any(), any(), any());
 
-    
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 toSegmentDescriptorWithFullInterval(SEGMENT_1));
+    doReturn(Futures.immediateFuture(Boolean.FALSE))
+        .when(coordinatorClient)
+        .isHandoffComplete(DATASOURCE1, 
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1));
 
-    ScanQuery queryWithRetry = 
query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY,
 3));
+    ScanQuery queryWithRetry =
+        
query.withOverriddenContext(ImmutableMap.of(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY,
 3));
 
     Assert.assertThrows(DruidException.class, () ->
         target.fetchRowsFromDataServer(
@@ -311,20 +321,24 @@ public class DataServerQueryHandlerTest
   }
 
   @Test
-  public void testServerNotFoundButHandoffShouldReturnWithStatus()
+  public void testServerNotFoundButHandoffShouldReturnWithStatus() throws 
ExecutionException, InterruptedException
   {
-    doThrow(
-        new QueryInterruptedException(new RpcException("Could not connect to 
server"))
+    doReturn(
+        Futures.immediateFailedFuture(new QueryInterruptedException(new 
RpcException("Could not connect to server")))
     ).when(dataServerClient1).run(any(), any(), any(), any());
 
-    
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 toSegmentDescriptorWithFullInterval(SEGMENT_1));
-    
doReturn(Futures.immediateFuture(Boolean.TRUE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 toSegmentDescriptorWithFullInterval(SEGMENT_2));
+    doReturn(Futures.immediateFuture(Boolean.TRUE))
+        .when(coordinatorClient)
+        .isHandoffComplete(DATASOURCE1, 
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1));
+    doReturn(Futures.immediateFuture(Boolean.TRUE))
+        .when(coordinatorClient)
+        .isHandoffComplete(DATASOURCE1, 
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_2));
 
     DataServerQueryResult<Object[]> dataServerQueryResult = 
target.fetchRowsFromDataServer(
         query,
         ScanQueryFrameProcessor::mappingFunction,
         Closer.create()
-    );
+    ).get();
 
     Assert.assertEquals(ImmutableList.of(SEGMENT_1, SEGMENT_2), 
dataServerQueryResult.getHandedOffSegments().getDescriptors());
     Assert.assertTrue(dataServerQueryResult.getResultsYielders().isEmpty());
@@ -333,11 +347,12 @@ public class DataServerQueryHandlerTest
   @Test
   public void testQueryFail()
   {
-    SegmentDescriptor segmentDescriptorWithFullInterval = 
toSegmentDescriptorWithFullInterval(SEGMENT_1);
+    SegmentDescriptor segmentDescriptorWithFullInterval =
+        
IndexerDataServerQueryHandler.toSegmentDescriptorWithFullInterval(SEGMENT_1);
     doAnswer(invocation -> {
       ResponseContext responseContext = invocation.getArgument(1);
       
responseContext.addMissingSegments(ImmutableList.of(segmentDescriptorWithFullInterval));
-      return Sequences.empty();
+      return Futures.immediateFuture(Sequences.empty());
     }).when(dataServerClient1).run(any(), any(), any(), any());
     
doReturn(Futures.immediateFuture(Boolean.FALSE)).when(coordinatorClient).isHandoffComplete(DATASOURCE1,
 segmentDescriptorWithFullInterval);
 
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
 
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index b76263b52e9..abcff1308d6 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -44,6 +44,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -134,7 +135,7 @@ public class FrameProcessorExecutor
           logProcessorStatusString(processor, finished, allWritabilityFutures);
 
           if (!writabilityFuturesToWaitFor.isEmpty()) {
-            
runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor));
+            
runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor), 
false);
             return;
           }
 
@@ -150,13 +151,17 @@ public class FrameProcessorExecutor
 
           if (result.isReturn()) {
             succeed(result.value());
+          } else if (result.hasAwaitableFutures()) {
+            
runProcessorAfterFutureResolves(Futures.allAsList(result.awaitableFutures()), 
true);
           } else {
+            assert result.hasAwaitableChannels();
+
             // Don't retain a reference to this set: it may be mutated the 
next time the processor runs.
-            final IntSet await = result.awaitSet();
+            final IntSet await = result.awaitableChannels();
 
             if (await.isEmpty()) {
               exec.execute(ExecutorRunnable.this);
-            } else if (result.isAwaitAll() || await.size() == 1) {
+            } else if (result.isAwaitAllChannels() || await.size() == 1) {
               final List<ListenableFuture<?>> readabilityFutures = new 
ArrayList<>();
 
               for (final int channelNumber : await) {
@@ -169,11 +174,11 @@ public class FrameProcessorExecutor
               if (readabilityFutures.isEmpty()) {
                 exec.execute(ExecutorRunnable.this);
               } else {
-                
runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
+                
runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures), false);
               }
             } else {
               // Await any.
-              runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await));
+              runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await), 
false);
             }
           }
         }
@@ -272,7 +277,17 @@ public class FrameProcessorExecutor
         }
       }
 
-      private <V> void runProcessorAfterFutureResolves(final 
ListenableFuture<V> future)
+      /**
+       * Schedule this processor to run after the provided future resolves.
+       *
+       * @param future       the future
+       * @param failOnCancel whether the processor should fail via {@link 
#fail(Throwable)} if the future is itself canceled.
+       *                     This is true for futures provided by {@link 
ReturnOrAwait#awaitAllFutures(Collection)},
+       *                     because the processor has declared it wants to 
wait for them; if they are canceled
+       *                     the processor must fail. It is false for other 
futures, which the processor was not
+       *                     directly waiting for.
+       */
+      private <V> void runProcessorAfterFutureResolves(final 
ListenableFuture<V> future, final boolean failOnCancel)
       {
         final ListenableFuture<V> cancelableFuture = 
registerCancelableFuture(future, false, cancellationId);
 
@@ -294,8 +309,7 @@ public class FrameProcessorExecutor
               @Override
               public void onFailure(Throwable t)
               {
-                // Ignore cancellation.
-                if (!cancelableFuture.isCancelled()) {
+                if (failOnCancel || !cancelableFuture.isCancelled()) {
                   fail(t);
                 }
               }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java 
b/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java
index 4ca69b6cc49..c154b425851 100644
--- 
a/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/ReturnOrAwait.java
@@ -19,13 +19,14 @@
 
 package org.apache.druid.frame.processor;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSets;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 
 import javax.annotation.Nullable;
-import java.util.Objects;
+import java.util.Collection;
 
 /**
  * Instances of this class are returned by {@link 
FrameProcessor#runIncrementally}, and are used by
@@ -35,8 +36,8 @@ import java.util.Objects;
  * In this case {@link #isReturn()} is true and {@link #value()} contains the 
result.
  *
  * An instance can also be an "await", which means that the {@link 
FrameProcessor} wants to be scheduled again
- * in the future. In this case {@link #isAwait()} is true, {@link #awaitSet()} 
contains the set of input channels to
- * wait for, and {@link #isAwaitAll()} is whether the processor wants to wait 
for all channels, or any channel.
+ * in the future. In this case {@link #isAwait()} is true, and *either* {@link 
#hasAwaitableChannels()} or
+ * {@link #hasAwaitableFutures()} is true.
  */
 public class ReturnOrAwait<T>
 {
@@ -46,18 +47,31 @@ public class ReturnOrAwait<T>
   private final T retVal;
 
   @Nullable
-  private final IntSet await;
+  private final IntSet awaitChannels;
 
-  private final boolean awaitAll;
+  private final boolean awaitAllChannels;
 
-  private ReturnOrAwait(@Nullable T retVal, @Nullable IntSet await, final 
boolean awaitAll)
+  @Nullable
+  private final Collection<ListenableFuture<?>> awaitFutures;
+
+  private ReturnOrAwait(
+      @Nullable T retVal,
+      @Nullable IntSet awaitChannels,
+      @Nullable Collection<ListenableFuture<?>> awaitFutures,
+      final boolean awaitAllChannels
+  )
   {
     this.retVal = retVal;
-    this.await = await;
-    this.awaitAll = awaitAll;
+    this.awaitChannels = awaitChannels;
+    this.awaitAllChannels = awaitAllChannels;
+    this.awaitFutures = awaitFutures;
+
+    if (retVal != null && (awaitChannels != null || awaitFutures != null)) {
+      throw new IAE("Cannot have a value when await != null or futures != 
null");
+    }
 
-    if (retVal != null && await != null) {
-      throw new IAE("Cannot have a value when await != null");
+    if (awaitChannels != null && awaitFutures != null) {
+      throw new ISE("Cannot have both awaitChannels and awaitFutures");
     }
   }
 
@@ -66,7 +80,7 @@ public class ReturnOrAwait<T>
    */
   public static <T> ReturnOrAwait<T> runAgain()
   {
-    return new ReturnOrAwait<>(null, IntSets.emptySet(), true);
+    return new ReturnOrAwait<>(null, IntSets.emptySet(), null, true);
   }
 
   /**
@@ -78,7 +92,7 @@ public class ReturnOrAwait<T>
    */
   public static <T> ReturnOrAwait<T> awaitAll(final IntSet await)
   {
-    return new ReturnOrAwait<>(null, await, true);
+    return new ReturnOrAwait<>(null, await, null, true);
   }
 
   /**
@@ -86,7 +100,15 @@ public class ReturnOrAwait<T>
    */
   public static <T> ReturnOrAwait<T> awaitAll(final int count)
   {
-    return new ReturnOrAwait<>(null, rangeSet(count), true);
+    return new ReturnOrAwait<>(null, rangeSet(count), null, true);
+  }
+
+  /**
+   * Wait for all of the provided futures.
+   */
+  public static <T> ReturnOrAwait<T> awaitAllFutures(final 
Collection<ListenableFuture<?>> futures)
+  {
+    return new ReturnOrAwait<>(null, null, futures, true);
   }
 
   /**
@@ -100,7 +122,7 @@ public class ReturnOrAwait<T>
    */
   public static <T> ReturnOrAwait<T> awaitAny(final IntSet await)
   {
-    return new ReturnOrAwait<>(null, await, false);
+    return new ReturnOrAwait<>(null, await, null, false);
   }
 
   /**
@@ -108,7 +130,7 @@ public class ReturnOrAwait<T>
    */
   public static <T> ReturnOrAwait<T> returnObject(final T o)
   {
-    return new ReturnOrAwait<>(o, null, false);
+    return new ReturnOrAwait<>(o, null, null, false);
   }
 
   /**
@@ -129,13 +151,23 @@ public class ReturnOrAwait<T>
    *
    * Numbers in this set correspond to positions in the {@link 
FrameProcessor#inputChannels()} list.
    */
-  public IntSet awaitSet()
+  public IntSet awaitableChannels()
   {
-    if (isReturn()) {
+    if (!hasAwaitableChannels()) {
       throw new ISE("No await set");
     }
 
-    return await;
+    return awaitChannels;
+  }
+
+
+  public Collection<ListenableFuture<?>> awaitableFutures()
+  {
+    if (!hasAwaitableFutures()) {
+      throw new ISE("No futures set");
+    }
+
+    return awaitFutures;
   }
 
   /**
@@ -145,7 +177,7 @@ public class ReturnOrAwait<T>
    */
   public boolean isReturn()
   {
-    return await == null;
+    return awaitChannels == null && awaitFutures == null;
   }
 
   /**
@@ -155,41 +187,46 @@ public class ReturnOrAwait<T>
    */
   public boolean isAwait()
   {
-    return await != null;
+    return !isReturn();
   }
 
   /**
-   * Whether the processor wants to wait for all channels in {@link 
#awaitSet()} (true), or any channel (false)
+   * Whether the processor wants to wait for a set of futures. If true, {@link 
#awaitableFutures()} contains the
+   * set of futures to wait for.
    */
-  public boolean isAwaitAll()
+  public boolean hasAwaitableFutures()
   {
-    return awaitAll;
+    return awaitFutures != null;
   }
 
-  @Override
-  public boolean equals(Object o)
+  /**
+   * Whether the processor wants to wait for a set of channels. If true, 
{@link #awaitableChannels()} contains the
+   * set of channels to wait for, and {@link #isAwaitAllChannels()} indicates 
whether to wait for all of them or any one.
+   */
+  public boolean hasAwaitableChannels()
   {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ReturnOrAwait<?> that = (ReturnOrAwait<?>) o;
-    return awaitAll == that.awaitAll && Objects.equals(retVal, that.retVal) && 
Objects.equals(await, that.await);
+    return awaitChannels != null;
   }
 
-  @Override
-  public int hashCode()
+  /**
+   * Whether the processor wants to wait for all channels in {@link 
#awaitableChannels()} (true), or any channel (false)
+   */
+  public boolean isAwaitAllChannels()
   {
-    return Objects.hash(retVal, await, awaitAll);
+    if (!hasAwaitableChannels()) {
+      throw new ISE("No channels set");
+    }
+
+    return awaitAllChannels;
   }
 
   @Override
   public String toString()
   {
-    if (isAwait()) {
-      return "await=" + (awaitAll ? "all" : "any") + await;
+    if (hasAwaitableChannels()) {
+      return "await channels=" + (awaitAllChannels ? "all" : "any") + 
awaitChannels;
+    } else if (hasAwaitableFutures()) {
+      return "await futures=" + awaitFutures;
     } else {
       return "return=" + retVal;
     }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
index cf53abd0e5f..60c2e1135b8 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/FrameProcessorExecutorTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.frame.file.FrameFile;
 import org.apache.druid.frame.file.FrameFileWriter;
 import org.apache.druid.frame.processor.test.ChompingFrameProcessor;
 import org.apache.druid.frame.processor.test.FailingFrameProcessor;
+import org.apache.druid.frame.processor.test.FutureWaitingProcessor;
 import org.apache.druid.frame.processor.test.InfiniteFrameProcessor;
 import org.apache.druid.frame.processor.test.SleepyFrameProcessor;
 import org.apache.druid.frame.processor.test.SuperBlasterFrameProcessor;
@@ -414,6 +415,70 @@ public class FrameProcessorExecutorTest
       Assert.assertFalse(processor.didGetInterrupt());
       Assert.assertFalse(processor.didCleanup());
     }
+
+    @Test
+    public void test_awaitAll_withFutures() throws Exception
+    {
+      // Test a processor that waits for futures to complete using 
ReturnOrAwait.awaitAllFutures(Collection<ListenableFuture>)
+      final SettableFuture<String> future1 = SettableFuture.create();
+      final SettableFuture<String> future2 = SettableFuture.create();
+
+      // Start the processor
+      final FutureWaitingProcessor futureWaitingProcessor = new 
FutureWaitingProcessor(future1, future2);
+      final ListenableFuture<List<String>> processorFuture = 
exec.runFully(futureWaitingProcessor, null);
+
+      // Processor should be waiting for futures
+      Assert.assertFalse("Processor should be waiting", 
processorFuture.isDone());
+
+      // Complete the futures
+      future1.set("result1");
+      future2.set("result2");
+
+      // Processor should complete now
+      Assert.assertEquals(List.of("result1", "result2"), 
processorFuture.get());
+      Assert.assertTrue(futureWaitingProcessor.isCleanedUp());
+    }
+
+    @Test
+    public void test_awaitAll_withFutures_canceled()
+    {
+      // Test cancellation of a future that a processor is waiting for
+      final SettableFuture<String> future1 = SettableFuture.create();
+      final SettableFuture<String> future2 = SettableFuture.create();
+
+      // Start the processor
+      final FutureWaitingProcessor futureWaitingProcessor = new 
FutureWaitingProcessor(future1, future2);
+      final ListenableFuture<List<String>> processorFuture = 
exec.runFully(futureWaitingProcessor, null);
+
+      // Resolve one, cancel one
+      future1.set("result1");
+      future2.cancel(true);
+
+      // Verify exception
+      final ExecutionException e = 
Assert.assertThrows(ExecutionException.class, processorFuture::get);
+      MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(CancellationException.class));
+    }
+
+    @Test
+    public void test_awaitAll_withFutures_error()
+    {
+      // Test errors on a future that a processor is waiting for
+      final SettableFuture<String> future1 = SettableFuture.create();
+      final SettableFuture<String> future2 = SettableFuture.create();
+
+      // Start the processor
+      final FutureWaitingProcessor futureWaitingProcessor = new 
FutureWaitingProcessor(future1, future2);
+      final ListenableFuture<List<String>> processorFuture = 
exec.runFully(futureWaitingProcessor, null);
+
+      // Resolve one, fail out one
+      future1.set("result1");
+      future2.setException(new RuntimeException("oops"));
+
+      // Verify exception
+      final ExecutionException e = 
Assert.assertThrows(ExecutionException.class, processorFuture::get);
+      MatcherAssert.assertThat(e.getCause(), 
CoreMatchers.instanceOf(RuntimeException.class));
+      MatcherAssert.assertThat(e.getCause(), 
ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("oops")));
+    }
   }
 
   public abstract static class BaseFrameProcessorExecutorTestSuite extends 
InitializedNullHandlingTest
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java
index 8377cf0639c..0215b43f2c4 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/ReturnOrAwaitTest.java
@@ -19,24 +19,27 @@
 
 package org.apache.druid.frame.processor;
 
+import com.google.common.util.concurrent.Futures;
 import it.unimi.dsi.fastutil.ints.IntSet;
-import nl.jqno.equalsverifier.EqualsVerifier;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+
 public class ReturnOrAwaitTest
 {
   @Test
   public void testToString()
   {
-    Assert.assertEquals("await=any{0, 1}", ReturnOrAwait.awaitAny(IntSet.of(0, 
1)).toString());
-    Assert.assertEquals("await=all{0, 1}", 
ReturnOrAwait.awaitAll(2).toString());
+    Assert.assertEquals("await channels=any{0, 1}", 
ReturnOrAwait.awaitAny(IntSet.of(0, 1)).toString());
+    Assert.assertEquals("await channels=all{0, 1}", 
ReturnOrAwait.awaitAll(2).toString());
     Assert.assertEquals("return=xyzzy", 
ReturnOrAwait.returnObject("xyzzy").toString());
-  }
 
-  @Test
-  public void testEquals()
-  {
-    EqualsVerifier.forClass(ReturnOrAwait.class).usingGetClass().verify();
+    MatcherAssert.assertThat(
+        
ReturnOrAwait.awaitAllFutures(Collections.singletonList(Futures.immediateFuture(1))).toString(),
+        CoreMatchers.startsWith("await futures=[com.google.")
+    );
   }
 }
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java
new file mode 100644
index 00000000000..a55e257675f
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/test/FutureWaitingProcessor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor.test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.java.util.common.ISE;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Processor that waits for two futures using {@link 
ReturnOrAwait#awaitAllFutures(Collection)}.
+ */
+public class FutureWaitingProcessor implements FrameProcessor<List<String>>
+{
+  private final ListenableFuture<String> future1;
+  private final ListenableFuture<String> future2;
+
+  public FutureWaitingProcessor(ListenableFuture<String> future1, 
ListenableFuture<String> future2)
+  {
+    this.future1 = future1;
+    this.future2 = future2;
+  }
+
+  private int runCount = 0;
+  private boolean cleanedUp;
+  private final List<String> results = new ArrayList<>();
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public ReturnOrAwait<List<String>> runIncrementally(IntSet readableInputs)
+  {
+    runCount++;
+
+    if (runCount == 1) {
+      // First run: wait for both futures
+      return ReturnOrAwait.awaitAllFutures(ImmutableList.of(future1, future2));
+    } else if (runCount == 2) {
+      // Second run: futures should be complete, collect results
+      Assert.assertTrue("future1 should be done", future1.isDone());
+      Assert.assertTrue("future2 should be done", future2.isDone());
+
+      try {
+        results.add(future1.get());
+        results.add(future2.get());
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      return ReturnOrAwait.returnObject(results);
+    } else {
+      throw new ISE("Should not run more than twice");
+    }
+  }
+
+  @Override
+  public void cleanup()
+  {
+    cleanedUp = true;
+  }
+
+  public boolean isCleanedUp()
+  {
+    return cleanedUp;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java 
b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
index c75b646bc8f..6a2021f8043 100644
--- a/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
+++ b/server/src/main/java/org/apache/druid/discovery/DataServerClient.java
@@ -26,16 +26,17 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.client.JsonParserIterator;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
-import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.rpc.FixedServiceLocator;
+import org.apache.druid.rpc.IgnoreHttpResponseHandler;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
 import org.apache.druid.rpc.ServiceClientFactory;
@@ -43,30 +44,27 @@ import org.apache.druid.rpc.ServiceLocation;
 import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.utils.CloseableUtils;
 import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.joda.time.Duration;
 
 import java.io.InputStream;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Client to query data servers given a query.
  */
 public class DataServerClient
 {
-  private static final String BASE_PATH = "/druid/v2/";
   private static final Logger log = new Logger(DataServerClient.class);
+  private static final String BASE_PATH = "/druid/v2/";
+  private static final Duration CANCELLATION_TIMEOUT = 
Duration.standardSeconds(5);
+
   private final ServiceClient serviceClient;
   private final ObjectMapper objectMapper;
   private final ServiceLocation serviceLocation;
-  private final ScheduledExecutorService queryCancellationExecutor;
 
   public DataServerClient(
       ServiceClientFactory serviceClientFactory,
       ServiceLocation serviceLocation,
-      ObjectMapper objectMapper,
-      ScheduledExecutorService queryCancellationExecutor
+      ObjectMapper objectMapper
   )
   {
     this.serviceClient = serviceClientFactory.makeClient(
@@ -76,13 +74,23 @@ public class DataServerClient
     );
     this.serviceLocation = serviceLocation;
     this.objectMapper = objectMapper;
-    this.queryCancellationExecutor = queryCancellationExecutor;
   }
 
-  public <T> Sequence<T> run(Query<T> query, ResponseContext responseContext, 
JavaType queryResultType, Closer closer)
+  /**
+   * Issue a query. Returns a future that resolves when the server starts 
sending its response.
+   *
+   * @param query           query to run
+   * @param responseContext response context to populate
+   * @param queryResultType type of result object
+   * @param closer          closer; this call will register a query canceler 
with this closer
+   */
+  public <T> ListenableFuture<Sequence<T>> run(
+      final Query<T> query,
+      final ResponseContext responseContext,
+      final JavaType queryResultType,
+      final Closer closer
+  )
   {
-    final String cancelPath = BASE_PATH + query.getId();
-
     RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, 
BASE_PATH);
     final boolean isSmile = objectMapper.getFactory() instanceof SmileFactory;
     if (isSmile) {
@@ -114,64 +122,72 @@ public class DataServerClient
           public void onFailure(Throwable t)
           {
             if (resultStreamFuture.isCancelled()) {
-              cancelQuery(query, cancelPath);
+              cancelQuery(query.getId());
             }
           }
         },
         Execs.directExecutor()
     );
 
-    return new BaseSequence<>(
-        new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
+    return FutureUtils.transform(
+        resultStreamFuture,
+        resultStream -> new BaseSequence<>(
+            new BaseSequence.IteratorMaker<T, JsonParserIterator<T>>()
+            {
+              @Override
+              public JsonParserIterator<T> make()
+              {
+                return new JsonParserIterator<>(
+                    queryResultType,
+                    Futures.immediateFuture(resultStream),
+                    BASE_PATH,
+                    query,
+                    serviceLocation.getHost(),
+                    objectMapper
+                );
+              }
+
+              @Override
+              public void cleanup(JsonParserIterator<T> iterFromMake)
+              {
+                CloseableUtils.closeAndWrapExceptions(iterFromMake);
+              }
+            }
+        )
+    );
+  }
+
+  private void cancelQuery(final String queryId)
+  {
+    if (queryId == null) {
+      throw DruidException.defensive("Null queryId");
+    }
+
+    final String cancelPath = BASE_PATH + queryId;
+
+    final ListenableFuture<Void> cancelFuture = serviceClient.asyncRequest(
+        new RequestBuilder(HttpMethod.DELETE, 
cancelPath).timeout(CANCELLATION_TIMEOUT),
+        IgnoreHttpResponseHandler.INSTANCE
+    );
+
+    Futures.addCallback(
+        cancelFuture,
+        new FutureCallback<>()
         {
           @Override
-          public JsonParserIterator<T> make()
+          public void onSuccess(final Void result)
           {
-            return new JsonParserIterator<>(
-                queryResultType,
-                resultStreamFuture,
-                BASE_PATH,
-                query,
-                serviceLocation.getHost(),
-                objectMapper
-            );
+            // Do nothing on successful cancellation.
           }
 
           @Override
-          public void cleanup(JsonParserIterator<T> iterFromMake)
+          public void onFailure(final Throwable t)
           {
-            CloseableUtils.closeAndWrapExceptions(iterFromMake);
+            log.noStackTrace()
+               .warn(t, "Failed to cancel query[%s] on server[%s]", queryId, 
serviceLocation.getHostAndPort());
           }
-        }
+        },
+        Execs.directExecutor()
     );
   }
-
-  private void cancelQuery(Query<?> query, String cancelPath)
-  {
-    Runnable cancelRunnable = () -> {
-      Future<StatusResponseHolder> cancelFuture = serviceClient.asyncRequest(
-          new RequestBuilder(HttpMethod.DELETE, cancelPath),
-          StatusResponseHandler.getInstance());
-
-      Runnable checkRunnable = () -> {
-        try {
-          if (!cancelFuture.isDone()) {
-            log.error("Error cancelling query[%s]", query);
-          }
-          StatusResponseHolder response = cancelFuture.get();
-          if (response.getStatus().getCode() >= 500) {
-            log.error("Error cancelling query[%s]: queryable node returned 
status[%d] [%s].",
-                      query,
-                      response.getStatus().getCode(),
-                      response.getStatus().getReasonPhrase());
-          }
-        }
-        catch (ExecutionException | InterruptedException e) {
-          log.error(e, "Error cancelling query[%s]", query);
-        }
-      };
-      queryCancellationExecutor.schedule(checkRunnable, 5, TimeUnit.SECONDS);
-    };
-    queryCancellationExecutor.submit(cancelRunnable);
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java 
b/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java
index 974f09fe89b..552e0d6e9ba 100644
--- a/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java
+++ b/server/src/main/java/org/apache/druid/rpc/ServiceLocation.java
@@ -166,6 +166,18 @@ public class ServiceLocation
     return host;
   }
 
+  /**
+   * Returns a host:port string for the preferred port (TLS if available; 
plaintext otherwise).
+   */
+  public String getHostAndPort()
+  {
+    if (tlsPort > 0) {
+      return host + ":" + tlsPort;
+    } else {
+      return host + ":" + plaintextPort;
+    }
+  }
+
   public int getPlaintextPort()
   {
     return plaintextPort;
diff --git 
a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java 
b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
index 7afdc948a7a..95ad46a580c 100644
--- a/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
+++ b/server/src/test/java/org/apache/druid/discovery/DataServerClientTest.java
@@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.QueryTimeoutException;
@@ -51,6 +50,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.apache.druid.query.Druids.newScanQueryBuilder;
 import static org.mockito.Mockito.mock;
@@ -81,13 +81,12 @@ public class DataServerClientTest
     target = new DataServerClient(
         serviceClientFactory,
         mock(ServiceLocation.class),
-        jsonMapper,
-        Execs.scheduledSingleThreaded("query-cancellation-executor")
+        jsonMapper
     );
   }
 
   @Test
-  public void testFetchSegmentFromDataServer() throws JsonProcessingException
+  public void testFetchSegmentFromDataServer() throws JsonProcessingException, 
ExecutionException, InterruptedException
   {
     ScanResultValue scanResultValue = new ScanResultValue(
         null,
@@ -112,7 +111,7 @@ public class DataServerClientTest
         responseContext,
         jsonMapper.getTypeFactory().constructType(ScanResultValue.class),
         Closer.create()
-    );
+    ).get();
 
     Assert.assertEquals(ImmutableList.of(scanResultValue), result.toList());
   }
@@ -170,7 +169,7 @@ public class DataServerClientTest
             responseContext,
             jsonMapper.getTypeFactory().constructType(ScanResultValue.class),
             Closer.create()
-        ).toList()
+        ).get().toList()
     );
   }
 
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
index 6c3cfccbccb..cbfd8ed86dc 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedDruidCluster.java
@@ -251,6 +251,7 @@ public class EmbeddedDruidCluster implements 
ClusterReferencesProvider, Embedded
         resource.onStarted(this);
       }
       catch (Exception e) {
+        log.warn(e, "Failed to start resource[%s]. Stopping cluster.", 
resource);
         // Clean up the resources that have already been started
         stop();
         throw e;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to