This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 520978e94e2 feat: Extension point for MSQ InputSpecs. (#19479)
520978e94e2 is described below

commit 520978e94e24ce6cd8bde2a4c158c68a61e9cd5d
Author: Gian Merlino <[email protected]>
AuthorDate: Thu May 21 12:29:41 2026 -0700

    feat: Extension point for MSQ InputSpecs. (#19479)
    
    This patch adds extension points for InputSpecSlicer and InputSliceReader,
    and uses them to implement TableInputSpec. This eliminates and generalizes
    the "newTableInputSpecSlicer" method on the ControllerContext, which was
    previously needed because the slicing logic differs for tasks and Dart.
---
 .../msq/dart/controller/DartControllerContext.java | 21 +++++--
 .../DartControllerContextFactoryImpl.java          |  9 +++
 .../DartTableInputSpecSlicerProvider.java          | 55 ++++++++++++++++
 .../druid/msq/dart/guice/DartControllerModule.java |  6 ++
 .../druid/msq/dart/guice/DartWorkerModule.java     |  7 +++
 .../DartSegmentsInputSliceReaderProvider.java      | 46 ++++++++++++++
 .../druid/msq/dart/worker/DartWorkerContext.java   | 17 +++--
 .../dart/worker/DartWorkerContextFactoryImpl.java  | 12 +++-
 .../apache/druid/msq/exec/ControllerContext.java   | 11 ++--
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 43 +++++++++----
 .../org/apache/druid/msq/exec/RunWorkOrder.java    | 28 +++++----
 .../org/apache/druid/msq/exec/WorkerContext.java   | 12 ++++
 .../org/apache/druid/msq/guice/MSQBinders.java     | 34 ++++++++++
 .../apache/druid/msq/guice/MSQIndexingModule.java  | 13 ++++
 .../msq/indexing/IndexerControllerContext.java     | 20 +++---
 .../IndexerSegmentsInputSliceReaderProvider.java   | 47 ++++++++++++++
 .../IndexerTableInputSpecSlicerProvider.java       | 73 ++++++++++++++++++++++
 .../druid/msq/indexing/IndexerWorkerContext.java   | 21 ++++++-
 .../druid/msq/input/InputSliceReaderProvider.java  | 40 ++++++++++++
 .../druid/msq/input/InputSpecSlicerProvider.java   | 46 ++++++++++++++
 .../dart/controller/DartControllerContextTest.java | 13 +++-
 .../druid/msq/exec/MSQCompactionTaskRunTest.java   |  2 +
 .../apache/druid/msq/exec/TestMSQSqlModule.java    |  9 +++
 .../druid/msq/sql/MSQTaskQueryMakerTest.java       |  7 ++-
 .../msq/test/AbstractDartComponentSupplier.java    |  3 -
 .../org/apache/druid/msq/test/MSQTestBase.java     |  5 +-
 .../druid/msq/test/MSQTestControllerContext.java   | 14 ++---
 .../test/TestDartControllerContextFactoryImpl.java | 16 ++++-
 28 files changed, 560 insertions(+), 70 deletions(-)

diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index 52936b8e0f8..ec748a74b7d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -40,7 +40,7 @@ import org.apache.druid.msq.exec.WorkerManager;
 import org.apache.druid.msq.indexing.IndexerControllerContext;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
-import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -88,8 +88,9 @@ public class DartControllerContext implements 
ControllerContext
   private final DartWorkerClient workerClient;
   private final TimelineServerView serverView;
   private final MemoryIntrospector memoryIntrospector;
-  private final QueryContext context;
+  private final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
   private final ServiceEmitter emitter;
+  private final QueryContext context;
 
   public DartControllerContext(
       final Injector injector,
@@ -98,6 +99,7 @@ public class DartControllerContext implements 
ControllerContext
       final DartWorkerClient workerClient,
       final MemoryIntrospector memoryIntrospector,
       final TimelineServerView serverView,
+      final List<InputSpecSlicerProvider> inputSpecSlicerProviders,
       final ServiceEmitter emitter,
       final QueryContext context
   )
@@ -108,8 +110,9 @@ public class DartControllerContext implements 
ControllerContext
     this.workerClient = workerClient;
     this.serverView = serverView;
     this.memoryIntrospector = memoryIntrospector;
-    this.context = context;
+    this.inputSpecSlicerProviders = inputSpecSlicerProviders;
     this.emitter = emitter;
+    this.context = context;
   }
 
   @Override
@@ -190,9 +193,9 @@ public class DartControllerContext implements 
ControllerContext
   }
 
   @Override
-  public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
+  public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
   {
-    return 
DartTableInputSpecSlicer.createFromWorkerIds(workerManager.getWorkerIds(), 
serverView, context);
+    return inputSpecSlicerProviders;
   }
 
   @Override
@@ -260,4 +263,12 @@ public class DartControllerContext implements 
ControllerContext
   {
     return context.isDebug();
   }
+
+  /**
+   * Getter for {@link DartTableInputSpecSlicerProvider} to retrieve the 
server view.
+   */
+  TimelineServerView serverView()
+  {
+    return serverView;
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
index 87582d977aa..260d6156c8c 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContextFactoryImpl.java
@@ -28,14 +28,19 @@ import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.msq.dart.Dart;
 import org.apache.druid.msq.dart.worker.DartWorkerClientImpl;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.server.DruidNode;
 
+import java.util.List;
+import java.util.Set;
+
 public class DartControllerContextFactoryImpl implements 
DartControllerContextFactory
 {
   protected final Injector injector;
@@ -45,6 +50,7 @@ public class DartControllerContextFactoryImpl implements 
DartControllerContextFa
   protected final ServiceClientFactory serviceClientFactory;
   protected final TimelineServerView serverView;
   protected final MemoryIntrospector memoryIntrospector;
+  protected final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
   protected final ServiceEmitter emitter;
 
   @Inject
@@ -56,6 +62,7 @@ public class DartControllerContextFactoryImpl implements 
DartControllerContextFa
       @EscalatedGlobal final ServiceClientFactory serviceClientFactory,
       final MemoryIntrospector memoryIntrospector,
       final TimelineServerView serverView,
+      @Dart final Set<InputSpecSlicerProvider> inputSpecSlicerProviders,
       final ServiceEmitter emitter
   )
   {
@@ -66,6 +73,7 @@ public class DartControllerContextFactoryImpl implements 
DartControllerContextFa
     this.serviceClientFactory = serviceClientFactory;
     this.serverView = serverView;
     this.memoryIntrospector = memoryIntrospector;
+    this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders);
     this.emitter = emitter;
   }
 
@@ -80,6 +88,7 @@ public class DartControllerContextFactoryImpl implements 
DartControllerContextFa
         new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, 
selfNode.getHostAndPortToUse()),
         memoryIntrospector,
         serverView,
+        inputSpecSlicerProviders,
         emitter,
         context
     );
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java
new file mode 100644
index 00000000000..13dcb49e519
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartTableInputSpecSlicerProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.controller;
+
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
+import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.query.QueryContext;
+
+import java.util.List;
+
+/**
+ * Controller-side provider for {@link TableInputSpec} in Dart.
+ */
+public class DartTableInputSpecSlicerProvider implements 
InputSpecSlicerProvider
+{
+  @Override
+  public Class<? extends InputSpec> specClass()
+  {
+    return TableInputSpec.class;
+  }
+
+  @Override
+  public InputSpecSlicer createSlicer(
+      ControllerContext controllerContext,
+      QueryContext queryContext,
+      List<String> workerIds
+  )
+  {
+    return DartTableInputSpecSlicer.createFromWorkerIds(
+        workerIds,
+        ((DartControllerContext) controllerContext).serverView(),
+        queryContext
+    );
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
index 8a9b1cf3607..0937888b0a9 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartControllerModule.java
@@ -47,11 +47,13 @@ import 
org.apache.druid.msq.dart.controller.DartControllerContextFactoryImpl;
 import org.apache.druid.msq.dart.controller.DartControllerRegistry;
 import org.apache.druid.msq.dart.controller.DartMessageRelayFactoryImpl;
 import org.apache.druid.msq.dart.controller.DartMessageRelays;
+import org.apache.druid.msq.dart.controller.DartTableInputSpecSlicerProvider;
 import org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactory;
 import org.apache.druid.msq.dart.controller.sql.DartSqlClientFactoryImpl;
 import org.apache.druid.msq.dart.controller.sql.DartSqlClients;
 import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
+import org.apache.druid.msq.guice.MSQBinders;
 import org.apache.druid.msq.rpc.ResourcePermissionMapper;
 import org.apache.druid.query.DefaultQueryConfig;
 import org.apache.druid.query.QueryConfigProvider;
@@ -111,6 +113,10 @@ public class DartControllerModule implements DruidModule
                  .addBinding()
                  .to(DartSqlEngine.class)
                  .in(LazySingleton.class);
+      MSQBinders.inputSpecSlicerProviderBinder(binder, Dart.class)
+                .addBinding()
+                .to(DartTableInputSpecSlicerProvider.class)
+                .in(LazySingleton.class);
     }
 
     @Provides
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
index e98d0ec2195..5ee9aa1f6ee 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/guice/DartWorkerModule.java
@@ -53,11 +53,13 @@ import 
org.apache.druid.msq.dart.controller.http.DartQueryInfo;
 import org.apache.druid.msq.dart.controller.messages.ControllerMessage;
 import org.apache.druid.msq.dart.controller.sql.DartSqlEngine;
 import org.apache.druid.msq.dart.worker.DartDataServerQueryHandlerFactory;
+import org.apache.druid.msq.dart.worker.DartSegmentsInputSliceReaderProvider;
 import org.apache.druid.msq.dart.worker.DartWorkerContextFactory;
 import org.apache.druid.msq.dart.worker.DartWorkerContextFactoryImpl;
 import org.apache.druid.msq.dart.worker.DartWorkerRunner;
 import org.apache.druid.msq.dart.worker.http.DartWorkerResource;
 import org.apache.druid.msq.exec.MemoryIntrospector;
+import org.apache.druid.msq.guice.MSQBinders;
 import org.apache.druid.msq.rpc.ResourcePermissionMapper;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.rpc.ServiceClientFactory;
@@ -104,6 +106,11 @@ public class DartWorkerModule implements DruidModule
       binder.bind(ResourcePermissionMapper.class)
             .annotatedWith(Dart.class)
             .to(DartResourcePermissionMapper.class);
+
+      MSQBinders.inputSliceReaderProviderBinder(binder, Dart.class)
+                .addBinding()
+                .to(DartSegmentsInputSliceReaderProvider.class)
+                .in(LazySingleton.class);
     }
 
     @Provides
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java
new file mode 100644
index 00000000000..7421e87f0a3
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartSegmentsInputSliceReaderProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.dart.worker;
+
+import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
+import org.apache.druid.query.QueryContext;
+
+/**
+ * Worker-side provider for {@link SegmentsInputSlice} in Dart.
+ */
+public class DartSegmentsInputSliceReaderProvider implements 
InputSliceReaderProvider
+{
+  @Override
+  public Class<? extends InputSlice> sliceClass()
+  {
+    return SegmentsInputSlice.class;
+  }
+
+  @Override
+  public InputSliceReader createReader(FrameContext frameContext, QueryContext 
queryContext)
+  {
+    return new SegmentsInputSliceReader(frameContext, false);
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
index 768801261d1..39888b75717 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContext.java
@@ -43,12 +43,12 @@ import org.apache.druid.msq.exec.WorkerClient;
 import org.apache.druid.msq.exec.WorkerContext;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.exec.WorkerStorageParameters;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
-import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.policy.PolicyEnforcer;
 import org.apache.druid.segment.SegmentWrangler;
 import org.apache.druid.server.DruidNode;
@@ -57,6 +57,7 @@ import org.apache.druid.utils.CloseableUtils;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import java.io.File;
+import java.util.List;
 
 /**
  * Dart implementation of {@link WorkerContext}.
@@ -79,7 +80,6 @@ public class DartWorkerContext implements WorkerContext
   private final Injector injector;
   private final DartWorkerClient workerClient;
   private final SegmentWrangler segmentWrangler;
-  private final GroupingEngine groupingEngine;
   private final SegmentManager segmentManager;
   private final CoordinatorClient coordinatorClient;
   private final MemoryIntrospector memoryIntrospector;
@@ -96,6 +96,7 @@ public class DartWorkerContext implements WorkerContext
   @MonotonicNonNull
   private volatile ResourceHolder<ProcessingBuffersSet> processingBuffersSet;
   private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
+  private final List<InputSliceReaderProvider> inputSliceReaderProviders;
 
   DartWorkerContext(
       final String queryId,
@@ -107,7 +108,6 @@ public class DartWorkerContext implements WorkerContext
       final DartWorkerClient workerClient,
       final DruidProcessingConfig processingConfig,
       final SegmentWrangler segmentWrangler,
-      final GroupingEngine groupingEngine,
       final SegmentManager segmentManager,
       final CoordinatorClient coordinatorClient,
       final MemoryIntrospector memoryIntrospector,
@@ -116,7 +116,8 @@ public class DartWorkerContext implements WorkerContext
       final File tempDir,
       final QueryContext queryContext,
       final DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
-      final ServiceEmitter emitter
+      final ServiceEmitter emitter,
+      final List<InputSliceReaderProvider> inputSliceReaderProviders
   )
   {
     this.queryId = queryId;
@@ -129,7 +130,6 @@ public class DartWorkerContext implements WorkerContext
     this.injector = injector;
     this.workerClient = workerClient;
     this.segmentWrangler = segmentWrangler;
-    this.groupingEngine = groupingEngine;
     this.segmentManager = segmentManager;
     this.coordinatorClient = coordinatorClient;
     this.memoryIntrospector = memoryIntrospector;
@@ -138,6 +138,7 @@ public class DartWorkerContext implements WorkerContext
     this.tempDir = tempDir;
     this.queryContext = Preconditions.checkNotNull(queryContext, 
"queryContext");
     this.emitter = emitter;
+    this.inputSliceReaderProviders = inputSliceReaderProviders;
 
     // Compute thread count once in constructor
     final int baseThreadCount = processingConfig.getNumThreads();
@@ -175,6 +176,12 @@ public class DartWorkerContext implements WorkerContext
     return injector;
   }
 
+  @Override
+  public List<InputSliceReaderProvider> inputSliceReaderProviders()
+  {
+    return inputSliceReaderProviders;
+  }
+
   @Override
   public void registerWorker(Worker worker, Closer closer)
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index 07117c80f19..dc74cea3513 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -34,6 +34,7 @@ import 
org.apache.druid.msq.dart.controller.messages.ControllerMessage;
 import org.apache.druid.msq.exec.MemoryIntrospector;
 import org.apache.druid.msq.exec.ProcessingBuffersProvider;
 import org.apache.druid.msq.exec.WorkerContext;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
 import org.apache.druid.query.DruidProcessingConfig;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.groupby.GroupingEngine;
@@ -44,6 +45,8 @@ import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.SegmentManager;
 
 import java.io.File;
+import java.util.List;
+import java.util.Set;
 
 /**
  * Production implementation of {@link DartWorkerContextFactory}.
@@ -66,6 +69,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
   private final Outbox<ControllerMessage> outbox;
   private final DartDataServerQueryHandlerFactory 
dataServerQueryHandlerFactory;
   private final ServiceEmitter emitter;
+  private final List<InputSliceReaderProvider> inputSliceReaderProviders;
 
   @Inject
   public DartWorkerContextFactoryImpl(
@@ -84,7 +88,8 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
       @Dart ProcessingBuffersProvider processingBuffersProvider,
       Outbox<ControllerMessage> outbox,
       DartDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
-      ServiceEmitter emitter
+      ServiceEmitter emitter,
+      @Dart Set<InputSliceReaderProvider> inputSliceReaderProviders
   )
   {
     this.selfNode = selfNode;
@@ -103,6 +108,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
     this.outbox = outbox;
     this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
     this.emitter = emitter;
+    this.inputSliceReaderProviders = List.copyOf(inputSliceReaderProviders);
   }
 
   @Override
@@ -123,7 +129,6 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
         createWorkerClient(queryId),
         processingConfig,
         segmentWrangler,
-        groupingEngine,
         segmentManager,
         coordinatorClient,
         memoryIntrospector,
@@ -132,7 +137,8 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
         tempDir,
         queryContext,
         dataServerQueryHandlerFactory,
-        emitter
+        emitter,
+        inputSliceReaderProviders
     );
   }
 
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
index e12c5c3b336..013df33fad1 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerContext.java
@@ -26,14 +26,14 @@ import 
org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.msq.indexing.MSQSpec;
-import org.apache.druid.msq.input.InputSpecSlicer;
-import org.apache.druid.msq.input.table.SegmentsInputSlice;
-import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.kernel.ShuffleSpec;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.server.DruidNode;
 
 import java.io.File;
+import java.util.List;
 
 /**
  * Context used by multi-stage query controllers. Useful because it allows 
test fixtures to provide their own
@@ -82,9 +82,10 @@ public interface ControllerContext
   DruidNode selfNode();
 
   /**
-   * Provides an {@link InputSpecSlicer} that slices {@link TableInputSpec} 
into {@link SegmentsInputSlice}.
+   * Extension point for {@link InputSpec} beyond the builtin ones provided by
+   * {@link ControllerImpl#makeInputSpecSlicerFactory}.
    */
-  InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager);
+  List<InputSpecSlicerProvider> inputSpecSlicerProviders();
 
   /**
    * Provide access to segment actions in the Overlord. Only called for 
ingestion queries, i.e., where
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 83568629be1..db9b3cbc9fb 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -139,6 +139,7 @@ import 
org.apache.druid.msq.indexing.report.MSQTaskReportPayload;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
 import org.apache.druid.msq.input.InputSpecSlicerFactory;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.input.MapInputSpecSlicer;
 import org.apache.druid.msq.input.external.ExternalInputSpec;
 import org.apache.druid.msq.input.external.ExternalInputSpecSlicer;
@@ -148,7 +149,6 @@ import org.apache.druid.msq.input.lookup.LookupInputSpec;
 import org.apache.druid.msq.input.lookup.LookupInputSpecSlicer;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.input.stage.StageInputSpecSlicer;
-import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageId;
@@ -208,6 +208,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -423,8 +424,11 @@ public class ControllerImpl implements Controller
       closer.register(workerSketchFetcher::close);
 
       // Execution-related: run the multi-stage QueryDefinition.
-      final InputSpecSlicerFactory inputSpecSlicerFactory =
-          
makeInputSpecSlicerFactory(context.newTableInputSpecSlicer(workerManager));
+      final InputSpecSlicerFactory inputSpecSlicerFactory = 
makeInputSpecSlicerFactory(
+          context,
+          workerManager.getWorkerIds(),
+          getQueryContext()
+      );
 
       final Pair<ControllerQueryKernel, ListenableFuture<?>> queryRunResult =
           new RunQueryUntilDone(
@@ -2149,17 +2153,30 @@ public class ControllerImpl implements Controller
     );
   }
 
-  private static InputSpecSlicerFactory makeInputSpecSlicerFactory(final 
InputSpecSlicer tableInputSpecSlicer)
+  private static InputSpecSlicerFactory makeInputSpecSlicerFactory(
+      final ControllerContext controllerContext,
+      final List<String> workerIds,
+      final QueryContext queryContext
+  )
   {
-    return (stagePartitionsMap, stageOutputChannelModeMap) -> new 
MapInputSpecSlicer(
-        ImmutableMap.<Class<? extends InputSpec>, InputSpecSlicer>builder()
-                    .put(StageInputSpec.class, new 
StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap))
-                    .put(ExternalInputSpec.class, new 
ExternalInputSpecSlicer())
-                    .put(InlineInputSpec.class, new InlineInputSpecSlicer())
-                    .put(LookupInputSpec.class, new LookupInputSpecSlicer())
-                    .put(TableInputSpec.class, tableInputSpecSlicer)
-                    .build()
-    );
+    return (stagePartitionsMap, stageOutputChannelModeMap) -> {
+      Map<Class<? extends InputSpec>, InputSpecSlicer> slicers = new 
LinkedHashMap<>();
+
+      slicers.put(StageInputSpec.class, new 
StageInputSpecSlicer(stagePartitionsMap, stageOutputChannelModeMap));
+      slicers.put(ExternalInputSpec.class, new ExternalInputSpecSlicer());
+      slicers.put(InlineInputSpec.class, new InlineInputSpecSlicer());
+      slicers.put(LookupInputSpec.class, new LookupInputSpecSlicer());
+
+      // Context-supplied providers override the default ones, so they get 
added last.
+      for (final InputSpecSlicerProvider slicerProvider : 
controllerContext.inputSpecSlicerProviders()) {
+        slicers.put(
+            slicerProvider.specClass(),
+            slicerProvider.createSlicer(controllerContext, queryContext, 
workerIds)
+        );
+      }
+
+      return new MapInputSpecSlicer(slicers);
+    };
   }
 
   private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index ba613768781..20593f26621 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -21,7 +21,6 @@ package org.apache.druid.msq.exec;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -48,6 +47,7 @@ import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
 import org.apache.druid.msq.input.MapInputSliceReader;
 import org.apache.druid.msq.input.NilInputSlice;
 import org.apache.druid.msq.input.NilInputSliceReader;
@@ -71,6 +71,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
@@ -399,16 +400,21 @@ public class RunWorkOrder
   private InputSliceReader makeInputSliceReader()
   {
     final boolean reindex = 
MultiStageQueryContext.isReindex(workOrder.getWorkerContext());
-    return new MapInputSliceReader(
-        ImmutableMap.<Class<? extends InputSlice>, InputSliceReader>builder()
-                    .put(NilInputSlice.class, NilInputSliceReader.INSTANCE)
-                    .put(StageInputSlice.class, StageInputSliceReader.INSTANCE)
-                    .put(ExternalInputSlice.class, new 
ExternalInputSliceReader(frameContext.tempDir("external")))
-                    .put(InlineInputSlice.class, new 
InlineInputSliceReader(frameContext.segmentWrangler()))
-                    .put(LookupInputSlice.class, new 
LookupInputSliceReader(frameContext.segmentWrangler()))
-                    .put(SegmentsInputSlice.class, new 
SegmentsInputSliceReader(frameContext, reindex))
-                    .build()
-    );
+    LinkedHashMap<Class<? extends InputSlice>, InputSliceReader> readers = new 
LinkedHashMap<>();
+    readers.put(NilInputSlice.class, NilInputSliceReader.INSTANCE);
+    readers.put(StageInputSlice.class, StageInputSliceReader.INSTANCE);
+    readers.put(ExternalInputSlice.class, new 
ExternalInputSliceReader(frameContext.tempDir("external")));
+    readers.put(InlineInputSlice.class, new 
InlineInputSliceReader(frameContext.segmentWrangler()));
+    readers.put(LookupInputSlice.class, new 
LookupInputSliceReader(frameContext.segmentWrangler()));
+    readers.put(SegmentsInputSlice.class, new 
SegmentsInputSliceReader(frameContext, reindex));
+
+    // Context-supplied providers override the default ones, so they get added 
last.
+    for (final InputSliceReaderProvider readerProvider : 
workerContext.inputSliceReaderProviders()) {
+      final InputSliceReader reader = 
readerProvider.createReader(frameContext, workOrder.getWorkerContext());
+      readers.put(readerProvider.sliceClass(), reader);
+    }
+
+    return new MapInputSliceReader(readers);
   }
 
   private OutputChannelFactory makeStageOutputChannelFactory()
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index 9e696a16cce..b225112236d 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -24,6 +24,8 @@ import com.google.inject.Injector;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.policy.PolicyEnforcer;
@@ -31,6 +33,7 @@ import org.apache.druid.server.DruidNode;
 
 import java.io.Closeable;
 import java.io.File;
+import java.util.List;
 
 /**
  * Context used by multi-stage query workers.
@@ -121,6 +124,15 @@ public interface WorkerContext extends Closeable
    */
   boolean isDebug();
 
+  /**
+   * Extension point for additional {@link InputSlice} beyond those provided by
+   * {@link RunWorkOrder#makeInputSliceReader()}.
+   */
+  default List<InputSliceReaderProvider> inputSliceReaderProviders()
+  {
+    return List.of();
+  }
+
   @Override
   void close();
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
index 0a6b0827324..e3cd3e4028e 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQBinders.java
@@ -20,11 +20,20 @@
 package org.apache.druid.msq.guice;
 
 import com.google.inject.Binder;
+import com.google.inject.Key;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.MapBinder;
+import com.google.inject.multibindings.Multibinder;
+import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.msq.dart.Dart;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.querykit.QueryKit;
 import org.apache.druid.query.Query;
 
+import java.lang.annotation.Annotation;
+
 /**
  * Utility class for MSQ-related Guice bindings.
  */
@@ -50,4 +59,29 @@ public class MSQBinders
         new TypeLiteral<>() {}
     );
   }
+
+  /**
+   * Bind an {@link InputSpecSlicerProvider} for use on a controller. The 
annotation should be
+   * {@link IndexingService} for providers used by tasks, or {@link Dart} for 
providers used by Dart.
+   */
+  public static Multibinder<InputSpecSlicerProvider> 
inputSpecSlicerProviderBinder(
+      Binder binder,
+      Class<? extends Annotation> annotation
+  )
+  {
+    return Multibinder.newSetBinder(binder, 
Key.get(InputSpecSlicerProvider.class, annotation));
+  }
+
+  /**
+   * Bind an {@link InputSliceReaderProvider} for use on a worker, to handle a 
particular {@link InputSlice}.
+   * The annotation should be {@link IndexingService} for providers used by 
tasks, or {@link Dart} for providers
+   * used by Dart.
+   */
+  public static Multibinder<InputSliceReaderProvider> 
inputSliceReaderProviderBinder(
+      Binder binder,
+      Class<? extends Annotation> annotation
+  )
+  {
+    return Multibinder.newSetBinder(binder, 
Key.get(InputSliceReaderProvider.class, annotation));
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 629754f2a9a..8781fa5bb0f 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -27,6 +27,7 @@ import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provides;
+import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.initialization.DruidModule;
@@ -40,6 +41,8 @@ import org.apache.druid.msq.counters.StorageCounters;
 import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
 import org.apache.druid.msq.counters.WarningCounters;
 import org.apache.druid.msq.indexing.IndexerControllerContextFactory;
+import org.apache.druid.msq.indexing.IndexerSegmentsInputSliceReaderProvider;
+import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider;
 import org.apache.druid.msq.indexing.MSQCompactionRunner;
 import org.apache.druid.msq.indexing.MSQControllerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
@@ -250,6 +253,16 @@ public class MSQIndexingModule implements DruidModule
               .addBinding(WindowOperatorQuery.class)
               .to(WindowOperatorQueryKit.class);
     binder.bind(WindowOperatorQueryKit.class).in(LazySingleton.class);
+
+    MSQBinders.inputSpecSlicerProviderBinder(binder, IndexingService.class)
+              .addBinding()
+              .to(IndexerTableInputSpecSlicerProvider.class)
+              .in(LazySingleton.class);
+
+    MSQBinders.inputSliceReaderProviderBinder(binder, IndexingService.class)
+              .addBinding()
+              .to(IndexerSegmentsInputSliceReaderProvider.class)
+              .in(LazySingleton.class);
   }
 
   @Provides
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 18aebe0bcd2..5ea69d8c6a0 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Injector;
 import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.frame.FrameType;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -48,7 +50,7 @@ import 
org.apache.druid.msq.indexing.destination.MSQDestination;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.indexing.error.UnknownFault;
-import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -66,7 +68,9 @@ import org.apache.druid.storage.StorageConnectorProvider;
 import java.io.File;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
@@ -90,6 +94,7 @@ public class IndexerControllerContext implements 
ControllerContext
   private final ServiceClientFactory clientFactory;
   private final OverlordClient overlordClient;
   private final MemoryIntrospector memoryIntrospector;
+  private final List<InputSpecSlicerProvider> inputSpecSlicerProviders;
 
   public IndexerControllerContext(
       final MSQControllerTask task,
@@ -110,6 +115,9 @@ public class IndexerControllerContext implements 
ControllerContext
     this.memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
     final StorageConnectorProvider storageConnectorProvider = 
injector.getInstance(Key.get(StorageConnectorProvider.class, 
MultiStageQuery.class));
     final StorageConnector storageConnector = 
storageConnectorProvider.createStorageConnector(toolbox.getIndexingTmpDir());
+    final Set<InputSpecSlicerProvider> inputSpecSlicerProviders =
+        injector.getInstance(Key.get(new TypeLiteral<>() {}, 
IndexingService.class));
+    this.inputSpecSlicerProviders = List.copyOf(inputSpecSlicerProviders);
     this.injector = injector.createChildInjector(
         binder -> binder.bind(Key.get(StorageConnector.class, 
MultiStageQuery.class))
                         .toInstance(storageConnector));
@@ -174,15 +182,9 @@ public class IndexerControllerContext implements 
ControllerContext
   }
 
   @Override
-  public InputSpecSlicer newTableInputSpecSlicer(final WorkerManager 
workerManager)
+  public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
   {
-    final SegmentSource includeSegmentSource =
-        MultiStageQueryContext.getSegmentSources(taskQuerySpecContext, 
DEFAULT_SEGMENT_SOURCE);
-    return new IndexerTableInputSpecSlicer(
-        toolbox.getCoordinatorClient(),
-        toolbox.getTaskActionClient(),
-        includeSegmentSource
-    );
+    return inputSpecSlicerProviders;
   }
 
   @Override
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java
new file mode 100644
index 00000000000..d27a7852cac
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerSegmentsInputSliceReaderProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSliceReader;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
+import org.apache.druid.msq.input.table.SegmentsInputSlice;
+import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+
+/**
+ * Worker-side provider for {@link SegmentsInputSlice} in tasks.
+ */
+public class IndexerSegmentsInputSliceReaderProvider implements 
InputSliceReaderProvider
+{
+  @Override
+  public Class<? extends InputSlice> sliceClass()
+  {
+    return SegmentsInputSlice.class;
+  }
+
+  @Override
+  public InputSliceReader createReader(FrameContext frameContext, QueryContext 
queryContext)
+  {
+    return new SegmentsInputSliceReader(frameContext, 
MultiStageQueryContext.isReindex(queryContext));
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java
new file mode 100644
index 00000000000..052d5c7691c
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerTableInputSpecSlicerProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import com.google.inject.Inject;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.input.InputSpec;
+import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
+import org.apache.druid.msq.input.table.TableInputSpec;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.rpc.StandardRetryPolicy;
+
+import java.util.List;
+
+/**
+ * Controller-side provider for {@link TableInputSpec} in tasks.
+ */
+public class IndexerTableInputSpecSlicerProvider implements 
InputSpecSlicerProvider
+{
+  private final CoordinatorClient coordinatorClient;
+
+  @Inject
+  public IndexerTableInputSpecSlicerProvider(CoordinatorClient 
coordinatorClient)
+  {
+    // Use the "aboutAnHour" retry policy, same as the one used in the 
TaskToolboxFactory. This prevents
+    // long-running tasks from failing if there are Coordinator/Overlord 
problems. Calls will still fail
+    // eventually if problems persist.
+    this.coordinatorClient = 
coordinatorClient.withRetryPolicy(StandardRetryPolicy.aboutAnHour());
+  }
+
+  @Override
+  public Class<? extends InputSpec> specClass()
+  {
+    return TableInputSpec.class;
+  }
+
+  @Override
+  public InputSpecSlicer createSlicer(
+      ControllerContext controllerContext,
+      QueryContext queryContext,
+      List<String> workerIds
+  )
+  {
+    final SegmentSource includeSegmentSource =
+        MultiStageQueryContext.getSegmentSources(queryContext, 
IndexerControllerContext.DEFAULT_SEGMENT_SOURCE);
+    return new IndexerTableInputSpecSlicer(
+        coordinatorClient,
+        controllerContext.taskActionClient(),
+        includeSegmentSource
+    );
+  }
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 2589d0c0dab..82d462fd096 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -22,7 +22,9 @@ package org.apache.druid.msq.indexing;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Injector;
 import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.indexing.IndexingService;
 import org.apache.druid.collections.ResourceHolder;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.guice.annotations.Smile;
@@ -46,6 +48,7 @@ import org.apache.druid.msq.guice.MultiStageQuery;
 import org.apache.druid.msq.indexing.client.IndexerControllerClient;
 import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
 import org.apache.druid.msq.indexing.client.WorkerChatHandler;
+import org.apache.druid.msq.input.InputSliceReaderProvider;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -66,6 +69,8 @@ import org.apache.druid.storage.StorageConnectorProvider;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.util.List;
+import java.util.Set;
 
 public class IndexerWorkerContext implements WorkerContext
 {
@@ -90,6 +95,7 @@ public class IndexerWorkerContext implements WorkerContext
   private final ServiceClientFactory clientFactory;
   private final MemoryIntrospector memoryIntrospector;
   private final ProcessingBuffersProvider processingBuffersProvider;
+  private final List<InputSliceReaderProvider> inputSliceReaderProviders;
   private final int maxConcurrentStages;
   private final boolean liveReportCounters;
   private final boolean includeAllCounters;
@@ -112,7 +118,8 @@ public class IndexerWorkerContext implements WorkerContext
       final ServiceClientFactory clientFactory,
       final MemoryIntrospector memoryIntrospector,
       final ProcessingBuffersProvider processingBuffersProvider,
-      final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory
+      final IndexerDataServerQueryHandlerFactory dataServerQueryHandlerFactory,
+      final List<InputSliceReaderProvider> inputSliceReaderProviders
   )
   {
     this.task = task;
@@ -127,6 +134,7 @@ public class IndexerWorkerContext implements WorkerContext
     this.memoryIntrospector = memoryIntrospector;
     this.processingBuffersProvider = processingBuffersProvider;
     this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
+    this.inputSliceReaderProviders = inputSliceReaderProviders;
 
     final QueryContext queryContext = QueryContext.of(task.getContext());
     this.maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
@@ -171,6 +179,8 @@ public class IndexerWorkerContext implements WorkerContext
         
injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited());
     final ProcessingBuffersProvider processingBuffersProvider = 
injector.getInstance(ProcessingBuffersProvider.class);
     final ObjectMapper smileMapper = 
injector.getInstance(Key.get(ObjectMapper.class, Smile.class));
+    final Set<InputSliceReaderProvider> inputSliceReaderProviders =
+        injector.getInstance(Key.get(new TypeLiteral<>() {}, 
IndexingService.class));
 
     return new IndexerWorkerContext(
         task,
@@ -189,7 +199,8 @@ public class IndexerWorkerContext implements WorkerContext
             toolbox.getCoordinatorClient(),
             serviceClientFactory,
             smileMapper
-        )
+        ),
+        List.copyOf(inputSliceReaderProviders)
     );
   }
 
@@ -228,6 +239,12 @@ public class IndexerWorkerContext implements WorkerContext
     return injector;
   }
 
+  @Override
+  public List<InputSliceReaderProvider> inputSliceReaderProviders()
+  {
+    return inputSliceReaderProviders;
+  }
+
   @Override
   public void emitMetric(MSQMetricEventBuilder metricBuilder)
   {
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java
new file mode 100644
index 00000000000..8dd71efe593
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSliceReaderProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import org.apache.druid.msq.exec.FrameContext;
+import org.apache.druid.query.QueryContext;
+
+/**
+ * Worker-side extension point for {@link InputSlice}: provides an {@link 
InputSliceReader} for a particular
+ * {@link InputSlice} class.
+ */
+public interface InputSliceReaderProvider
+{
+  /**
+   * The {@link InputSlice} class handled by this provider.
+   */
+  Class<? extends InputSlice> sliceClass();
+
+  /**
+   * Returns an {@link InputSliceReader} that accepts {@link #sliceClass()}.
+   */
+  InputSliceReader createReader(FrameContext frameContext, QueryContext 
queryContext);
+}
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java
new file mode 100644
index 00000000000..3d0f0e77428
--- /dev/null
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecSlicerProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.input;
+
+import org.apache.druid.msq.exec.ControllerContext;
+import org.apache.druid.query.QueryContext;
+
+import java.util.List;
+
+/**
+ * Controller-side extension point for {@link InputSpec}: provides an {@link 
InputSpecSlicer} for a particular
+ * {@link InputSpec} class.
+ */
+public interface InputSpecSlicerProvider
+{
+  /**
+   * The {@link InputSpec} class handled by this provider.
+   */
+  Class<? extends InputSpec> specClass();
+
+  /**
+   * Returns an {@link InputSpecSlicer} that accepts {@link #specClass()}.
+   */
+  InputSpecSlicer createSlicer(
+      ControllerContext controllerContext,
+      QueryContext queryContext,
+      List<String> workerIds
+  );
+}
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
index 3888368c1b8..1ab1b55024c 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/controller/DartControllerContextTest.java
@@ -106,8 +106,17 @@ public class DartControllerContextTest
   @Test
   public void test_queryKernelConfig()
   {
-    final DartControllerContext controllerContext =
-        new DartControllerContext(null, null, SELF_NODE, null, 
memoryIntrospector, serverView, null, queryContext);
+    final DartControllerContext controllerContext = new DartControllerContext(
+        null,
+        null,
+        SELF_NODE,
+        null,
+        memoryIntrospector,
+        serverView,
+        List.of(),
+        null,
+        queryContext
+    );
     final ControllerQueryKernelConfig queryKernelConfig = 
controllerContext.queryKernelConfig(querySpec);
 
     Assertions.assertFalse(queryKernelConfig.isFaultTolerant());
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
index 787b980307f..895de0319da 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java
@@ -30,6 +30,7 @@ import com.google.inject.Injector;
 import com.google.inject.Module;
 import com.google.inject.util.Modules;
 import com.google.inject.util.Providers;
+import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.error.DruidException;
@@ -237,6 +238,7 @@ public class MSQCompactionTaskRunTest extends 
CompactionTaskRunBase
         new SegmentWranglerModule(),
         new LookylooModule(),
         new MSQIndexingModule(),
+        binder -> 
binder.bind(CoordinatorClient.class).toInstance(coordinatorClient),
         binder -> 
binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()),
         binder -> binder.bind(WireTransferableContext.class).toInstance(new 
WireTransferableContext(null, null, true)),
         binder -> binder.bind(DataSegmentPusher.class)
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
index 7b7f4cb93b1..fc7b499eb1a 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TestMSQSqlModule.java
@@ -21,8 +21,11 @@ package org.apache.druid.msq.exec;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Provides;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.TestDruidModule;
 import org.apache.druid.msq.guice.MultiStageQuery;
@@ -39,6 +42,12 @@ import org.apache.druid.sql.avatica.MSQDruidMeta;
 
 public class TestMSQSqlModule extends TestDruidModule
 {
+  @Override
+  public void configure(Binder binder)
+  {
+    
binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class);
+  }
+
   @Provides
   @MultiStageQuery
   @LazySingleton
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
index 41956d55695..f7947676ed7 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/sql/MSQTaskQueryMakerTest.java
@@ -31,6 +31,8 @@ import com.google.inject.testing.fieldbinder.BoundFieldModule;
 import com.google.inject.util.Modules;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.frame.testutil.FrameTestUtil;
 import org.apache.druid.guice.ConfigModule;
 import org.apache.druid.guice.DruidGuiceExtensions;
@@ -217,7 +219,10 @@ public class MSQTaskQueryMakerTest
         new SegmentWranglerModule(),
         new LookylooModule(),
         new MSQIndexingModule(),
-        binder -> 
binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY)
+        binder -> {
+          
binder.bind(WireTransferableContext.class).toInstance(FrameTestUtil.WT_CONTEXT_LEGACY);
+          binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
+        }
     );
     Injector injector = Guice.createInjector(defaultModule, 
BoundFieldModule.of(this));
     DruidSecondaryModule.setupJackson(injector, objectMapper, 
Collections.emptyMap(), true);
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
index dd0370008ad..43558e8d517 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/AbstractDartComponentSupplier.java
@@ -22,8 +22,6 @@ package org.apache.druid.msq.test;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Binder;
 import com.google.inject.Provides;
-import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.guice.LazySingleton;
@@ -126,7 +124,6 @@ public abstract class AbstractDartComponentSupplier extends 
AbstractMSQComponent
     @Override
     public void configure(Binder binder)
     {
-      binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class);
     }
   }
 
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 5a2ecbcf243..10c3e8cbec3 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -38,6 +38,8 @@ import com.google.inject.util.Modules;
 import com.google.inject.util.Providers;
 import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.druid.client.ImmutableSegmentLoadInfo;
+import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.client.coordinator.NoopCoordinatorClient;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.LongDimensionSchema;
@@ -576,7 +578,8 @@ public class MSQTestBase extends BaseCalciteQueryTest
         new SegmentWranglerModule(),
         new HllSketchModule(),
         binder -> binder.bind(Bouncer.class).toInstance(new Bouncer(1)),
-        binder -> 
binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance())
+        binder -> 
binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance()),
+        binder -> 
binder.bind(CoordinatorClient.class).to(NoopCoordinatorClient.class).in(LazySingleton.class)
     );
     // adding node role injection to the modules, since CliPeon would also do 
that through run method
     injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build(), 
ImmutableSet.of(NodeRole.PEON))
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
index 6f09c43ff02..8cc6ed620d2 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestControllerContext.java
@@ -51,7 +51,6 @@ import org.apache.druid.msq.exec.Controller;
 import org.apache.druid.msq.exec.ControllerContext;
 import org.apache.druid.msq.exec.ControllerMemoryParameters;
 import org.apache.druid.msq.exec.MSQMetricEventBuilder;
-import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerClient;
 import org.apache.druid.msq.exec.WorkerFailureListener;
@@ -61,12 +60,12 @@ import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.exec.WorkerRunRef;
 import org.apache.druid.msq.exec.WorkerStorageParameters;
 import org.apache.druid.msq.indexing.IndexerControllerContext;
-import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicer;
+import org.apache.druid.msq.indexing.IndexerTableInputSpecSlicerProvider;
 import org.apache.druid.msq.indexing.MSQSpec;
 import org.apache.druid.msq.indexing.MSQWorkerTask;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import 
org.apache.druid.msq.indexing.MSQWorkerTaskLauncher.MSQWorkerTaskLauncherConfig;
-import org.apache.druid.msq.input.InputSpecSlicer;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.kernel.controller.ControllerQueryKernelConfig;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.query.QueryContext;
@@ -169,6 +168,7 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
         serviceEmitter,
         Mockito.mock(CoordinatorClient.class)
     );
+    
Mockito.when(coordinatorClient.withRetryPolicy(ArgumentMatchers.any())).thenReturn(coordinatorClient);
     Mockito.when(coordinatorClient.fetchServerViewSegments(
                      ArgumentMatchers.anyString(),
                      ArgumentMatchers.any()
@@ -416,13 +416,9 @@ public class MSQTestControllerContext implements 
ControllerContext, DartControll
   }
 
   @Override
-  public InputSpecSlicer newTableInputSpecSlicer(WorkerManager workerManager)
+  public List<InputSpecSlicerProvider> inputSpecSlicerProviders()
   {
-    return new IndexerTableInputSpecSlicer(
-        coordinatorClient,
-        taskActionClient,
-        MultiStageQueryContext.getSegmentSources(queryContext, 
SegmentSource.NONE)
-    );
+    return List.of(new IndexerTableInputSpecSlicerProvider(coordinatorClient));
   }
 
   @Override
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
index c3eaf6e9c9e..d7f48f2dc41 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/test/TestDartControllerContextFactoryImpl.java
@@ -46,6 +46,7 @@ import org.apache.druid.msq.exec.Worker;
 import org.apache.druid.msq.exec.WorkerImpl;
 import org.apache.druid.msq.exec.WorkerRunRef;
 import org.apache.druid.msq.exec.WorkerStorageParameters;
+import org.apache.druid.msq.input.InputSpecSlicerProvider;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.query.QueryContext;
@@ -53,6 +54,7 @@ import org.apache.druid.rpc.ServiceClientFactory;
 import org.apache.druid.server.DruidNode;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Executors;
 
 public class TestDartControllerContextFactoryImpl extends 
DartControllerContextFactoryImpl
@@ -74,11 +76,22 @@ public class TestDartControllerContextFactoryImpl extends 
DartControllerContextF
       @EscalatedGlobal final ServiceClientFactory serviceClientFactory,
       final MemoryIntrospector memoryIntrospector,
       final TimelineServerView serverView,
+      @Dart final Set<InputSpecSlicerProvider> inputSpecSlicerProviders,
       final ServiceEmitter emitter,
       @Dart Map<String, WorkerRunRef> workerMap
   )
   {
-    super(injector, jsonMapper, smileMapper, selfNode, serviceClientFactory, 
memoryIntrospector, serverView, emitter);
+    super(
+        injector,
+        jsonMapper,
+        smileMapper,
+        selfNode,
+        serviceClientFactory,
+        memoryIntrospector,
+        serverView,
+        inputSpecSlicerProviders,
+        emitter
+    );
     this.workerMap = workerMap;
   }
 
@@ -92,6 +105,7 @@ public class TestDartControllerContextFactoryImpl extends 
DartControllerContextF
         new DartTestWorkerClient(),
         memoryIntrospector,
         serverView,
+        inputSpecSlicerProviders,
         emitter,
         context
     )


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to