This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/28.0.0 by this push:
     new f1057043c89 Ability to send task types to k8s or worker task runner (#15196) (#15255)
f1057043c89 is described below

commit f1057043c89548b5134109c50cd268bf63912166
Author: Suneet Saldanha <[email protected]>
AuthorDate: Wed Oct 25 13:30:12 2023 -0700

    Ability to send task types to k8s or worker task runner (#15196) (#15255)
---
 docs/development/extensions-contrib/k8s-jobs.md    |   7 +-
 .../overlord/KubernetesAndWorkerTaskRunner.java    |  10 +-
 .../KubernetesAndWorkerTaskRunnerConfig.java       |  51 ++++----
 .../KubernetesAndWorkerTaskRunnerFactory.java      |  19 ++-
 .../k8s/overlord/KubernetesOverlordModule.java     |  55 ++++++++-
 .../runnerstrategy/KubernetesRunnerStrategy.java   |  43 +++++++
 .../overlord/runnerstrategy/RunnerStrategy.java    |  75 ++++++++++++
 .../runnerstrategy/TaskTypeRunnerStrategy.java     | 128 +++++++++++++++++++++
 .../runnerstrategy/WorkerRunnerStrategy.java       |  43 +++++++
 .../KubernetesAndWorkerTaskRunnerConfigTest.java   |  10 +-
 .../KubernetesAndWorkerTaskRunnerFactoryTest.java  |  11 +-
 .../KubernetesAndWorkerTaskRunnerTest.java         |  36 +++++-
 .../KubernetesRunnerStrategyTest.java              |  43 +++++++
 .../runnerstrategy/TaskTypeRunnerStrategyTest.java |  64 +++++++++++
 .../runnerstrategy/WorkerRunnerStrategyTest.java   |  43 +++++++
 .../kubernetesAndWorkerTaskRunnerConfig.json       |   4 +-
 16 files changed, 587 insertions(+), 55 deletions(-)

diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md
index 2132b55ea1e..27290a9bee5 100644
--- a/docs/development/extensions-contrib/k8s-jobs.md
+++ b/docs/development/extensions-contrib/k8s-jobs.md
@@ -284,6 +284,9 @@ To do this, set the following property.
 
 |Property| Possible Values |Description|Default|required|
 |--------|-----------------|-----------|-------|--------|
-|`druid.indexer.runner.k8sAndWorker.workerTaskRunnerType`|`String`|Determines whether the `httpRemote` or the `remote` task runner should be used in addition to the Kubernetes task runner.|`httpRemote`|No|
-|`druid.indexer.runner.k8sAndWorker.sendAllTasksToWorkerTaskRunner`|`boolean`| Whether to send all the tasks to the worker task runner. If this is set to false all tasks will be sent to Kubernetes|`false`|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.type`| `String` (e.g., `k8s`, `worker`, `taskType`)| Defines the strategy for task runner selection. |`k8s`|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.workerType`| `String` (e.g., `httpRemote`, `remote`)| Specifies the variant of the worker task runner to be utilized.|`httpRemote`|No|
+| **For `taskType` runner strategy:**|||||
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.default`| `String` (e.g., `k8s`, `worker`) | Specifies the default runner to use if no overrides apply. This setting ensures there is always a fallback runner available.|None|No|
+|`druid.indexer.runner.k8sAndWorker.runnerStrategy.taskType.overrides`| `JsonObject`(e.g., `{"index_kafka": "worker"}`)| Defines task-specific overrides for runner types. Each entry sets a task type to a specific runner, allowing fine control. |`{}`|No|
 
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index 243f6626c66..2c45a0ec7b8 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -38,6 +38,7 @@ import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
+import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
 import org.apache.druid.tasklogs.TaskLogStreamer;
 
 import javax.annotation.Nullable;
@@ -57,17 +58,17 @@ public class KubernetesAndWorkerTaskRunner implements 
TaskLogStreamer, WorkerTas
 {
   private final KubernetesTaskRunner kubernetesTaskRunner;
   private final WorkerTaskRunner workerTaskRunner;
-  private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+  private final RunnerStrategy runnerStrategy;
 
   public KubernetesAndWorkerTaskRunner(
       KubernetesTaskRunner kubernetesTaskRunner,
       WorkerTaskRunner workerTaskRunner,
-      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+      RunnerStrategy runnerStrategy
   )
   {
     this.kubernetesTaskRunner = kubernetesTaskRunner;
     this.workerTaskRunner = workerTaskRunner;
-    this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+    this.runnerStrategy = runnerStrategy;
   }
 
   @Override
@@ -101,7 +102,8 @@ public class KubernetesAndWorkerTaskRunner implements 
TaskLogStreamer, WorkerTas
   @Override
   public ListenableFuture<TaskStatus> run(Task task)
   {
-    if 
(kubernetesAndWorkerTaskRunnerConfig.isSendAllTasksToWorkerTaskRunner()) {
+    RunnerStrategy.RunnerType runnerType = 
runnerStrategy.getRunnerTypeForTask(task);
+    if (RunnerStrategy.RunnerType.WORKER_RUNNER_TYPE.equals(runnerType)) {
       return workerTaskRunner.run(task);
     } else {
       return kubernetesTaskRunner.run(task);
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
index 8e6fb8f7c61..0ffeb0103af 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfig.java
@@ -26,56 +26,49 @@ import org.apache.commons.lang3.ObjectUtils;
 import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
 
-import javax.validation.constraints.NotNull;
+import javax.annotation.Nullable;
 
 public class KubernetesAndWorkerTaskRunnerConfig
 {
-  private static final String DEFAULT_WORKER_TASK_RUNNER_TYPE = 
HttpRemoteTaskRunnerFactory.TYPE_NAME;
-  /**
-   * Select which worker task runner to use in addition to the Kubernetes 
runner, options are httpRemote or remote.
-   */
-  @JsonProperty
-  @NotNull
-  private final String workerTaskRunnerType;
 
+  private final String RUNNER_STRATEGY_TYPE = "runnerStrategy.type";
+  private final String RUNNER_STRATEGY_WORKER_TYPE = 
"runnerStrategy.workerType";
   /**
-   * Whether or not to send tasks to the worker task runner instead of the 
Kubernetes runner.
+   * Select which runner type a task would run on, options are k8s or worker.
    */
-  @JsonProperty
-  @NotNull
-  private final boolean sendAllTasksToWorkerTaskRunner;
+  @JsonProperty(RUNNER_STRATEGY_TYPE)
+  private String runnerStrategy;
+
+  @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
+  private String workerType;
 
   @JsonCreator
   public KubernetesAndWorkerTaskRunnerConfig(
-      @JsonProperty("workerTaskRunnerType") String workerTaskRunnerType,
-      @JsonProperty("sendAllTasksToWorkerTaskRunner") Boolean 
sendAllTasksToWorkerTaskRunner
+      @JsonProperty(RUNNER_STRATEGY_TYPE) @Nullable String runnerStrategy,
+      @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE) @Nullable String workerType
   )
   {
-    this.workerTaskRunnerType = ObjectUtils.defaultIfNull(
-        workerTaskRunnerType,
-        DEFAULT_WORKER_TASK_RUNNER_TYPE
-    );
+    this.runnerStrategy = ObjectUtils.defaultIfNull(runnerStrategy, 
KubernetesTaskRunnerFactory.TYPE_NAME);
+    this.workerType = ObjectUtils.defaultIfNull(workerType, 
HttpRemoteTaskRunnerFactory.TYPE_NAME);
     Preconditions.checkArgument(
-        
this.workerTaskRunnerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
-        this.workerTaskRunnerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
-        "workerTaskRunnerType must be set to one of (%s, %s)",
+        this.workerType.equals(HttpRemoteTaskRunnerFactory.TYPE_NAME) ||
+        this.workerType.equals(RemoteTaskRunnerFactory.TYPE_NAME),
+        "workerType must be set to one of (%s, %s)",
         HttpRemoteTaskRunnerFactory.TYPE_NAME,
         RemoteTaskRunnerFactory.TYPE_NAME
     );
-    this.sendAllTasksToWorkerTaskRunner = ObjectUtils.defaultIfNull(
-        sendAllTasksToWorkerTaskRunner,
-        false
-    );
   }
 
-  public String getWorkerTaskRunnerType()
+  @JsonProperty(RUNNER_STRATEGY_TYPE)
+  public String getRunnerStrategy()
   {
-    return workerTaskRunnerType;
+    return runnerStrategy;
   }
 
-  public boolean isSendAllTasksToWorkerTaskRunner()
+  @JsonProperty(RUNNER_STRATEGY_WORKER_TYPE)
+  public String getWorkerType()
   {
-    return sendAllTasksToWorkerTaskRunner;
+    return workerType;
   }
 
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
index 49ca454f50a..de6db915c8a 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactory.java
@@ -22,7 +22,9 @@ package org.apache.druid.k8s.overlord;
 import com.google.inject.Inject;
 import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
+import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
 
 
 public class KubernetesAndWorkerTaskRunnerFactory implements 
TaskRunnerFactory<KubernetesAndWorkerTaskRunner>
@@ -33,6 +35,7 @@ public class KubernetesAndWorkerTaskRunnerFactory implements 
TaskRunnerFactory<K
   private final HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory;
   private final RemoteTaskRunnerFactory remoteTaskRunnerFactory;
   private final KubernetesAndWorkerTaskRunnerConfig 
kubernetesAndWorkerTaskRunnerConfig;
+  private final RunnerStrategy runnerStrategy;
 
   private KubernetesAndWorkerTaskRunner runner;
 
@@ -41,13 +44,15 @@ public class KubernetesAndWorkerTaskRunnerFactory 
implements TaskRunnerFactory<K
       KubernetesTaskRunnerFactory kubernetesTaskRunnerFactory,
       HttpRemoteTaskRunnerFactory httpRemoteTaskRunnerFactory,
       RemoteTaskRunnerFactory remoteTaskRunnerFactory,
-      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig
+      KubernetesAndWorkerTaskRunnerConfig kubernetesAndWorkerTaskRunnerConfig,
+      RunnerStrategy runnerStrategy
   )
   {
     this.kubernetesTaskRunnerFactory = kubernetesTaskRunnerFactory;
     this.httpRemoteTaskRunnerFactory = httpRemoteTaskRunnerFactory;
     this.remoteTaskRunnerFactory = remoteTaskRunnerFactory;
     this.kubernetesAndWorkerTaskRunnerConfig = 
kubernetesAndWorkerTaskRunnerConfig;
+    this.runnerStrategy = runnerStrategy;
   }
 
   @Override
@@ -55,13 +60,19 @@ public class KubernetesAndWorkerTaskRunnerFactory 
implements TaskRunnerFactory<K
   {
     runner = new KubernetesAndWorkerTaskRunner(
         kubernetesTaskRunnerFactory.build(),
-        
HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(kubernetesAndWorkerTaskRunnerConfig.getWorkerTaskRunnerType())
 ?
-            httpRemoteTaskRunnerFactory.build() : 
remoteTaskRunnerFactory.build(),
-        kubernetesAndWorkerTaskRunnerConfig
+        getWorkerTaskRunner(),
+        runnerStrategy
     );
     return runner;
   }
 
+  private WorkerTaskRunner getWorkerTaskRunner()
+  {
+    String workerType = kubernetesAndWorkerTaskRunnerConfig.getWorkerType();
+    return HttpRemoteTaskRunnerFactory.TYPE_NAME.equals(workerType) ?
+           httpRemoteTaskRunnerFactory.build() : 
remoteTaskRunnerFactory.build();
+  }
+
   @Override
   public KubernetesAndWorkerTaskRunner get()
   {
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
index 654a1cd108c..afd9d9a7c4e 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java
@@ -20,7 +20,9 @@
 package org.apache.druid.k8s.overlord;
 
 import com.google.inject.Binder;
+import com.google.inject.Inject;
 import com.google.inject.Key;
+import com.google.inject.Provider;
 import com.google.inject.Provides;
 import com.google.inject.multibindings.MapBinder;
 import io.fabric8.kubernetes.client.Config;
@@ -29,6 +31,7 @@ import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.IndexingServiceModuleHelper;
 import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.annotations.LoadScope;
@@ -37,27 +40,35 @@ import 
org.apache.druid.indexing.common.tasklogs.FileTaskLogs;
 import org.apache.druid.indexing.overlord.TaskRunnerFactory;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.k8s.overlord.common.DruidKubernetesClient;
+import org.apache.druid.k8s.overlord.runnerstrategy.RunnerStrategy;
 import org.apache.druid.tasklogs.NoopTaskLogs;
 import org.apache.druid.tasklogs.TaskLogKiller;
 import org.apache.druid.tasklogs.TaskLogPusher;
 import org.apache.druid.tasklogs.TaskLogs;
 
+import java.util.Properties;
+
 
 @LoadScope(roles = NodeRole.OVERLORD_JSON_NAME)
 public class KubernetesOverlordModule implements DruidModule
 {
 
   private static final Logger log = new Logger(KubernetesOverlordModule.class);
+  private static final String K8SANDWORKER_PROPERTIES_PREFIX = 
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX
+                                                               + 
".k8sAndWorker";
+  private static final String RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING = 
K8SANDWORKER_PROPERTIES_PREFIX
+                                                                        + 
".runnerStrategy.%s";
 
   @Override
   public void configure(Binder binder)
   {
     // druid.indexer.runner.type=k8s
     JsonConfigProvider.bind(binder, 
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX, 
KubernetesTaskRunnerConfig.class);
-    JsonConfigProvider.bind(binder, 
IndexingServiceModuleHelper.INDEXER_RUNNER_PROPERTY_PREFIX + ".k8sAndWorker", 
KubernetesAndWorkerTaskRunnerConfig.class);
+    JsonConfigProvider.bind(binder, K8SANDWORKER_PROPERTIES_PREFIX, 
KubernetesAndWorkerTaskRunnerConfig.class);
     JsonConfigProvider.bind(binder, "druid.indexer.queue", 
TaskQueueConfig.class);
     PolyBind.createChoice(
         binder,
@@ -78,6 +89,9 @@ public class KubernetesOverlordModule implements DruidModule
         .in(LazySingleton.class);
     binder.bind(KubernetesTaskRunnerFactory.class).in(LazySingleton.class);
     
binder.bind(KubernetesAndWorkerTaskRunnerFactory.class).in(LazySingleton.class);
+    binder.bind(RunnerStrategy.class)
+          .toProvider(RunnerStrategyProvider.class)
+          .in(LazySingleton.class);
     configureTaskLogs(binder);
   }
 
@@ -116,6 +130,45 @@ public class KubernetesOverlordModule implements 
DruidModule
     return client;
   }
 
+  private static class RunnerStrategyProvider implements 
Provider<RunnerStrategy>
+  {
+    private KubernetesAndWorkerTaskRunnerConfig runnerConfig;
+    private Properties props;
+    private JsonConfigurator configurator;
+
+    @Inject
+    public void inject(
+        KubernetesAndWorkerTaskRunnerConfig runnerConfig,
+        Properties props,
+        JsonConfigurator configurator
+    )
+    {
+      this.runnerConfig = runnerConfig;
+      this.props = props;
+      this.configurator = configurator;
+    }
+
+    @Override
+    public RunnerStrategy get()
+    {
+      String runnerStrategy = runnerConfig.getRunnerStrategy();
+
+      final String runnerStrategyPropertyBase = StringUtils.format(
+          RUNNERSTRATEGY_PROPERTIES_FORMAT_STRING,
+          runnerStrategy
+      );
+      final JsonConfigProvider<RunnerStrategy> provider = 
JsonConfigProvider.of(
+          runnerStrategyPropertyBase,
+          RunnerStrategy.class
+      );
+
+      props.put(runnerStrategyPropertyBase + ".type", runnerStrategy);
+      provider.inject(props, configurator);
+
+      return provider.get();
+    }
+  }
+
   private void configureTaskLogs(Binder binder)
   {
     PolyBind.createChoice(binder, "druid.indexer.logs.type", 
Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
new file mode 100644
index 00000000000..8b0a6374ad4
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.indexing.common.task.Task;
+
+/**
+ * Implementation of {@link RunnerStrategy} that always selects the Kubernetes 
runner type for tasks.
+ *
+ * <p>This strategy is specific for tasks that are intended to be executed in 
a Kubernetes environment.
+ * Regardless of task specifics, this strategy always returns {@link 
RunnerType#KUBERNETES_RUNNER_TYPE}.
+ */
+public class KubernetesRunnerStrategy implements RunnerStrategy
+{
+  @JsonCreator
+  public KubernetesRunnerStrategy()
+  {
+  }
+
+  @Override
+  public RunnerType getRunnerTypeForTask(Task task)
+  {
+    return RunnerType.KUBERNETES_RUNNER_TYPE;
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
new file mode 100644
index 00000000000..5aa2bc4723a
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/RunnerStrategy.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
+
+/**
+ * Strategy interface for selecting the appropriate runner type based on the 
task spec or specific context conditions.
+ *
+ * <p>This interface is part of a strategy pattern and is implemented by 
different classes that handle
+ * the logic of selecting a runner type based on various criteria. Each task 
submitted to the system
+ * will pass through the strategy implementation to determine the correct 
runner for execution.
+ *
+ * <p>The strategy uses {@link RunnerType} as a standardized way of referring 
to and managing different types of runners.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = 
KubernetesRunnerStrategy.class)
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "k8s", value = KubernetesRunnerStrategy.class),
+    @JsonSubTypes.Type(name = "worker", value = WorkerRunnerStrategy.class),
+    @JsonSubTypes.Type(name = "taskType", value = TaskTypeRunnerStrategy.class)
+})
+public interface RunnerStrategy
+{
+  String WORKER_NAME = "worker";
+
+  /**
+   * Enumerates the available runner types, each associated with a specific 
method of task execution.
+   * These runner types are used by the strategies to make decisions and by 
the system to route tasks appropriately.
+   */
+  enum RunnerType
+  {
+    KUBERNETES_RUNNER_TYPE(KubernetesTaskRunnerFactory.TYPE_NAME),
+    WORKER_RUNNER_TYPE(WORKER_NAME);
+
+    private final String type;
+
+    RunnerType(String type)
+    {
+      this.type = type;
+    }
+
+    public String getType()
+    {
+      return type;
+    }
+  }
+
+  /**
+   * Analyzes the task and determines the appropriate runner type for 
executing it.
+   *
+   * @param task The task that needs to be executed.
+   * @return The runner type deemed most suitable for executing the task.
+   */
+  RunnerType getRunnerTypeForTask(Task task);
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
new file mode 100644
index 00000000000..6a16314be5b
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategy.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.indexing.common.task.Task;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Implementation of {@link RunnerStrategy} that allows dynamic selection of 
runner type based on task type.
+ *
+ * <p>This strategy checks each task's type against a set of overrides to 
determine the appropriate runner type.
+ * If no override is specified for a task's type, it uses a default runner.
+ *
+ * <p>Runner types are determined based on configurations provided at 
construction, including default runner
+ * type and specific overrides per task type. This strategy is designed for 
environments where tasks may require
+ * different execution environments (e.g., Kubernetes or worker nodes).
+ */
+public class TaskTypeRunnerStrategy implements RunnerStrategy
+{
+  @Nullable
+  private final Map<String, String> overrides;
+  private final RunnerStrategy kubernetesRunnerStrategy = new 
KubernetesRunnerStrategy();
+  private WorkerRunnerStrategy workerRunnerStrategy;
+  private final RunnerStrategy defaultRunnerStrategy;
+  private final String defaultRunner;
+
+  @JsonCreator
+  public TaskTypeRunnerStrategy(
+      @JsonProperty("default") String defaultRunner,
+      @JsonProperty("overrides") @Nullable Map<String, String> overrides
+  )
+  {
+    Preconditions.checkNotNull(defaultRunner);
+    workerRunnerStrategy = new WorkerRunnerStrategy();
+    defaultRunnerStrategy = 
RunnerType.WORKER_RUNNER_TYPE.getType().equals(defaultRunner) ?
+                            workerRunnerStrategy : kubernetesRunnerStrategy;
+    validate(overrides);
+    this.defaultRunner = defaultRunner;
+    this.overrides = overrides;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public Map<String, String> getOverrides()
+  {
+    return overrides;
+  }
+
+  @JsonProperty
+  public String getDefault()
+  {
+    return defaultRunner;
+  }
+
+  @Override
+  public RunnerType getRunnerTypeForTask(Task task)
+  {
+    String runnerType = null;
+    if (overrides != null) {
+      runnerType = overrides.get(task.getType());
+    }
+
+    RunnerStrategy runnerStrategy = getRunnerSelectStrategy(runnerType);
+    return runnerStrategy.getRunnerTypeForTask(task);
+  }
+
+  private RunnerStrategy getRunnerSelectStrategy(String runnerType)
+  {
+    if (runnerType == null) {
+      return defaultRunnerStrategy;
+    }
+
+    if (WORKER_NAME.equals(runnerType)) {
+      return workerRunnerStrategy;
+    } else {
+      return kubernetesRunnerStrategy;
+    }
+  }
+
+  private void validate(Map<String, String> overrides)
+  {
+    if (overrides == null) {
+      return;
+    }
+
+    boolean hasValidRunnerType =
+        overrides.values().stream().allMatch(v -> 
RunnerType.WORKER_RUNNER_TYPE.getType().equals(v)
+                                                  || 
RunnerType.KUBERNETES_RUNNER_TYPE.getType().equals(v));
+    Preconditions.checkArgument(
+        hasValidRunnerType,
+        "Invalid config in 'overrides'. Each runner type must be either '%s' 
or '%s'.",
+        RunnerType.WORKER_RUNNER_TYPE.getType(),
+        RunnerType.KUBERNETES_RUNNER_TYPE.getType()
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "TaskTypeRunnerStrategy{" +
+           "default=" + defaultRunner +
+           ", overrides=" + overrides +
+           '}';
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
new file mode 100644
index 00000000000..bd06f91aa8f
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.druid.indexing.common.task.Task;
+
+/**
+ * Implementation of {@link RunnerStrategy} that always selects the Worker 
runner type for tasks.
+ *
+ * <p>This strategy is specific for tasks that are intended to be executed in 
a Worker environment.
+ * Regardless of task specifics, this strategy always returns {@link 
RunnerType#WORKER_RUNNER_TYPE}.
+ */
+public class WorkerRunnerStrategy implements RunnerStrategy
+{
+  @JsonCreator
+  public WorkerRunnerStrategy()
+  {
+  }
+
+  @Override
+  public RunnerType getRunnerTypeForTask(Task task)
+  {
+    return RunnerType.WORKER_RUNNER_TYPE;
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
index 979aba69291..8ad631682f9 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerConfigTest.java
@@ -20,8 +20,6 @@
 package org.apache.druid.k8s.overlord;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
-import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.junit.Assert;
 import org.junit.Test;
@@ -39,9 +37,8 @@ public class KubernetesAndWorkerTaskRunnerConfigTest
         KubernetesAndWorkerTaskRunnerConfig.class
     );
 
-    Assert.assertEquals(RemoteTaskRunnerFactory.TYPE_NAME, 
config.getWorkerTaskRunnerType());
-    Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
-
+    Assert.assertEquals("worker", config.getRunnerStrategy());
+    Assert.assertEquals("remote", config.getWorkerType());
   }
 
   @Test
@@ -49,7 +46,6 @@ public class KubernetesAndWorkerTaskRunnerConfigTest
   {
     KubernetesAndWorkerTaskRunnerConfig config = new KubernetesAndWorkerTaskRunnerConfig(null, null);
 
-    Assert.assertEquals(HttpRemoteTaskRunnerFactory.TYPE_NAME, config.getWorkerTaskRunnerType());
-    Assert.assertFalse(config.isSendAllTasksToWorkerTaskRunner());
+    Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, config.getRunnerStrategy());
   }
 }
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
index c8e6d3afa03..88696017d05 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerFactoryTest.java
@@ -22,6 +22,8 @@ package org.apache.druid.k8s.overlord;
 
 import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunnerFactory;
+import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -45,7 +47,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
         kubernetesTaskRunnerFactory,
         httpRemoteTaskRunnerFactory,
         remoteTaskRunnerFactory,
-        new KubernetesAndWorkerTaskRunnerConfig(null, null)
+        new KubernetesAndWorkerTaskRunnerConfig(null, null),
+        new WorkerRunnerStrategy()
     );
 
     EasyMock.expect(httpRemoteTaskRunnerFactory.build()).andReturn(null);
@@ -63,7 +66,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
         kubernetesTaskRunnerFactory,
         httpRemoteTaskRunnerFactory,
         remoteTaskRunnerFactory,
-        new KubernetesAndWorkerTaskRunnerConfig("remote", null)
+        new KubernetesAndWorkerTaskRunnerConfig(null, "remote"),
+        new WorkerRunnerStrategy()
     );
 
     EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
@@ -81,7 +85,8 @@ public class KubernetesAndWorkerTaskRunnerFactoryTest extends EasyMockSupport
         kubernetesTaskRunnerFactory,
         httpRemoteTaskRunnerFactory,
         remoteTaskRunnerFactory,
-        new KubernetesAndWorkerTaskRunnerConfig("noop", null)
+        new KubernetesAndWorkerTaskRunnerConfig(null, "noop"),
+        new KubernetesRunnerStrategy()
     );
 
     EasyMock.expect(remoteTaskRunnerFactory.build()).andReturn(null);
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index af5a6c39bb0..3ab515cc6e5 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -31,6 +31,9 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.hrtr.HttpRemoteTaskRunner;
+import org.apache.druid.k8s.overlord.runnerstrategy.KubernetesRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.TaskTypeRunnerStrategy;
+import org.apache.druid.k8s.overlord.runnerstrategy.WorkerRunnerStrategy;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
 import org.easymock.EasyMockSupport;
@@ -67,7 +70,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
     runner = new KubernetesAndWorkerTaskRunner(
         kubernetesTaskRunner,
         workerTaskRunner,
-        new KubernetesAndWorkerTaskRunnerConfig(null, null)
+        new KubernetesRunnerStrategy()
     );
   }
 
@@ -77,7 +80,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
     KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
         kubernetesTaskRunner,
         workerTaskRunner,
-        new KubernetesAndWorkerTaskRunnerConfig(null, false)
+        new KubernetesRunnerStrategy()
     );
     TaskStatus taskStatus = TaskStatus.success(ID);
     EasyMock.expect(kubernetesTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
@@ -93,7 +96,7 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
     KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
         kubernetesTaskRunner,
         workerTaskRunner,
-        new KubernetesAndWorkerTaskRunnerConfig(null, true)
+        new WorkerRunnerStrategy()
     );
     TaskStatus taskStatus = TaskStatus.success(ID);
     EasyMock.expect(workerTaskRunner.run(task)).andReturn(Futures.immediateFuture(taskStatus));
@@ -103,6 +106,33 @@ public class KubernetesAndWorkerTaskRunnerTest extends EasyMockSupport
     verifyAll();
   }
 
+  @Test
+  public void test_runOnKubernetesOrWorkerBasedOnStrategy() throws ExecutionException, InterruptedException
+  {
+    TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker"));
+    KubernetesAndWorkerTaskRunner kubernetesAndWorkerTaskRunner = new KubernetesAndWorkerTaskRunner(
+        kubernetesTaskRunner,
+        workerTaskRunner,
+        runnerStrategy
+    );
+    Task taskMock = EasyMock.createMock(Task.class);
+    TaskStatus taskStatus = TaskStatus.success(ID);
+    EasyMock.expect(taskMock.getId()).andReturn(ID).anyTimes();
+
+    EasyMock.expect(taskMock.getType()).andReturn("index_kafka").once();
+    EasyMock.expect(workerTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
+    EasyMock.replay(taskMock, workerTaskRunner);
+    Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
+    EasyMock.verify(taskMock, workerTaskRunner);
+    EasyMock.reset(taskMock, workerTaskRunner);
+
+    EasyMock.expect(taskMock.getType()).andReturn("compact").once();
+    EasyMock.expect(kubernetesTaskRunner.run(taskMock)).andReturn(Futures.immediateFuture(taskStatus)).once();
+    EasyMock.replay(taskMock, kubernetesTaskRunner);
+    Assert.assertEquals(taskStatus, kubernetesAndWorkerTaskRunner.run(taskMock).get());
+    EasyMock.verify(taskMock, kubernetesTaskRunner);
+  }
+
   @Test
   public void test_getUsedCapacity()
   {
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
new file mode 100644
index 00000000000..880d5528ac7
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/KubernetesRunnerStrategyTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import org.apache.druid.indexing.common.task.Task;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class KubernetesRunnerStrategyTest extends EasyMockSupport
+{
+  @Mock
+  Task task;
+
+  @Test
+  public void test_kubernetesRunnerStrategy_returnsCorrectRunnerType()
+  {
+    KubernetesRunnerStrategy runnerStrategy = new KubernetesRunnerStrategy();
+
+    Assert.assertEquals(RunnerStrategy.RunnerType.KUBERNETES_RUNNER_TYPE, runnerStrategy.getRunnerTypeForTask(task));
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
new file mode 100644
index 00000000000..a32630ed614
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/TaskTypeRunnerStrategyTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.k8s.overlord.KubernetesTaskRunnerFactory;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class TaskTypeRunnerStrategyTest extends EasyMockSupport
+{
+  @Mock
+  Task task;
+
+  @Test
+  public void test_taskTypeRunnerStrategy_returnsCorrectRunnerType()
+  {
+    TaskTypeRunnerStrategy runnerStrategy = new TaskTypeRunnerStrategy("k8s", ImmutableMap.of("index_kafka", "worker"));
+    EasyMock.expect(task.getType()).andReturn("index_kafka");
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(task.getType()).andReturn("compact");
+    EasyMock.expectLastCall().once();
+    replayAll();
+    Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
+    Assert.assertEquals(KubernetesTaskRunnerFactory.TYPE_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
+    verifyAll();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void test_invalidOverridesConfig_shouldThrowException()
+  {
+    new TaskTypeRunnerStrategy(
+        "k8s",
+        ImmutableMap.of(
+            "index_kafka",
+            "non_exist_runner"
+        )
+    );
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
new file mode 100644
index 00000000000..1a3ae34fc6a
--- /dev/null
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/runnerstrategy/WorkerRunnerStrategyTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.k8s.overlord.runnerstrategy;
+
+
+import org.apache.druid.indexing.common.task.Task;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class WorkerRunnerStrategyTest extends EasyMockSupport
+{
+  @Mock
+  Task task;
+
+  @Test
+  public void test_workerRunnerStrategy_returnsCorrectRunnerType()
+  {
+    WorkerRunnerStrategy runnerStrategy = new WorkerRunnerStrategy();
+    Assert.assertEquals(RunnerStrategy.WORKER_NAME, runnerStrategy.getRunnerTypeForTask(task).getType());
+  }
+}
diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
index 757b07ebda5..43e7414f11f 100644
--- a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
+++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/kubernetesAndWorkerTaskRunnerConfig.json
@@ -1,4 +1,4 @@
 {
-  "workerTaskRunnerType": "remote",
-  "sendAllTasksToWorkerTaskRunner": false
+  "runnerStrategy.type": "worker",
+  "runnerStrategy.workerType": "remote"
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to