This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d7c7a87ec8 Add maximumCapacity to taskRunner (#17107)
5d7c7a87ec8 is described below

commit 5d7c7a87ec88a9e837b3a1b2779ac2c5a0c03e73
Author: George Shiqi Wu <[email protected]>
AuthorDate: Mon Oct 7 15:03:51 2024 -0400

    Add maximumCapacity to taskRunner (#17107)
    
    * Add maximumCapacity to taskRunner
    
    * fix tests
    
    * pr comments
---
 .../overlord/KubernetesAndWorkerTaskRunner.java    |   6 +
 .../KubernetesAndWorkerTaskRunnerTest.java         |  10 ++
 .../druid/indexing/overlord/RemoteTaskRunner.java  |  30 +++++
 .../druid/indexing/overlord/TaskQueryTool.java     |  42 +------
 .../apache/druid/indexing/overlord/TaskRunner.java |   9 ++
 .../overlord/hrtr/HttpRemoteTaskRunner.java        |  31 +++++
 .../indexing/overlord/RemoteTaskRunnerTest.java    |  43 +++++++
 .../overlord/RemoteTaskRunnerTestUtils.java        |  17 ++-
 .../overlord/TestProvisioningStrategy.java         |  57 +++++++++
 .../overlord/hrtr/HttpRemoteTaskRunnerTest.java    |  76 ++++++++++++
 .../overlord/http/OverlordResourceTest.java        | 137 +++------------------
 11 files changed, 297 insertions(+), 161 deletions(-)

diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index 2c45a0ec7b8..767afaa2fd3 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -268,6 +268,12 @@ public class KubernetesAndWorkerTaskRunner implements 
TaskLogStreamer, WorkerTas
     return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
   }
 
+  @Override
+  public int getMaximumCapacityWithAutoscale()
+  {
+    return workerTaskRunner.getMaximumCapacityWithAutoscale() + 
kubernetesTaskRunner.getMaximumCapacityWithAutoscale();
+  }
+
   @Override
   public int getUsedCapacity()
   {
diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index 3ab515cc6e5..1cc3be34e38 100644
--- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -371,4 +371,14 @@ public class KubernetesAndWorkerTaskRunnerTest extends 
EasyMockSupport
     runner.updateLocation(task, TaskLocation.unknown());
     verifyAll();
   }
+
+  @Test
+  public void test_getMaximumCapacity()
+  {
+    
EasyMock.expect(kubernetesTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1);
+    
EasyMock.expect(workerTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1);
+    replayAll();
+    Assert.assertEquals(2, runner.getMaximumCapacityWithAutoscale());
+    verifyAll();
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 8ef718e97be..1d06321ddc4 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -58,6 +58,7 @@ import 
org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
@@ -1648,6 +1649,35 @@ public class RemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
     return getWorkers().stream().mapToInt(workerInfo -> 
workerInfo.getWorker().getCapacity()).sum();
   }
 
+  /**
+   * Retrieves the maximum capacity of the task runner when autoscaling is enabled.
+   * @return The maximum capacity as an integer value. Returns -1 if the maximum
+   *         capacity cannot be determined or if autoscaling is not enabled.
+   */
+  @Override
+  public int getMaximumCapacityWithAutoscale()
+  {
+    int maximumCapacity = -1;
+    WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+    if (workerBehaviorConfig == null) {
+      // Auto scale not setup
+      log.debug("Cannot calculate maximum worker capacity as worker behavior 
config is not configured");
+      maximumCapacity = -1;
+    } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
+      DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = 
(DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+      if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
+        // Auto scale not setup
+        log.debug("Cannot calculate maximum worker capacity as auto scaler not 
configured");
+        maximumCapacity = -1;
+      } else {
+        int maxWorker = 
defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
+        int expectedWorkerCapacity = 
provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
+        maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * 
expectedWorkerCapacity;
+      }
+    }
+    return maximumCapacity;
+  }
+
   @Override
   public int getUsedCapacity()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index 4c37af7ef16..3613b6fa08d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.inject.Inject;
@@ -33,7 +32,6 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.http.TaskStateLookup;
 import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
-import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.IAE;
@@ -49,7 +47,6 @@ import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -377,40 +374,11 @@ public class TaskQueryTool
     }
     TaskRunner taskRunner = taskRunnerOptional.get();
 
-    Collection<ImmutableWorkerInfo> workers = taskRunner instanceof 
WorkerTaskRunner ?
-                                              ((WorkerTaskRunner) 
taskRunner).getWorkers() : ImmutableList.of();
-
-    int currentCapacity = taskRunner.getTotalCapacity();
-    int usedCapacity = taskRunner.getUsedCapacity();
-    // Calculate maximum capacity with auto scale
-    int maximumCapacity;
-    WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig();
-    if (workerBehaviorConfig == null) {
-      // Auto scale not setup
-      log.debug("Cannot calculate maximum worker capacity as worker behavior 
config is not configured");
-      maximumCapacity = -1;
-    } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
-      DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = 
(DefaultWorkerBehaviorConfig) workerBehaviorConfig;
-      if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
-        // Auto scale not setup
-        log.debug("Cannot calculate maximum worker capacity as auto scaler not 
configured");
-        maximumCapacity = -1;
-      } else {
-        int maxWorker = 
defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
-        int expectedWorkerCapacity = 
provisioningStrategy.getExpectedWorkerCapacity(workers);
-        maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * 
expectedWorkerCapacity;
-      }
-    } else {
-      // Auto-scale is not using DefaultWorkerBehaviorConfig
-      log.debug(
-          "Cannot calculate maximum worker capacity as WorkerBehaviorConfig 
[%s] of type [%s] does not support getting max capacity",
-          workerBehaviorConfig,
-          workerBehaviorConfig.getClass().getSimpleName()
-      );
-      maximumCapacity = -1;
-    }
-
-    return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, 
usedCapacity);
+    return new TotalWorkerCapacityResponse(
+        taskRunner.getTotalCapacity(),
+        taskRunner.getMaximumCapacityWithAutoscale(),
+        taskRunner.getUsedCapacity()
+    );
   }
 
   public WorkerBehaviorConfig getLatestWorkerConfig()
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index ac1fd124ef5..2178bc433df 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -155,6 +155,15 @@ public interface TaskRunner
     return -1;
   }
 
+  /**
+   * The maximum number of tasks this TaskRunner can run concurrently with 
autoscaling hints.
+   * @return -1 if this method is not implemented or capacity can't be found.
+   */
+  default int getMaximumCapacityWithAutoscale()
+  {
+    return -1;
+  }
+
   /**
    * The current number of tasks this TaskRunner is running.
    * Can return -1 if this method is not implemented or the # of tasks can't 
be found.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index a2edb1eb6d1..72bd9cff174 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -64,6 +64,7 @@ import 
org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
 import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
 import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
@@ -1791,6 +1792,36 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer
     return getWorkers().stream().mapToInt(workerInfo -> 
workerInfo.getWorker().getCapacity()).sum();
   }
 
+
+  /**
+   * Retrieves the maximum capacity of the task runner when autoscaling is enabled.
+   * @return The maximum capacity as an integer value. Returns -1 if the maximum
+   *         capacity cannot be determined or if autoscaling is not enabled.
+   */
+  @Override
+  public int getMaximumCapacityWithAutoscale()
+  {
+    int maximumCapacity = -1;
+    WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+    if (workerBehaviorConfig == null) {
+      // Auto scale not setup
+      log.debug("Cannot calculate maximum worker capacity as worker behavior 
config is not configured");
+      maximumCapacity = -1;
+    } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
+      DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = 
(DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+      if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
+        // Auto scale not setup
+        log.debug("Cannot calculate maximum worker capacity as auto scaler not 
configured");
+        maximumCapacity = -1;
+      } else {
+        int maxWorker = 
defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
+        int expectedWorkerCapacity = 
provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
+        maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * 
expectedWorkerCapacity;
+      }
+    }
+    return maximumCapacity;
+  }
+
   @Override
   public int getUsedCapacity()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index c1a77c1f4b4..2ab63cd79ee 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -47,6 +47,8 @@ import 
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceActio
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import 
org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
@@ -153,6 +155,7 @@ public class RemoteTaskRunnerTest
     Assert.assertEquals(3, 
remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
     Assert.assertEquals(0, 
remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
     Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
+    Assert.assertEquals(-1, 
remoteTaskRunner.getMaximumCapacityWithAutoscale());
     Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());
 
 
@@ -608,6 +611,46 @@ public class RemoteTaskRunnerTest
     );
   }
 
+  @Test
+  public void testGetMaximumCapacity_noWorkerConfig()
+  {
+    httpClient = EasyMock.createMock(HttpClient.class);
+    remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
+        new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
+        new TestProvisioningStrategy<>(),
+        httpClient,
+        null
+    );
+    Assert.assertEquals(-1, 
remoteTaskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_noAutoScaler()
+  {
+    httpClient = EasyMock.createMock(HttpClient.class);
+    remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
+        new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
+        new TestProvisioningStrategy<>(),
+        httpClient,
+        new DefaultWorkerBehaviorConfig(new 
EqualDistributionWorkerSelectStrategy(null), null)
+    );
+    Assert.assertEquals(-1, 
remoteTaskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_withAutoScaler()
+  {
+    httpClient = EasyMock.createMock(HttpClient.class);
+    remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
+        new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
+        new TestProvisioningStrategy<>(),
+        httpClient,
+        DefaultWorkerBehaviorConfig.defaultConfig()
+    );
+    // Default autoscaler has max workers of 0
+    Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacityWithAutoscale());
+  }
+
   private void doSetup() throws Exception
   {
     makeWorker();
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
index bdf886aa41b..7c579f19676 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
@@ -117,6 +117,21 @@ public class RemoteTaskRunnerTestUtils
       ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
       HttpClient httpClient
   )
+  {
+    return makeRemoteTaskRunner(
+        config,
+        provisioningStrategy,
+        httpClient,
+        DefaultWorkerBehaviorConfig.defaultConfig()
+    );
+  }
+
+  public RemoteTaskRunner makeRemoteTaskRunner(
+      RemoteTaskRunnerConfig config,
+      ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
+      HttpClient httpClient,
+      WorkerBehaviorConfig workerBehaviorConfig
+  )
   {
     RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner(
         jsonMapper,
@@ -134,7 +149,7 @@ public class RemoteTaskRunnerTestUtils
         cf,
         new PathChildrenCacheFactory.Builder(),
         httpClient,
-        DSuppliers.of(new 
AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)),
         provisioningStrategy
     );
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java
new file mode 100644
index 00000000000..18cade99b9d
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.indexing.overlord;
+
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+
+
+public class TestProvisioningStrategy<T extends TaskRunner> implements 
ProvisioningStrategy<T>
+{
+  @Override
+  public ProvisioningService makeProvisioningService(T runner)
+  {
+    return new ProvisioningService()
+    {
+      @Override
+      public void close()
+      {
+        // nothing to close
+      }
+
+      @Override
+      public ScalingStats getStats()
+      {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public int getExpectedWorkerCapacity(@Nonnull 
Collection<ImmutableWorkerInfo> workers)
+  {
+    return 1;
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 1bfac9f42a3..91b0778c950 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -42,11 +42,13 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.TestProvisioningStrategy;
 import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
 import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import 
org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
 import org.apache.druid.indexing.worker.TaskAnnouncement;
 import org.apache.druid.indexing.worker.Worker;
 import org.apache.druid.indexing.worker.config.WorkerConfig;
@@ -147,6 +149,7 @@ public class HttpRemoteTaskRunnerTest
     Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
     Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
     Assert.assertEquals(4, taskRunner.getTotalCapacity());
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
     Assert.assertEquals(0, taskRunner.getUsedCapacity());
   }
 
@@ -1778,6 +1781,79 @@ public class HttpRemoteTaskRunnerTest
     Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
   }
 
+  @Test
+  public void testGetMaximumCapacity_noWorkerConfig()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+        .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(null)),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+        new NoopServiceEmitter()
+    );
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_noAutoScaler()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+        .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(new 
DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), 
null))),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+        new NoopServiceEmitter()
+    );
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_withAutoScaler()
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = 
EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    
EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+        .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new 
AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+        new NoopServiceEmitter()
+    );
+    // Default autoscaler has max workers of 0
+    Assert.assertEquals(0, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
   public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
       TaskStorage taskStorage,
       List<Object> listenerNotificationsAccumulator
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index c13a1571765..732d586002f 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -43,7 +43,6 @@ import 
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.DruidOverlord;
-import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import org.apache.druid.indexing.overlord.TaskLockbox;
 import org.apache.druid.indexing.overlord.TaskMaster;
@@ -52,14 +51,9 @@ import org.apache.druid.indexing.overlord.TaskQueue;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
-import org.apache.druid.indexing.overlord.WorkerTaskRunner;
 import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
-import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
-import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
-import org.apache.druid.indexing.worker.Worker;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
@@ -1321,6 +1315,7 @@ public class OverlordResourceTest
             .andReturn(workerBehaviorConfigAtomicReference);
     EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
     EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
+    
EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(-1);
     EasyMock.expect(overlord.isLeader()).andReturn(true);
     replayAll();
 
@@ -1332,130 +1327,26 @@ public class OverlordResourceTest
   }
 
   @Test
-  public void 
testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured()
-  {
-    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(null);
-    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
-    EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
-    EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
-    EasyMock.expect(overlord.isLeader()).andReturn(true);
-    replayAll();
-
-    final Response response = overlordResource.getTotalWorkerCapacity();
-    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
-    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
-    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getUsedClusterCapacity());
-    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
-  }
-
-  @Test
-  public void 
testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured()
-  {
-    DefaultWorkerBehaviorConfig workerBehaviorConfig = new 
DefaultWorkerBehaviorConfig(null, null);
-    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
-    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
-    EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
-    EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
-    EasyMock.expect(overlord.isLeader()).andReturn(true);
-    replayAll();
-
-    final Response response = overlordResource.getTotalWorkerCapacity();
-    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
-    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
-    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getUsedClusterCapacity());
-    Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
-  }
-
-  @Test
-  public void 
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity()
+  public void testGetTotalWorkerCapacityWithMaximumCapacity()
   {
     int expectedWorkerCapacity = 3;
-    int maxNumWorkers = 2;
-    WorkerTaskRunner workerTaskRunner = 
EasyMock.createMock(WorkerTaskRunner.class);
-    Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
-        new ImmutableWorkerInfo(
-            new Worker(
-                "http", "testWorker", "192.0.0.1", expectedWorkerCapacity, 
"v1", WorkerConfig.DEFAULT_CATEGORY
-            ),
-            2,
-            ImmutableSet.of("grp1", "grp2"),
-            ImmutableSet.of("task1", "task2"),
-            DateTimes.of("2015-01-01T01:01:01Z")
-        )
-    );
-    EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
-    
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
-    EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0);
-
-    EasyMock.reset(taskMaster);
-    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
-        Optional.of(workerTaskRunner)
-    ).anyTimes();
-    
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes();
-    AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
-    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
-    DefaultWorkerBehaviorConfig workerBehaviorConfig = new 
DefaultWorkerBehaviorConfig(null, autoScaler);
-    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
-    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
-    EasyMock.replay(
-        workerTaskRunner,
-        autoScaler
-    );
+    int expectedWorkerCapacityWithAutoscale = 10;
+    WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class);
+    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
+        = new AtomicReference<>(workerBehaviorConfig);
+    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class))
+        .andReturn(workerBehaviorConfigAtomicReference);
+    EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
+    EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(expectedWorkerCapacity);
+    EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(expectedWorkerCapacityWithAutoscale);
     EasyMock.expect(overlord.isLeader()).andReturn(true);
     replayAll();
-    final Response response = overlordResource.getTotalWorkerCapacity();
-    Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
-    Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
-    Assert.assertEquals(0, ((TotalWorkerCapacityResponse) 
response.getEntity()).getUsedClusterCapacity());
-    Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, 
((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
-  }
 
-  @Test
-  public void 
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
-  {
-    int invalidExpectedCapacity = -1;
-    int currentTotalCapacity = 3;
-    int currentCapacityUsed = 2;
-    int maxNumWorkers = 2;
-    WorkerTaskRunner workerTaskRunner = 
EasyMock.createMock(WorkerTaskRunner.class);
-    Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
-        new ImmutableWorkerInfo(
-            new Worker(
-                "http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", 
WorkerConfig.DEFAULT_CATEGORY
-            ),
-            currentCapacityUsed,
-            ImmutableSet.of("grp1", "grp2"),
-            ImmutableSet.of("task1", "task2"),
-            DateTimes.of("2015-01-01T01:01:01Z")
-        )
-    );
-    EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
-    
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity);
-    
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed);
-    EasyMock.reset(taskMaster);
-    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
-        Optional.of(workerTaskRunner)
-    ).anyTimes();
-    
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
-    AutoScaler<?> autoScaler = EasyMock.createMock(AutoScaler.class);
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
-    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
-    DefaultWorkerBehaviorConfig workerBehaviorConfig = new 
DefaultWorkerBehaviorConfig(null, autoScaler);
-    AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference 
= new AtomicReference<>(workerBehaviorConfig);
-    EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
-    EasyMock.replay(
-        workerTaskRunner,
-        autoScaler
-    );
-    EasyMock.expect(overlord.isLeader()).andReturn(true);
-    replayAll();
     final Response response = overlordResource.getTotalWorkerCapacity();
     Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
-    
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(),
 ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
-    Assert.assertEquals(invalidExpectedCapacity, 
((TotalWorkerCapacityResponse) 
response.getEntity()).getMaximumCapacityWithAutoScale());
-    Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) 
response.getEntity()).getCurrentClusterCapacity());
-    Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) 
response.getEntity()).getUsedClusterCapacity());
+    Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity());
+    Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity());
+    Assert.assertEquals(expectedWorkerCapacityWithAutoscale, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to