This is an automated email from the ASF dual-hosted git repository.
georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5d7c7a87ec8 Add maximumCapacity to taskRunner (#17107)
5d7c7a87ec8 is described below
commit 5d7c7a87ec88a9e837b3a1b2779ac2c5a0c03e73
Author: George Shiqi Wu <[email protected]>
AuthorDate: Mon Oct 7 15:03:51 2024 -0400
Add maximumCapacity to taskRunner (#17107)
* Add maximumCapacity to taskRunner
* fix tests
* pr comments
---
.../overlord/KubernetesAndWorkerTaskRunner.java | 6 +
.../KubernetesAndWorkerTaskRunnerTest.java | 10 ++
.../druid/indexing/overlord/RemoteTaskRunner.java | 30 +++++
.../druid/indexing/overlord/TaskQueryTool.java | 42 +------
.../apache/druid/indexing/overlord/TaskRunner.java | 9 ++
.../overlord/hrtr/HttpRemoteTaskRunner.java | 31 +++++
.../indexing/overlord/RemoteTaskRunnerTest.java | 43 +++++++
.../overlord/RemoteTaskRunnerTestUtils.java | 17 ++-
.../overlord/TestProvisioningStrategy.java | 57 +++++++++
.../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 76 ++++++++++++
.../overlord/http/OverlordResourceTest.java | 137 +++------------------
11 files changed, 297 insertions(+), 161 deletions(-)
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
index 2c45a0ec7b8..767afaa2fd3 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java
@@ -268,6 +268,12 @@ public class KubernetesAndWorkerTaskRunner implements
TaskLogStreamer, WorkerTas
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}
+  /**
+   * Returns the combined autoscale-aware maximum capacity of the worker and Kubernetes task runners.
+   * A delegate that reports a negative value (the "unknown" sentinel) contributes nothing to the sum,
+   * so one unknown delegate cannot reduce the capacity reported by the other.
+   *
+   * @return the sum of the non-negative delegate capacities, or -1 if neither delegate reports one
+   */
+  @Override
+  public int getMaximumCapacityWithAutoscale()
+  {
+    final int workerMaximum = workerTaskRunner.getMaximumCapacityWithAutoscale();
+    final int k8sMaximum = kubernetesTaskRunner.getMaximumCapacityWithAutoscale();
+    if (workerMaximum < 0 && k8sMaximum < 0) {
+      // Neither delegate knows its maximum, so keep reporting "unknown" instead of a bogus 0 or -2.
+      return -1;
+    }
+    return Math.max(0, workerMaximum) + Math.max(0, k8sMaximum);
+  }
+
@Override
public int getUsedCapacity()
{
diff --git
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
index 3ab515cc6e5..1cc3be34e38 100644
---
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
+++
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java
@@ -371,4 +371,14 @@ public class KubernetesAndWorkerTaskRunnerTest extends
EasyMockSupport
runner.updateLocation(task, TaskLocation.unknown());
verifyAll();
}
+
+  @Test
+  public void test_getMaximumCapacity()
+  {
+    // Each delegate reports a maximum of one slot; the combined runner is expected to report two.
+    final int perRunnerMaximum = 1;
+    EasyMock.expect(kubernetesTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(perRunnerMaximum);
+    EasyMock.expect(workerTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(perRunnerMaximum);
+    replayAll();
+
+    final int combinedMaximum = runner.getMaximumCapacityWithAutoscale();
+
+    Assert.assertEquals(2, combinedMaximum);
+    verifyAll();
+  }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index 8ef718e97be..1d06321ddc4 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -58,6 +58,7 @@ import
org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
@@ -1648,6 +1649,35 @@ public class RemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
return getWorkers().stream().mapToInt(workerInfo ->
workerInfo.getWorker().getCapacity()).sum();
}
+  /**
+   * Retrieves the maximum capacity of the task runner when autoscaling is enabled.
+   *
+   * @return the autoscaler's maximum number of workers multiplied by the expected worker capacity
+   *         reported by the provisioning strategy, or -1 if the maximum capacity cannot be
+   *         determined or autoscaling is not configured.
+   */
+  @Override
+  public int getMaximumCapacityWithAutoscale()
+  {
+    final WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+    if (workerBehaviorConfig == null) {
+      // Auto scale not setup
+      log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
+      return -1;
+    }
+    if (!(workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig)) {
+      // Auto-scale is not using DefaultWorkerBehaviorConfig; log it so the -1 result can be explained
+      log.debug(
+          "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
+          workerBehaviorConfig,
+          workerBehaviorConfig.getClass().getSimpleName()
+      );
+      return -1;
+    }
+
+    final DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+    if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
+      // Auto scale not setup
+      log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
+      return -1;
+    }
+
+    final int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
+    final int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
+    // The provisioning strategy signals "unknown" with -1; propagate it instead of multiplying.
+    return expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
+  }
+
@Override
public int getUsedCapacity()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index 4c37af7ef16..3613b6fa08d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -21,7 +21,6 @@ package org.apache.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
@@ -33,7 +32,6 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
-import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@@ -49,7 +47,6 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -377,40 +374,11 @@ public class TaskQueryTool
}
TaskRunner taskRunner = taskRunnerOptional.get();
- Collection<ImmutableWorkerInfo> workers = taskRunner instanceof
WorkerTaskRunner ?
- ((WorkerTaskRunner)
taskRunner).getWorkers() : ImmutableList.of();
-
- int currentCapacity = taskRunner.getTotalCapacity();
- int usedCapacity = taskRunner.getUsedCapacity();
- // Calculate maximum capacity with auto scale
- int maximumCapacity;
- WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig();
- if (workerBehaviorConfig == null) {
- // Auto scale not setup
- log.debug("Cannot calculate maximum worker capacity as worker behavior
config is not configured");
- maximumCapacity = -1;
- } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
- DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig =
(DefaultWorkerBehaviorConfig) workerBehaviorConfig;
- if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
- // Auto scale not setup
- log.debug("Cannot calculate maximum worker capacity as auto scaler not
configured");
- maximumCapacity = -1;
- } else {
- int maxWorker =
defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
- int expectedWorkerCapacity =
provisioningStrategy.getExpectedWorkerCapacity(workers);
- maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker *
expectedWorkerCapacity;
- }
- } else {
- // Auto-scale is not using DefaultWorkerBehaviorConfig
- log.debug(
- "Cannot calculate maximum worker capacity as WorkerBehaviorConfig
[%s] of type [%s] does not support getting max capacity",
- workerBehaviorConfig,
- workerBehaviorConfig.getClass().getSimpleName()
- );
- maximumCapacity = -1;
- }
-
- return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity,
usedCapacity);
+ return new TotalWorkerCapacityResponse(
+ taskRunner.getTotalCapacity(),
+ taskRunner.getMaximumCapacityWithAutoscale(),
+ taskRunner.getUsedCapacity()
+ );
}
public WorkerBehaviorConfig getLatestWorkerConfig()
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index ac1fd124ef5..2178bc433df 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -155,6 +155,15 @@ public interface TaskRunner
return -1;
}
+ /**
+ * The maximum number of tasks this TaskRunner can run concurrently when autoscaling is taken
+ * into account. This default implementation always returns -1.
+ *
+ * @return -1 if this method is not implemented or capacity can't be found.
+ */
+ default int getMaximumCapacityWithAutoscale()
+ {
+ return -1;
+ }
+
/**
* The current number of tasks this TaskRunner is running.
* Can return -1 if this method is not implemented or the # of tasks can't
be found.
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index a2edb1eb6d1..72bd9cff174 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -64,6 +64,7 @@ import
org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
@@ -1791,6 +1792,36 @@ public class HttpRemoteTaskRunner implements
WorkerTaskRunner, TaskLogStreamer
return getWorkers().stream().mapToInt(workerInfo ->
workerInfo.getWorker().getCapacity()).sum();
}
+
+  /**
+   * Retrieves the maximum capacity of the task runner when autoscaling is enabled.
+   *
+   * @return the autoscaler's maximum number of workers multiplied by the expected worker capacity
+   *         reported by the provisioning strategy, or -1 if the maximum capacity cannot be
+   *         determined or autoscaling is not configured.
+   */
+  @Override
+  public int getMaximumCapacityWithAutoscale()
+  {
+    final WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
+    if (workerBehaviorConfig == null) {
+      // Auto scale not setup
+      log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
+      return -1;
+    }
+    if (!(workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig)) {
+      // Auto-scale is not using DefaultWorkerBehaviorConfig; log it so the -1 result can be explained
+      log.debug(
+          "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
+          workerBehaviorConfig,
+          workerBehaviorConfig.getClass().getSimpleName()
+      );
+      return -1;
+    }
+
+    final DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
+    if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
+      // Auto scale not setup
+      log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
+      return -1;
+    }
+
+    final int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
+    final int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
+    // The provisioning strategy signals "unknown" with -1; propagate it instead of multiplying.
+    return expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
+  }
+
@Override
public int getUsedCapacity()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
index c1a77c1f4b4..2ab63cd79ee 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -47,6 +47,8 @@ import
org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceActio
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
+import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import
org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
@@ -153,6 +155,7 @@ public class RemoteTaskRunnerTest
Assert.assertEquals(3,
remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0,
remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
+ Assert.assertEquals(-1,
remoteTaskRunner.getMaximumCapacityWithAutoscale());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());
@@ -608,6 +611,46 @@ public class RemoteTaskRunnerTest
);
}
+  @Test
+  public void testGetMaximumCapacity_noWorkerConfig()
+  {
+    // No worker behavior config at all: the maximum capacity is reported as unknown.
+    setUpRunnerWithWorkerConfig(null);
+    Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_noAutoScaler()
+  {
+    // A worker behavior config without an autoscaler: the maximum capacity is reported as unknown.
+    setUpRunnerWithWorkerConfig(
+        new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null)
+    );
+    Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_withAutoScaler()
+  {
+    setUpRunnerWithWorkerConfig(DefaultWorkerBehaviorConfig.defaultConfig());
+    // Default autoscaler has max workers of 0
+    Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  /**
+   * Creates the runner under test with a {@link TestProvisioningStrategy} and the given worker
+   * behavior config (may be null), storing it and its mocked HTTP client in the fixture fields.
+   */
+  private void setUpRunnerWithWorkerConfig(DefaultWorkerBehaviorConfig workerBehaviorConfig)
+  {
+    httpClient = EasyMock.createMock(HttpClient.class);
+    remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
+        new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
+        new TestProvisioningStrategy<>(),
+        httpClient,
+        workerBehaviorConfig
+    );
+  }
+
private void doSetup() throws Exception
{
makeWorker();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
index bdf886aa41b..7c579f19676 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java
@@ -117,6 +117,21 @@ public class RemoteTaskRunnerTestUtils
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
HttpClient httpClient
)
+ {
+ return makeRemoteTaskRunner(
+ config,
+ provisioningStrategy,
+ httpClient,
+ DefaultWorkerBehaviorConfig.defaultConfig()
+ );
+ }
+
+ public RemoteTaskRunner makeRemoteTaskRunner(
+ RemoteTaskRunnerConfig config,
+ ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
+ HttpClient httpClient,
+ WorkerBehaviorConfig workerBehaviorConfig
+ )
{
RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner(
jsonMapper,
@@ -134,7 +149,7 @@ public class RemoteTaskRunnerTestUtils
cf,
new PathChildrenCacheFactory.Builder(),
httpClient,
- DSuppliers.of(new
AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
+ DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)),
provisioningStrategy
);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java
new file mode 100644
index 00000000000..18cade99b9d
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.druid.indexing.overlord;
+
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
+import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
+import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+
+
+/**
+ * Test-only {@link ProvisioningStrategy} whose provisioning service does nothing and which
+ * reports a fixed expected capacity of one for every worker collection.
+ */
+public class TestProvisioningStrategy<T extends TaskRunner> implements ProvisioningStrategy<T>
+{
+  @Override
+  public ProvisioningService makeProvisioningService(T runner)
+  {
+    return new InertProvisioningService();
+  }
+
+  @Override
+  public int getExpectedWorkerCapacity(@Nonnull Collection<ImmutableWorkerInfo> workers)
+  {
+    // Constant, independent of the workers passed in.
+    return 1;
+  }
+
+  /**
+   * Provisioning service that has nothing to release and exposes no scaling statistics.
+   */
+  private static class InertProvisioningService implements ProvisioningService
+  {
+    @Override
+    public void close()
+    {
+      // nothing to close
+    }
+
+    @Override
+    public ScalingStats getStats()
+    {
+      return null;
+    }
+  }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
index 1bfac9f42a3..91b0778c950 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java
@@ -42,11 +42,13 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
+import org.apache.druid.indexing.overlord.TestProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
+import
org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
@@ -147,6 +149,7 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size());
Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size());
Assert.assertEquals(4, taskRunner.getTotalCapacity());
+ Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
Assert.assertEquals(0, taskRunner.getUsedCapacity());
}
@@ -1778,6 +1781,79 @@ public class HttpRemoteTaskRunnerTest
Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size());
}
+  @Test
+  public void testGetMaximumCapacity_noWorkerConfig()
+  {
+    // No worker behavior config at all: the maximum capacity is reported as unknown.
+    HttpRemoteTaskRunner taskRunner = makeTaskRunnerWithWorkerConfig(null);
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_noAutoScaler()
+  {
+    // A worker behavior config without an autoscaler: the maximum capacity is reported as unknown.
+    HttpRemoteTaskRunner taskRunner = makeTaskRunnerWithWorkerConfig(
+        new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null)
+    );
+    Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  @Test
+  public void testGetMaximumCapacity_withAutoScaler()
+  {
+    HttpRemoteTaskRunner taskRunner = makeTaskRunnerWithWorkerConfig(DefaultWorkerBehaviorConfig.defaultConfig());
+    // Default autoscaler has max workers of 0
+    Assert.assertEquals(0, taskRunner.getMaximumCapacityWithAutoscale());
+  }
+
+  /**
+   * Builds an {@link HttpRemoteTaskRunner} (without starting it) from mocks, a
+   * {@link TestProvisioningStrategy} and the given worker behavior config, which may be null.
+   */
+  private HttpRemoteTaskRunner makeTaskRunnerWithWorkerConfig(DefaultWorkerBehaviorConfig workerBehaviorConfig)
+  {
+    TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery();
+    DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
+    EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY))
+            .andReturn(druidNodeDiscovery);
+    EasyMock.replay(druidNodeDiscoveryProvider);
+
+    return new HttpRemoteTaskRunner(
+        TestHelper.makeJsonMapper(),
+        new HttpRemoteTaskRunnerConfig(),
+        EasyMock.createNiceMock(HttpClient.class),
+        DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)),
+        new TestProvisioningStrategy<>(),
+        druidNodeDiscoveryProvider,
+        EasyMock.createMock(TaskStorage.class),
+        EasyMock.createNiceMock(CuratorFramework.class),
+        new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null),
+        new NoopServiceEmitter()
+    );
+  }
+
public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated(
TaskStorage taskStorage,
List<Object> listenerNotificationsAccumulator
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index c13a1571765..732d586002f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -43,7 +43,6 @@ import
org.apache.druid.indexing.common.task.KillUnusedSegmentsTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DruidOverlord;
-import org.apache.druid.indexing.overlord.ImmutableWorkerInfo;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskMaster;
@@ -52,14 +51,9 @@ import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
-import org.apache.druid.indexing.overlord.WorkerTaskRunner;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
-import org.apache.druid.indexing.overlord.autoscaling.AutoScaler;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
-import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
-import org.apache.druid.indexing.worker.Worker;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
@@ -1321,6 +1315,7 @@ public class OverlordResourceTest
.andReturn(workerBehaviorConfigAtomicReference);
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
+
EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(-1);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
@@ -1332,130 +1327,26 @@ public class OverlordResourceTest
}
@Test
- public void
testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured()
- {
- AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(null);
- EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
- EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
- EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
- EasyMock.expect(overlord.isLeader()).andReturn(true);
- replayAll();
-
- final Response response = overlordResource.getTotalWorkerCapacity();
- Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
- Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
- Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getUsedClusterCapacity());
- Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
- }
-
- @Test
- public void
testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured()
- {
- DefaultWorkerBehaviorConfig workerBehaviorConfig = new
DefaultWorkerBehaviorConfig(null, null);
- AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
- EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
- EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1);
- EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1);
- EasyMock.expect(overlord.isLeader()).andReturn(true);
- replayAll();
-
- final Response response = overlordResource.getTotalWorkerCapacity();
- Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
- Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
- Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getUsedClusterCapacity());
- Assert.assertEquals(-1, ((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
- }
-
- @Test
- public void
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity()
+ public void testGetTotalWorkerCapacityWithMaximumCapacity()
{
int expectedWorkerCapacity = 3;
- int maxNumWorkers = 2;
- WorkerTaskRunner workerTaskRunner =
EasyMock.createMock(WorkerTaskRunner.class);
- Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
- new ImmutableWorkerInfo(
- new Worker(
- "http", "testWorker", "192.0.0.1", expectedWorkerCapacity,
"v1", WorkerConfig.DEFAULT_CATEGORY
- ),
- 2,
- ImmutableSet.of("grp1", "grp2"),
- ImmutableSet.of("task1", "task2"),
- DateTimes.of("2015-01-01T01:01:01Z")
- )
- );
- EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
-
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
- EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0);
-
- EasyMock.reset(taskMaster);
- EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
- Optional.of(workerTaskRunner)
- ).anyTimes();
-
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes();
- AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
- EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
- DefaultWorkerBehaviorConfig workerBehaviorConfig = new
DefaultWorkerBehaviorConfig(null, autoScaler);
- AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
- EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
- EasyMock.replay(
- workerTaskRunner,
- autoScaler
- );
+ int expectedWorkerCapacityWithAutoscale = 10;
+ WorkerBehaviorConfig workerBehaviorConfig =
EasyMock.createMock(WorkerBehaviorConfig.class);
+ AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
+ = new AtomicReference<>(workerBehaviorConfig);
+ EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class))
+ .andReturn(workerBehaviorConfigAtomicReference);
+
EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity);
+
EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(expectedWorkerCapacity);
+
EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(expectedWorkerCapacityWithAutoscale);
EasyMock.expect(overlord.isLeader()).andReturn(true);
replayAll();
- final Response response = overlordResource.getTotalWorkerCapacity();
- Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
- Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
- Assert.assertEquals(0, ((TotalWorkerCapacityResponse)
response.getEntity()).getUsedClusterCapacity());
- Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers,
((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
- }
- @Test
- public void
testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity()
- {
- int invalidExpectedCapacity = -1;
- int currentTotalCapacity = 3;
- int currentCapacityUsed = 2;
- int maxNumWorkers = 2;
- WorkerTaskRunner workerTaskRunner =
EasyMock.createMock(WorkerTaskRunner.class);
- Collection<ImmutableWorkerInfo> workerInfos = ImmutableList.of(
- new ImmutableWorkerInfo(
- new Worker(
- "http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1",
WorkerConfig.DEFAULT_CATEGORY
- ),
- currentCapacityUsed,
- ImmutableSet.of("grp1", "grp2"),
- ImmutableSet.of("task1", "task2"),
- DateTimes.of("2015-01-01T01:01:01Z")
- )
- );
- EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos);
-
EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity);
-
EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed);
- EasyMock.reset(taskMaster);
- EasyMock.expect(taskMaster.getTaskRunner()).andReturn(
- Optional.of(workerTaskRunner)
- ).anyTimes();
-
EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes();
- AutoScaler<?> autoScaler = EasyMock.createMock(AutoScaler.class);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
- EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers);
- DefaultWorkerBehaviorConfig workerBehaviorConfig = new
DefaultWorkerBehaviorConfig(null, autoScaler);
- AtomicReference<WorkerBehaviorConfig> workerBehaviorConfigAtomicReference
= new AtomicReference<>(workerBehaviorConfig);
- EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference);
- EasyMock.replay(
- workerTaskRunner,
- autoScaler
- );
- EasyMock.expect(overlord.isLeader()).andReturn(true);
- replayAll();
final Response response = overlordResource.getTotalWorkerCapacity();
Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus());
-
Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(),
((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
- Assert.assertEquals(invalidExpectedCapacity,
((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
- Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
- Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse)
response.getEntity()).getUsedClusterCapacity());
+ Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse)
response.getEntity()).getCurrentClusterCapacity());
+ Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse)
response.getEntity()).getUsedClusterCapacity());
+ Assert.assertEquals(expectedWorkerCapacityWithAutoscale,
((TotalWorkerCapacityResponse)
response.getEntity()).getMaximumCapacityWithAutoScale());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]