This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c57bd3b4388 supervisor/autoscaler: Skip scaling when partitions are
less than minTaskCount (#17335)
c57bd3b4388 is described below
commit c57bd3b4388cad4ec0087d0951b07648ea378cab
Author: Adithya Chakilam <[email protected]>
AuthorDate: Tue Oct 15 16:12:53 2024 -0500
supervisor/autoscaler: Skip scaling when partitions are less than
minTaskCount (#17335)
---
.../supervisor/autoscaler/LagBasedAutoScaler.java | 16 +++----
.../SeekableStreamSupervisorSpecTest.java | 52 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 8 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index ec81c5f9f99..22e36841199 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -216,17 +216,16 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int desiredActiveTaskCount;
+ int partitionCount = supervisor.getPartitionCount();
+ if (partitionCount <= 0) {
+ log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+ return -1;
+ }
if (beyondProportion >=
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
int taskCount = currentActiveTaskCount +
lagBasedAutoScalerConfig.getScaleOutStep();
- int partitionCount = supervisor.getPartitionCount();
- if (partitionCount <= 0) {
- log.warn("Partition number for [%s] <= 0 ? how can it be?",
dataSource);
- return -1;
- }
-
int actualTaskCountMax =
Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMax) {
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
@@ -248,7 +247,8 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
if (withinProportion >=
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
int taskCount = currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep();
- if (currentActiveTaskCount ==
lagBasedAutoScalerConfig.getTaskCountMin()) {
+ int actualTaskCountMin =
Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+ if (currentActiveTaskCount == actualTaskCountMin) {
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
@@ -260,7 +260,7 @@ public class LagBasedAutoScaler implements
SupervisorTaskAutoScaler
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC,
taskCount));
return -1;
} else {
- desiredActiveTaskCount = Math.max(taskCount,
lagBasedAutoScalerConfig.getTaskCountMin());
+ desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
}
return desiredActiveTaskCount;
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index baff5fc765b..3281360f580 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -992,6 +992,58 @@ public class SeekableStreamSupervisorSpecTest extends
EasyMockSupport
autoScaler.stop();
}
+ @Test
+ public void
testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions()
throws InterruptedException
+ {
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2,
false)).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+ EasyMock.replay(spec);
+
+
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
+ EasyMock.replay(ingestionSchema);
+
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
+ EasyMock.replay(taskMaster);
+
+ TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(10);
+ Map<String, Object> modifiedScaleInProps = getScaleInProperties();
+
+ modifiedScaleInProps.put("taskCountMax", 20);
+ modifiedScaleInProps.put("taskCountMin", 15);
+
+ LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
+ supervisor,
+ DATASOURCE,
+ mapper.convertValue(
+ modifiedScaleInProps,
+ LagBasedAutoScalerConfig.class
+ ),
+ spec,
+ emitter
+ );
+
+ // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
+ Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+ supervisor.getIoConfig().setTaskCount(2);
+ supervisor.start();
+ autoScaler.start();
+ supervisor.runInternal();
+
+ Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
+ Thread.sleep(2000);
+ Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
+
+ autoScaler.reset();
+ autoScaler.stop();
+ }
+
@Test
public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws
InterruptedException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]