This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ab969cbf915 MSQ: Configurable maxPartitions and maxInputFilesPerWorker. (#18826)
ab969cbf915 is described below

commit ab969cbf915d158abecb57c83a7c787ec544530c
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Dec 17 13:36:12 2025 -0800

    MSQ: Configurable maxPartitions and maxInputFilesPerWorker. (#18826)
    
    * MSQ: Configurable maxPartitions and maxInputFilesPerWorker.
    
    The defaults remain the same, but the parameters become configurable.
    
    This patch also contains a removal of redundant logic: RunWorkOrder's
    initGlobalSortPartitionBoundariesIfNeeded is removed, since WorkerStageKernel
    has the same logic. We can copy the partitions over from there.
---
 docs/multi-stage-query/reference.md                |  8 ++-
 .../org/apache/druid/msq/exec/ControllerImpl.java  | 20 +++++-
 .../java/org/apache/druid/msq/exec/Limits.java     | 20 +++++-
 .../org/apache/druid/msq/exec/RunWorkOrder.java    | 21 ------
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |  5 ++
 .../msq/indexing/error/TooManyInputFilesFault.java |  5 +-
 .../msq/indexing/error/TooManyPartitionsFault.java |  8 ++-
 .../apache/druid/msq/kernel/StageDefinition.java   | 25 ++++---
 .../druid/msq/kernel/WorkerAssignmentStrategy.java |  9 ++-
 .../kernel/controller/ControllerQueryKernel.java   | 37 ++++++----
 .../kernel/controller/ControllerStageTracker.java  | 34 ++++++---
 .../druid/msq/kernel/controller/WorkerInputs.java  |  2 +
 .../druid/msq/kernel/worker/WorkerStageKernel.java | 16 +++--
 .../druid/msq/util/MultiStageQueryContext.java     | 30 ++++++++
 .../org/apache/druid/msq/exec/MSQFaultsTest.java   | 74 ++++++++++++++++++-
 .../druid/msq/kernel/StageDefinitionTest.java      | 14 +++-
 .../controller/BaseControllerQueryKernelTest.java  |  8 ++-
 .../msq/kernel/controller/WorkerInputsTest.java    | 15 ++++
 .../druid/msq/util/MultiStageQueryContextTest.java | 82 +++++++++++++++++++++-
 19 files changed, 345 insertions(+), 88 deletions(-)
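
The file-count check that this patch parameterizes (in ControllerQueryKernel#createWorkOrders, shown in the diff below) can be sketched in isolation. This is a minimal illustration, not Druid code: the class InputFileLimitSketch and its methods are hypothetical names, and a plain-Java ceiling division stands in for Guava's IntMath.divide(..., RoundingMode.CEILING), assuming non-negative file counts and a positive limit.

```java
import java.util.List;

/**
 * Hypothetical sketch of the per-worker input file limit check.
 * Not part of Druid; names are illustrative only.
 */
public class InputFileLimitSketch
{
  /** Ceiling division; assumes a >= 0 and b > 0. */
  public static int ceilDiv(int a, int b)
  {
    return (a + b - 1) / b;
  }

  /** True if any single worker would read more files than the limit allows. */
  public static boolean exceedsLimit(List<Integer> filesPerWorker, int maxInputFilesPerWorker)
  {
    for (int numInputFiles : filesPerWorker) {
      if (ceilDiv(numInputFiles, maxInputFilesPerWorker) > 1) {
        return true;
      }
    }
    return false;
  }

  /** Minimum worker count needed to keep every worker at or under the limit. */
  public static int requiredWorkers(List<Integer> filesPerWorker, int maxInputFilesPerWorker)
  {
    int totalFileCount = 0;
    for (int numInputFiles : filesPerWorker) {
      totalFileCount += numInputFiles;
    }
    return ceilDiv(totalFileCount, maxInputFilesPerWorker);
  }

  public static void main(String[] args)
  {
    // Two workers; the first is over the default limit of 10,000 files.
    final List<Integer> filesPerWorker = List.of(12_000, 3_000);
    System.out.println(exceedsLimit(filesPerWorker, 10_000));
    System.out.println(requiredWorkers(filesPerWorker, 10_000));
    // Raising the limit, as the new context parameter allows, clears the fault.
    System.out.println(exceedsLimit(filesPerWorker, 20_000));
  }
}
```

A worker holding exactly the limit does not trip the check, since the ceiling of limit/limit is 1. Only a count strictly above the limit does.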

diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index c64cab7a374..29b044b46b8 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -414,6 +414,8 @@ The following table lists the context parameters for the MSQ task engine:
 | `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to true, The MSQ engine will remove the null bytes in string fields when reading the data. | `false` |
 | `includeAllCounters` | SELECT, INSERT or REPLACE<br /><br />Whether to include counters that were added in Druid 31 or later. This is a backwards compatibility option that must be set to `false` during a rolling update from versions prior to Druid 31. | `true` |
 | `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Size of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) |
+| `maxInputFilesPerWorker` | SELECT, INSERT, REPLACE<br /><br />Maximum number of input files or segments per worker. If a single worker would need to read more than this number of files, the query fails with a `TooManyInputFiles` error. In this case, you should either increase this limit if your tasks have enough memory to handle more files, add more workers by increasing `maxNumTasks`, or split your query into smaller queries that process fewer files. | 10,000 |
+| `maxPartitions` | SELECT, INSERT, REPLACE<br /><br />Maximum number of output partitions for any single stage. For INSERT or REPLACE queries, this controls the maximum number of segments that can be generated. If the query would exceed this limit, it fails with a `TooManyPartitions` error. You can increase this limit if needed, break your query into smaller queries, or use a larger target segment size (via `rowsPerSegment`). | 25,000 |
 | `maxThreads` | SELECT, INSERT or REPLACE<br /><br />Maximum number of threads to use for processing. This only has an effect if it is greater than zero and less than the default thread count based on system configuration. Otherwise, it is ignored, and workers use the default thread count. | Not set (use default thread count) |
 
 ## Joins
@@ -571,10 +573,10 @@ The following table lists query limits:
 
 | Limit | Value | Error if exceeded |
 |---|---|---|
-| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. | 1 MB | `RowTooLarge` |
+| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. Configurable with [`maxFrameSize`](#context). | 1 MB | `RowTooLarge` |
 | Number of segment-granular time chunks encountered during ingestion. | 5,000 | `TooManyBuckets`|
-| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles`|
-| Number of output partitions for any one stage. Number of segments generated during ingestion. |25,000 | `TooManyPartitions`|
+| Number of input files/segments per worker. Configurable with [`maxInputFilesPerWorker`](#context). | 10,000 | `TooManyInputFiles`|
+| Number of output partitions for any one stage. Number of segments generated during ingestion. Configurable with [`maxPartitions`](#context). | 25,000 | `TooManyPartitions`|
 | Number of output columns for any one stage. | 2,000 | `TooManyColumns`|
 | Number of cluster by columns that can appear in a stage | 1,500 | `TooManyClusteredByColumns` |
 | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers`|
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b8238b81b16..0330c1f5b39 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1444,6 +1444,7 @@ public class ControllerImpl implements Controller
       final QueryDefinition queryDef,
       final ControllerQueryKernel queryKernel,
       final int stageNumber,
+      final int maxInputFilesPerWorker,
       @Nullable final List<SegmentIdWithShardSpec> segmentsToGenerate
   )
   {
@@ -1454,7 +1455,8 @@ public class ControllerImpl implements Controller
         segmentsToGenerate
     );
 
-    final Int2ObjectMap<WorkOrder> workOrders = queryKernel.createWorkOrders(stageNumber, extraInfos);
+    final Int2ObjectMap<WorkOrder> workOrders =
+        queryKernel.createWorkOrders(stageNumber, maxInputFilesPerWorker, extraInfos);
     final StageId stageId = new StageId(queryDef.getQueryId(), stageNumber);
 
     queryKernel.startStage(stageId);
@@ -2533,8 +2535,12 @@ public class ControllerImpl implements Controller
      */
     private void startStages() throws IOException, InterruptedException
     {
+      final int maxInputFilesPerWorker =
+          MultiStageQueryContext.getMaxInputFilesPerWorker(querySpec.getContext());
       final long maxInputBytesPerWorker =
           MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getContext());
+      final int maxPartitions =
+          MultiStageQueryContext.getMaxPartitions(querySpec.getContext());
 
       logKernelStatus(queryDef.getQueryId(), queryKernel);
 
@@ -2545,7 +2551,9 @@ public class ControllerImpl implements Controller
             inputSpecSlicerFactory,
             querySpec.getAssignmentStrategy(),
             rowBasedFrameType,
-            maxInputBytesPerWorker
+            maxInputFilesPerWorker,
+            maxInputBytesPerWorker,
+            maxPartitions
         );
 
         for (final StageId stageId : newStageIds) {
@@ -2573,7 +2581,13 @@ public class ControllerImpl implements Controller
             retryWorkersOrFailJob(queryKernel, workerFaultSet);
           }
           stageRuntimesForLiveReports.put(stageId.getStageNumber(), new Interval(DateTimes.nowUtc(), DateTimes.MAX));
-          startWorkForStage(queryDef, queryKernel, stageId.getStageNumber(), segmentsToGenerate);
+          startWorkForStage(
+              queryDef,
+              queryKernel,
+              stageId.getStageNumber(),
+              maxInputFilesPerWorker,
+              segmentsToGenerate
+          );
         }
       } while (!newStageIds.isEmpty());
     }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
index fd210776277..0ca4a0709bb 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
@@ -19,6 +19,11 @@
 
 package org.apache.druid.msq.exec;
 
+import org.apache.druid.msq.indexing.error.TooManyBucketsFault;
+import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
+import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+
 public class Limits
 {
   /**
@@ -45,9 +50,10 @@ public class Limits
   public static final int MAX_WORKERS = 1000;
 
   /**
-   * Maximum number of input files per worker
+   * Default maximum number of input files per worker. Exceeding this will yield a {@link TooManyInputFilesFault}.
+   * Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_INPUT_FILES_PER_WORKER}.
    */
-  public static final int MAX_INPUT_FILES_PER_WORKER = 10_000;
+  public static final int DEFAULT_MAX_INPUT_FILES_PER_WORKER = 10_000;
 
   /**
    * Maximum number of parse exceptions with their stack traces a worker can send to the controller.
@@ -94,10 +100,18 @@ public class Limits
   public static final long MAX_SELECT_RESULT_ROWS = 3_000;
 
   /**
-   * Max number of partition buckets for ingestion queries.
+   * Max number of partition buckets. Exceeding this will yield a {@link TooManyBucketsFault}. For an ingestion job,
+   * this is the maximum number of output time chunks.
    */
   public static final int MAX_PARTITION_BUCKETS = 5_000;
 
+  /**
+   * Default max number of output partitions for a stage. Exceeding this will yield a {@link TooManyPartitionsFault}.
+   * For an ingestion job, this is the maximum number of segments that can be created.
+   * Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_PARTITIONS}.
+   */
+  public static final int DEFAULT_MAX_PARTITIONS = 25_000;
+
   /**
    * Max number of rows with the same key in a window. This acts as a guardrail for
    * data distribution with high cardinality
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 8931b26b915..16276f24202 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -65,7 +65,6 @@ import org.apache.druid.msq.input.stage.StageInputSlice;
 import org.apache.druid.msq.input.stage.StageInputSliceReader;
 import org.apache.druid.msq.input.table.SegmentsInputSlice;
 import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
-import org.apache.druid.msq.kernel.ShuffleKind;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory;
 import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -174,7 +173,6 @@ public class RunWorkOrder
 
     try {
       exec.registerCancellationId(cancellationId);
-      initGlobalSortPartitionBoundariesIfNeeded();
       startStageProcessor();
       setUpCompletionCallbacks();
     }
@@ -284,25 +282,6 @@ public class RunWorkOrder
     stageResultFuture = processor.execute(executionContext);
   }
 
-  /**
-   * Initialize {@link #stagePartitionBoundariesFuture} if it will be needed (i.e. if {@link ShuffleKind#GLOBAL_SORT})
-   * but does not need statistics. In this case, it is known upfront, before the job starts.
-   */
-  private void initGlobalSortPartitionBoundariesIfNeeded()
-  {
-    if (workOrder.getStageDefinition().doesShuffle()
-        && workOrder.getStageDefinition().getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT
-        && !workOrder.getStageDefinition().mustGatherResultKeyStatistics()) {
-      // Result key stats aren't needed, so the partition boundaries are knowable ahead of time. Compute them now.
-      final ClusterByPartitions boundaries =
-          workOrder.getStageDefinition()
-                   .generatePartitionBoundariesForShuffle(null)
-                   .valueOrThrow();
-
-      stagePartitionBoundariesFuture.set(boundaries);
-    }
-  }
-
   /**
    * Callbacks that fire when all work for the stage is done (i.e. when {@link #stageResultFuture} resolves).
    */
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index ffd64dbe088..3f28593bc21 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -429,6 +429,11 @@ public class WorkerImpl implements Worker
     // Set up processorCloser (called when processing is done).
     kernelHolder.processorCloser.register(() -> runWorkOrder.stop(null));
 
+    // Set resultPartitionBoundaries in RunWorkOrder if it is known ahead of time.
+    if (kernel.hasResultPartitionBoundaries()) {
+      runWorkOrder.getStagePartitionBoundariesFuture().set(kernel.getResultPartitionBoundaries());
+    }
+
     // Start working on this stage immediately.
     kernel.startReading();
     runWorkOrder.startAsync();
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java
index ceb1367ffd8..cae7a5c36d7 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java
@@ -46,10 +46,11 @@ public class TooManyInputFilesFault extends BaseMSQFault
     super(
         CODE,
         "Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try"
-        + " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by"
-        + " setting %s in your query context",
+        + " increasing the limit using the %s query context parameter, breaking your query up into smaller queries,"
+        + " or increasing the number of workers to at least [%d] by setting %s in your query context.",
         numInputFiles,
         maxInputFiles,
+        MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER,
         minNumWorkers,
         MultiStageQueryContext.CTX_MAX_NUM_TASKS
     );
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java
index e9d91a42d4e..e59b6645f75 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing.error;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.msq.util.MultiStageQueryContext;
 
 import java.util.Objects;
 
@@ -37,9 +38,10 @@ public class TooManyPartitionsFault extends BaseMSQFault
   {
     super(
         CODE,
-        "Too many partitions (max = %d); try breaking your query up into smaller queries or "
-        + "using a larger target size",
-        maxPartitions
+        "Too many partitions (max = %d). Try increasing the limit using the %s query context parameter, "
+        + "breaking your query up into smaller queries, or using a larger target size.",
+        maxPartitions,
+        MultiStageQueryContext.CTX_MAX_PARTITIONS
     );
     this.maxPartitions = maxPartitions;
   }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 4555f13eb75..736195155c6 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -48,6 +48,8 @@ import org.apache.druid.msq.input.InputSpecs;
 import org.apache.druid.msq.input.table.TableInputSpec;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
 import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
@@ -85,8 +87,6 @@ import java.util.function.Supplier;
  */
 public class StageDefinition
 {
-  private static final int MAX_PARTITIONS = 25_000; // Limit for TooManyPartitions
-
   // If adding any fields here, add them to builder(StageDefinition) below too.
   private final StageId id;
   private final List<InputSpec> inputSpecs;
@@ -292,12 +292,6 @@ public class StageDefinition
     return shuffleCheckHasMultipleValues;
   }
 
-  public int getMaxPartitionCount()
-  {
-    // Pretends to be an instance method, but really returns a constant. Maybe one day this will be defined per stage.
-    return MAX_PARTITIONS;
-  }
-
   public int getStageNumber()
   {
     return id.getStageNumber();
@@ -334,8 +328,19 @@ public class StageDefinition
     return mustGatherResultKeyStatistics(shuffleSpec);
   }
 
+  /**
+   * Generate partition boundaries for {@link ShuffleKind#GLOBAL_SORT} shuffles.
+   *
+   * @param collector     statistics collector, to be provided if {@link #mustGatherResultKeyStatistics()}
+   * @param maxPartitions maximum number of partitions to generate. On the controller, this is the value of
+   *                      {@link MultiStageQueryContext#getMaxPartitions(QueryContext)}. On workers, this method
+   *                      is only used when the number of partitions is determined ahead of time by the
+   *                      {@link ShuffleSpec}, so {@link Integer#MAX_VALUE} is typically provided for this parameter
+   *                      out of convenience.
+   */
   public Either<Long, ClusterByPartitions> generatePartitionBoundariesForShuffle(
-      @Nullable ClusterByStatisticsCollector collector
+      @Nullable ClusterByStatisticsCollector collector,
+      int maxPartitions
   )
   {
     if (shuffleSpec == null) {
@@ -351,7 +356,7 @@ public class StageDefinition
     } else if (!mustGatherResultKeyStatistics() && collector != null) {
       throw new ISE("Statistics gathered, but not required for stage[%d]", getStageNumber());
     } else {
-      return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, MAX_PARTITIONS);
+      return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, maxPartitions);
     }
   }
 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
index cdf4e2e20b0..0c9115ec437 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonValue;
 import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.IntSet;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.InputSlice;
 import org.apache.druid.msq.input.InputSpec;
 import org.apache.druid.msq.input.InputSpecSlicer;
@@ -49,6 +48,7 @@ public enum WorkerAssignmentStrategy
         final InputSpec inputSpec,
         final Int2IntMap stageWorkerCountMap,
         final InputSpecSlicer slicer,
+        final int maxInputFilesPerSlice,
         final long maxInputBytesPerSlice
     )
     {
@@ -58,7 +58,7 @@ public enum WorkerAssignmentStrategy
 
   /**
    * Use the lowest possible number of workers, while keeping each worker's workload under
-   * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code maxInputBytesPerWorker} bytes.
+   * {@code maxInputFilesPerSlice} files and {@code maxInputBytesPerWorker} bytes.
    *
    * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
    */
@@ -69,6 +69,7 @@ public enum WorkerAssignmentStrategy
         final InputSpec inputSpec,
         final Int2IntMap stageWorkerCountMap,
         final InputSpecSlicer slicer,
+        final int maxInputFilesPerSlice,
         final long maxInputBytesPerSlice
     )
     {
@@ -76,7 +77,7 @@ public enum WorkerAssignmentStrategy
         return slicer.sliceDynamic(
             inputSpec,
             stageDef.getMaxWorkerCount(),
-            Limits.MAX_INPUT_FILES_PER_WORKER,
+            maxInputFilesPerSlice,
             maxInputBytesPerSlice
         );
       } else {
@@ -117,6 +118,7 @@ public enum WorkerAssignmentStrategy
    * @param inputSpec inputSpec containing information on where the input is read from
    * @param stageWorkerCountMap map of past stage number vs number of worker inputs
    * @param slicer creates slices of input spec based on other parameters
+   * @param maxInputFilesPerSlice hard maximum number of files per input slice
    * @param maxInputBytesPerSlice maximum suggested bytes per input slice
    * @return list containing input slices
    */
@@ -125,6 +127,7 @@ public enum WorkerAssignmentStrategy
       InputSpec inputSpec,
       Int2IntMap stageWorkerCountMap,
       InputSpecSlicer slicer,
+      int maxInputFilesPerSlice,
       long maxInputBytesPerSlice
   );
 }
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
index f61022044b0..7356daef89a 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java
@@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.exec.ExtraInfoHolder;
-import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.OutputChannelMode;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.MSQException;
@@ -183,14 +182,18 @@ public class ControllerQueryKernel
       final InputSpecSlicerFactory slicerFactory,
       final WorkerAssignmentStrategy assignmentStrategy,
       final FrameType rowBasedFrameType,
-      final long maxInputBytesPerWorker
+      final int maxInputFilesPerWorker,
+      final long maxInputBytesPerWorker,
+      final int maxPartitions
   )
   {
     createNewKernels(
         slicerFactory,
         assignmentStrategy,
         rowBasedFrameType,
-        maxInputBytesPerWorker
+        maxInputFilesPerWorker,
+        maxInputBytesPerWorker,
+        maxPartitions
     );
 
     return stageTrackers.values()
@@ -289,6 +292,7 @@ public class ControllerQueryKernel
    */
   public Int2ObjectMap<WorkOrder> createWorkOrders(
       final int stageNumber,
+      final int maxInputFilesPerWorker,
       @Nullable final Int2ObjectMap<Object> extraInfos
   )
   {
@@ -318,16 +322,14 @@ public class ControllerQueryKernel
       );
 
       final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum());
-      fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1;
+      fault = fault || IntMath.divide(numInputFiles, maxInputFilesPerWorker, RoundingMode.CEILING) > 1;
       totalFileCount += numInputFiles;
       workerToWorkOrder.put(workerNumber, workOrder);
     }
 
-    final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING);
+    final int requiredWorkers = IntMath.divide(totalFileCount, maxInputFilesPerWorker, RoundingMode.CEILING);
     if (fault) {
-      throw new MSQException(
-          new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers)
-      );
+      throw new MSQException(new TooManyInputFilesFault(totalFileCount, maxInputFilesPerWorker, requiredWorkers));
     }
     stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder);
     return workerToWorkOrder;
@@ -337,7 +339,9 @@ public class ControllerQueryKernel
       final InputSpecSlicerFactory slicerFactory,
       final WorkerAssignmentStrategy assignmentStrategy,
       final FrameType rowBasedFrameType,
-      final long maxInputBytesPerWorker
+      final int maxInputFilesPerWorker,
+      final long maxInputBytesPerWorker,
+      final int maxPartitions
   )
   {
     StageGroup stageGroup;
@@ -357,7 +361,9 @@ public class ControllerQueryKernel
                   slicerFactory,
                   assignmentStrategy,
                   rowBasedFrameType,
-                  maxInputBytesPerWorker
+                  maxInputFilesPerWorker,
+                  maxInputBytesPerWorker,
+                  maxPartitions
               )
           );
 
@@ -380,7 +386,9 @@ public class ControllerQueryKernel
       final InputSpecSlicerFactory slicerFactory,
       final WorkerAssignmentStrategy assignmentStrategy,
       final FrameType rowBasedFrameType,
-      final long maxInputBytesPerWorker
+      final int maxInputFilesPerWorker,
+      final long maxInputBytesPerWorker,
+      final int maxPartitions
   )
   {
     final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap();
@@ -410,7 +418,9 @@ public class ControllerQueryKernel
         assignmentStrategy,
         rowBasedFrameType,
         config.getMaxRetainedPartitionSketchBytes(),
-        maxInputBytesPerWorker
+        maxInputFilesPerWorker,
+        maxInputBytesPerWorker,
+        maxPartitions
     );
   }
 
@@ -571,7 +581,8 @@ public class ControllerQueryKernel
 
   /**
    * Checks if the stage can be started, delegates call to {@link ControllerStageTracker#start()} for internal phase
-   * transition and registers the transition in this queryKernel. Work orders need to be created via {@link ControllerQueryKernel#createWorkOrders(int, Int2ObjectMap)} before calling this method.
+   * transition and registers the transition in this queryKernel. Work orders need to be created via
+   * {@link ControllerQueryKernel#createWorkOrders(int, int, Int2ObjectMap)} before calling this method.
    */
   public void startStage(final StageId stageId)
   {
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
index 1c3fc1f84d4..1a2a0dc7b79 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java
@@ -88,6 +88,11 @@ class ControllerStageTracker
    */
   private final FrameType rowBasedFrameType;
 
+  /**
+   * Maximum number of partitions for this stage. See {@link MultiStageQueryContext#getMaxPartitions}.
+   */
+  private final int maxPartitions;
+
   // worker-> workerStagePhase
   // Controller keeps track of the stage with this map.
   // Currently, we rely on the serial nature of the state machine to keep things in sync between the controller and the worker.
@@ -136,7 +141,8 @@ class ControllerStageTracker
       final StageDefinition stageDef,
       final WorkerInputs workerInputs,
       final FrameType rowBasedFrameType,
-      final int maxRetainedPartitionSketchBytes
+      final int maxRetainedPartitionSketchBytes,
+      final int maxPartitions
   )
   {
     this.stageDef = stageDef;
@@ -144,6 +150,7 @@ class ControllerStageTracker
     this.workerInputs = workerInputs;
     this.rowBasedFrameType = rowBasedFrameType;
     this.maxRetainedPartitionSketchBytes = maxRetainedPartitionSketchBytes;
+    this.maxPartitions = maxPartitions;
 
     initializeWorkerState(workerInputs.workers());
 
@@ -178,7 +185,9 @@ class ControllerStageTracker
       final WorkerAssignmentStrategy assignmentStrategy,
       final FrameType rowBasedFrameType,
       final int maxRetainedPartitionSketchBytes,
-      final long maxInputBytesPerWorker
+      final int maxInputFilesPerWorker,
+      final long maxInputBytesPerWorker,
+      final int maxPartitions
   )
   {
     final WorkerInputs workerInputs = WorkerInputs.create(
@@ -186,6 +195,7 @@ class ControllerStageTracker
         stageWorkerCountMap,
         slicer,
         assignmentStrategy,
+        maxInputFilesPerWorker,
         maxInputBytesPerWorker
     );
 
@@ -193,7 +203,8 @@ class ControllerStageTracker
         stageDef,
         workerInputs,
         rowBasedFrameType,
-        maxRetainedPartitionSketchBytes
+        maxRetainedPartitionSketchBytes,
+        maxPartitions
     );
   }
 
@@ -589,10 +600,10 @@ class ControllerStageTracker
             );
             ClusterByStatisticsCollector collector = timeChunkToCollector.get(tc);
             Either<Long, ClusterByPartitions> countOrPartitions =
-                stageDef.generatePartitionBoundariesForShuffle(collector);
+                stageDef.generatePartitionBoundariesForShuffle(collector, maxPartitions);
             totalPartitionCount += getPartitionCountFromEither(countOrPartitions);
-            if (totalPartitionCount > stageDef.getMaxPartitionCount()) {
-              failForReason(new TooManyPartitionsFault(stageDef.getMaxPartitionCount()));
+            if (totalPartitionCount > maxPartitions) {
+              failForReason(new TooManyPartitionsFault(maxPartitions));
               return null;
             }
             timeChunkToBoundaries.put(tc, countOrPartitions.valueOrThrow());
@@ -726,10 +737,11 @@ class ControllerStageTracker
       }
       if (resultPartitions == null) {
         final ClusterByStatisticsCollector collector = timeChunkToCollector.get(STATIC_TIME_CHUNK_FOR_PARALLEL_MERGE);
-        Either<Long, ClusterByPartitions> countOrPartitions = stageDef.generatePartitionBoundariesForShuffle(collector);
+        Either<Long, ClusterByPartitions> countOrPartitions =
+            stageDef.generatePartitionBoundariesForShuffle(collector, maxPartitions);
         totalPartitionCount += getPartitionCountFromEither(countOrPartitions);
-        if (totalPartitionCount > stageDef.getMaxPartitionCount()) {
-          failForReason(new TooManyPartitionsFault(stageDef.getMaxPartitionCount()));
+        if (totalPartitionCount > maxPartitions) {
+          failForReason(new TooManyPartitionsFault(maxPartitions));
           return;
         }
         resultPartitionBoundaries = countOrPartitions.valueOrThrow();
@@ -954,10 +966,10 @@ class ControllerStageTracker
         }
 
         final Either<Long, ClusterByPartitions> maybeResultPartitionBoundaries =
-            stageDef.generatePartitionBoundariesForShuffle(null);
+            stageDef.generatePartitionBoundariesForShuffle(null, maxPartitions);
 
         if (maybeResultPartitionBoundaries.isError()) {
-          failForReason(new TooManyPartitionsFault(stageDef.getMaxPartitionCount()));
+          failForReason(new TooManyPartitionsFault(maxPartitions));
           return;
         }
 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
index 8dcaee9c213..6e1f00b1c84 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java
@@ -62,6 +62,7 @@ public class WorkerInputs
       final Int2IntMap stageWorkerCountMap,
       final InputSpecSlicer slicer,
       final WorkerAssignmentStrategy assignmentStrategy,
+      final int maxInputFilesPerWorker,
       final long maxInputBytesPerWorker
   )
   {
@@ -99,6 +100,7 @@ public class WorkerInputs
             inputSpec,
             stageWorkerCountMap,
             slicer,
+            maxInputFilesPerWorker,
             maxInputBytesPerWorker
         );
 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
index 726333a2d1d..ea99dc973f0 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
@@ -74,13 +74,15 @@ public class WorkerStageKernel
   {
     this.workOrder = workOrder;
 
-    if (workOrder.getStageDefinition().doesShuffle()
-        && workOrder.getStageDefinition().getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT
-        && !workOrder.getStageDefinition().mustGatherResultKeyStatistics()) {
-      // Use valueOrThrow instead of a nicer error collection mechanism, because we really don't expect the
-      // MAX_PARTITIONS to be exceeded here. It would involve having a shuffleSpec that was statically configured
-      // to use a huge number of partitions.
-      resultPartitionBoundaries = workOrder.getStageDefinition().generatePartitionBoundariesForShuffle(null).valueOrThrow();
+    final StageDefinition stageDef = workOrder.getStageDefinition();
+    if (stageDef.doesShuffle()
+        && stageDef.getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT
+        && !stageDef.mustGatherResultKeyStatistics()) {
+      // Result key stats aren't needed, so the partition boundaries are knowable ahead of time. Compute them now.
+      // Use Integer.MAX_VALUE for maxPartitions since it isn't relevant in this path anyway.
+      resultPartitionBoundaries =
+          stageDef.generatePartitionBoundariesForShuffle(null, Integer.MAX_VALUE)
+                  .valueOrThrow();
     }
   }
 
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index d7630e0b414..11e2784d29d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -141,6 +141,8 @@ public class MultiStageQueryContext
   public static final String CTX_SEGMENT_LOAD_WAIT = "waitUntilSegmentsLoad";
   public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false;
   public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker";
+  public static final String CTX_MAX_INPUT_FILES_PER_WORKER = "maxInputFilesPerWorker";
+  public static final String CTX_MAX_PARTITIONS = "maxPartitions";
 
   public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode";
   public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString();
@@ -326,6 +328,34 @@ public class MultiStageQueryContext
     );
   }
 
+  public static int getMaxInputFilesPerWorker(final QueryContext queryContext)
+  {
+    final Integer value = queryContext.getInt(CTX_MAX_INPUT_FILES_PER_WORKER);
+    if (value == null) {
+      return Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER;
+    }
+    if (value <= 0) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("%s must be a positive integer, got[%d]", CTX_MAX_INPUT_FILES_PER_WORKER, value);
+    }
+    return value;
+  }
+
+  public static int getMaxPartitions(final QueryContext queryContext)
+  {
+    final Integer value = queryContext.getInt(CTX_MAX_PARTITIONS);
+    if (value == null) {
+      return Limits.DEFAULT_MAX_PARTITIONS;
+    }
+    if (value <= 0) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.INVALID_INPUT)
+                          .build("%s must be a positive integer, got[%d]", CTX_MAX_PARTITIONS, value);
+    }
+    return value;
+  }
+
   public static ClusterStatisticsMergeMode getClusterStatisticsMergeMode(QueryContext queryContext)
   {
     return QueryContexts.getAsEnum(
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 7e85722e480..12cab8fb7b5 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -495,10 +495,82 @@ public class MSQFaultsTest extends MSQTestBase
         .setQueryContext(Map.of("maxNumTasks", 8))
         .setExpectedDataSource("foo1")
         .setExpectedRowSignature(dummyRowSignature)
-        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10))
+        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, 10))
         .verifyResults();
   }
 
+  @Test
+  public void testTooManyInputFilesWithLowContextLimit() throws IOException
+  {
+    final RowSignature dummyRowSignature = RowSignature.builder().addTimeColumn().build();
+
+    final int numFiles = 100;
+    final int maxInputFilesPerWorker = 10;
+
+    final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json");
+    final String toReadFileNameAsJson = queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath());
+    final String externalFiles = String.join(", ", Collections.nCopies(numFiles, toReadFileNameAsJson));
+
+    final Map<String, Object> context =
+        ImmutableMap.<String, Object>builder()
+                    .putAll(DEFAULT_MSQ_CONTEXT)
+                    .put(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, maxInputFilesPerWorker)
+                    .build();
+
+    testIngestQuery()
+        .setSql(StringUtils.format(
+            "insert into foo1 SELECT\n"
+            + "  floor(TIME_PARSE(\"timestamp\") to day) AS __time\n"
+            + "FROM TABLE(\n"
+            + "  EXTERN(\n"
+            + "    '{ \"files\": [%s],\"type\":\"local\"}',\n"
+            + "    '{\"type\": \"csv\", \"hasHeaderRow\": true}',\n"
+            + "    '[{\"name\": \"timestamp\", \"type\": \"string\"}]'\n"
+            + "  )\n"
+            + ") PARTITIONED by day",
+            externalFiles
+        ))
+        .setExpectedDataSource("foo1")
+        .setExpectedRowSignature(dummyRowSignature)
+        .setQueryContext(context)
+        .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, maxInputFilesPerWorker, 10))
+        .verifyResults();
+  }
+
+  @Test
+  public void testTooManyPartitionsWithLowContextLimit() throws IOException
+  {
+    final int maxPartitions = 5;
+
+    final Map<String, Object> context =
+        ImmutableMap.<String, Object>builder()
+                    .putAll(DEFAULT_MSQ_CONTEXT)
+                    .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 1)
+                    .put(MultiStageQueryContext.CTX_MAX_PARTITIONS, maxPartitions)
+                    .build();
+
+    final RowSignature rowSignature = RowSignature.builder().addTimeColumn().build();
+
+    // Create a file with enough rows to exceed the partition limit
+    final File file = createNdJsonFile(newTempFile("ndjson"), 100, 1);
+    final String filePathAsJson = queryFramework().queryJsonMapper().writeValueAsString(file.getAbsolutePath());
+
+    testIngestQuery().setSql(" insert into foo1 SELECT\n"
+                             + "  floor(TIME_PARSE(\"timestamp\") to day) AS __time\n"
+                             + "FROM TABLE(\n"
+                             + "  EXTERN(\n"
+                             + "    '{ \"files\": [" + filePathAsJson + "],\"type\":\"local\"}',\n"
+                             + "    '{\"type\": \"json\"}',\n"
+                             + "    '[{\"name\": \"timestamp\",\"type\":\"string\"}]'\n"
+                             + "  )\n"
+                             + ") PARTITIONED by day")
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(rowSignature)
+                     .setQueryContext(context)
+                     .setExpectedMSQFault(new TooManyPartitionsFault(maxPartitions))
+                     .verifyResults();
+  }
+
   @Test
   public void testUnionAllWithDifferentColumnNames()
   {
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
index 73b7247728e..b51f229a669 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java
@@ -27,6 +27,7 @@ import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.KeyColumn;
 import org.apache.druid.frame.key.KeyOrder;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor;
 import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
@@ -60,7 +61,10 @@ public class StageDefinitionTest
         false
     );
 
-    Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null));
+    Assert.assertThrows(
+        ISE.class,
+        () -> stageDefinition.generatePartitionBoundariesForShuffle(null, Limits.DEFAULT_MAX_PARTITIONS)
+    );
   }
 
   @Test
@@ -82,7 +86,10 @@ public class StageDefinitionTest
         false
     );
 
-    Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null));
+    Assert.assertThrows(
+        ISE.class,
+        () -> stageDefinition.generatePartitionBoundariesForShuffle(null, Limits.DEFAULT_MAX_PARTITIONS)
+    );
   }
 
   @Test
@@ -117,7 +124,8 @@ public class StageDefinitionTest
                 100,
                 false,
                 false
-            )
+            ),
+            Limits.DEFAULT_MAX_PARTITIONS
         )
     );
   }
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
index 9efdcffdac9..96312e293fa 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java
@@ -139,7 +139,7 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
           break;
 
         case READING_INPUT:
-          controllerQueryKernel.createWorkOrders(stageId.getStageNumber(), null);
+          controllerQueryKernel.createWorkOrders(stageId.getStageNumber(), Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, null);
           controllerQueryKernel.startStage(stageId);
           for (int i = 0; i < queryDefinition.getStageDefinition(stageId).getMaxWorkerCount(); ++i) {
             controllerQueryKernel.workOrdersSentForWorker(stageId, i);
@@ -248,7 +248,9 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
               inputSlicerFactory,
               WorkerAssignmentStrategy.MAX,
               MultiStageQueryContext.getRowBasedFrameType(QueryContext.of(config.getWorkerContextMap())),
-              Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
+              Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
+              Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER,
+              Limits.DEFAULT_MAX_PARTITIONS
           )
       );
     }
@@ -280,7 +282,7 @@ public class BaseControllerQueryKernelTest extends InitializedNullHandlingTest
     public void startStage(int stageNumber)
     {
       Preconditions.checkArgument(initialized);
-      controllerQueryKernel.createWorkOrders(stageNumber, null);
+      controllerQueryKernel.createWorkOrders(stageNumber, Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, null);
       controllerQueryKernel.startStage(new StageId(queryDefinition.getQueryId(), stageNumber));
 
     }
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
index c3e42824bec..43f3c5c5635 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java
@@ -79,6 +79,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -108,6 +109,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(new IntAVLTreeSet(new int[]{1, 3, 4, 5}), true),
         WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -137,6 +139,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -166,6 +169,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -192,6 +196,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -219,6 +224,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -245,6 +251,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -274,6 +281,7 @@ public class WorkerInputsTest
             new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0, OutputChannelMode.LOCAL_STORAGE))
         ),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -322,6 +330,7 @@ public class WorkerInputsTest
             new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0, OutputChannelMode.LOCAL_STORAGE))
         ),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -357,6 +366,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(4), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -384,6 +394,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(2), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -417,6 +428,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         new TestInputSpecSlicer(denseWorkers(1), true),
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -449,6 +461,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         testInputSpecSlicer,
         WorkerAssignmentStrategy.MAX,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
@@ -493,6 +506,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         testInputSpecSlicer,
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         100
     );
 
@@ -536,6 +550,7 @@ public class WorkerInputsTest
         Int2IntMaps.EMPTY_MAP,
         testInputSpecSlicer,
         WorkerAssignmentStrategy.AUTO,
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
         Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER
     );
 
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index 598c820e0f5..953d7200500 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -22,8 +22,10 @@ package org.apache.druid.msq.util;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.WorkerMemoryParameters;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
@@ -117,7 +119,80 @@ public class MultiStageQueryContextTest
 
     Assert.assertEquals(
         1024,
-        MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap)));
+        MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap))
+    );
+  }
+
+  @Test
+  public void getMaxInputFilesPerWorker_unset_returnsDefaultValue()
+  {
+    Assert.assertEquals(
+        Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER,
+        MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.empty())
+    );
+  }
+
+  @Test
+  public void getMaxInputFilesPerWorker_set_returnsCorrectValue()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, 5000);
+    Assert.assertEquals(5000, MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.of(propertyMap)));
+  }
+
+  @Test
+  public void getMaxInputFilesPerWorker_zero_throwsException()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, 0);
+    Assert.assertThrows(
+        DruidException.class,
+        () -> MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.of(propertyMap))
+    );
+  }
+
+  @Test
+  public void getMaxInputFilesPerWorker_negative_throwsException()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, -1);
+    Assert.assertThrows(
+        DruidException.class,
+        () -> MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.of(propertyMap))
+    );
+  }
+
+  @Test
+  public void getMaxPartitions_unset_returnsDefaultValue()
+  {
+    Assert.assertEquals(
+        Limits.DEFAULT_MAX_PARTITIONS,
+        MultiStageQueryContext.getMaxPartitions(QueryContext.empty())
+    );
+  }
+
+  @Test
+  public void getMaxPartitions_set_returnsCorrectValue()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_PARTITIONS, 50000);
+    Assert.assertEquals(50000, MultiStageQueryContext.getMaxPartitions(QueryContext.of(propertyMap)));
+  }
+
+  @Test
+  public void getMaxPartitions_zero_throwsException()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_PARTITIONS, 0);
+    Assert.assertThrows(
+        DruidException.class,
+        () -> MultiStageQueryContext.getMaxPartitions(QueryContext.of(propertyMap))
+    );
+  }
+
+  @Test
+  public void getMaxPartitions_negative_throwsException()
+  {
+    Map<String, Object> propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_PARTITIONS, -1);
+    Assert.assertThrows(
+        DruidException.class,
+        () -> MultiStageQueryContext.getMaxPartitions(QueryContext.of(propertyMap))
+    );
   }
 
   @Test
@@ -221,7 +296,10 @@ public class MultiStageQueryContextTest
   @Test
   public void getSelectDestination_unset_returnsDefaultValue()
   {
-    Assert.assertEquals(MSQSelectDestination.TASKREPORT, MultiStageQueryContext.getSelectDestination(QueryContext.empty()));
+    Assert.assertEquals(
+        MSQSelectDestination.TASKREPORT,
+        MultiStageQueryContext.getSelectDestination(QueryContext.empty())
+    );
   }
 
   @Test
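
The two context getters added in MultiStageQueryContext share one shape: an absent key falls back to the default, and a zero or negative value is rejected. The sketch below isolates that shape so it can be run without Druid on the classpath. It is a hypothetical stand-in, not the code from this commit: the class name, the resolvePositiveInt helper, and the two default values are assumptions made for illustration (the real defaults live in Limits), it reads from a plain Map rather than QueryContext, and it throws IllegalArgumentException where the patch builds a DruidException.

```java
import java.util.Map;

public class ContextLimitSketch
{
  // Placeholder defaults, assumed for illustration only; the real values are defined in Limits.
  public static final int ASSUMED_DEFAULT_MAX_INPUT_FILES_PER_WORKER = 10_000;
  public static final int ASSUMED_DEFAULT_MAX_PARTITIONS = 25_000;

  // Same shape as the new getters: absent key -> default, non-positive value -> error.
  public static int resolvePositiveInt(Map<String, Object> context, String key, int defaultValue)
  {
    final Object raw = context.get(key);
    if (raw == null) {
      return defaultValue;
    }
    // Simplification: only numeric values are handled in this sketch.
    final int value = ((Number) raw).intValue();
    if (value <= 0) {
      throw new IllegalArgumentException(
          String.format("%s must be a positive integer, got[%d]", key, value)
      );
    }
    return value;
  }

  public static void main(String[] args)
  {
    // Only maxPartitions is set; maxInputFilesPerWorker falls back to the assumed default.
    final Map<String, Object> context = Map.of("maxPartitions", 50000);
    System.out.println(resolvePositiveInt(context, "maxPartitions", ASSUMED_DEFAULT_MAX_PARTITIONS));
    System.out.println(
        resolvePositiveInt(context, "maxInputFilesPerWorker", ASSUMED_DEFAULT_MAX_INPUT_FILES_PER_WORKER)
    );
  }
}
```

Running main prints 50000 followed by 10000. Passing 0 or -1 for either key raises the error, which is the behavior the new MultiStageQueryContextTest cases assert against the real getters.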

