This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new dba356c6206 Add log for new task count computation in `CostBasedAutoScaler` (#18929)
dba356c6206 is described below

commit dba356c62061f7c1d61ff3163ac7283a97005214
Author: Sasha Syrotenko <[email protected]>
AuthorDate: Tue Jan 20 11:02:36 2026 +0200

    Add log for new task count computation in `CostBasedAutoScaler` (#18929)
    
    * Add intermediate log for new task count computation in CostBasedAutoScaler
    
    * Fix incorrect test in rollover test
    
    * Review
    
    * Final self-review, more logs
---
 .../supervisor/autoscaler/CostBasedAutoScaler.java |   9 +-
 ...treamSupervisorScaleDuringTaskRolloverTest.java |   5 +-
 .../autoscaler/CostBasedAutoScalerMockTest.java    | 284 +++++++++++++++++++++
 3 files changed, 295 insertions(+), 3 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
index 14e3ca2cf5e..598a56e41d1 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java
@@ -139,7 +139,14 @@ public class CostBasedAutoScaler implements 
SupervisorTaskAutoScaler
     final int currentTaskCount = lastKnownMetrics.getCurrentTaskCount();
 
     // Perform only scale-up actions
-    return optimalTaskCount >= currentTaskCount ? optimalTaskCount : -1;
+    int taskCount = -1;
+    if (optimalTaskCount > currentTaskCount) {
+      taskCount = optimalTaskCount;
+      log.info("New task count [%d] on supervisor [%s]", taskCount, 
supervisorId);
+    } else {
+      log.info("No scaling required for supervisor [%s]", supervisorId);
+    }
+    return taskCount;
   }
 
   public CostBasedAutoScalerConfig getConfig()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
index 3b9ad016244..2c4275a99ec 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -48,7 +48,7 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
   public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
   {
     // Given
-    setupSpecExpectations(createIOConfig(5, null));
+    setupSpecExpectations(createIOConfig(DEFAULT_TASK_COUNT, null));
     
EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(spec);
 
@@ -111,7 +111,7 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     supervisor.start();
     supervisor.createAutoscaler(spec);
 
-    Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
+    Assert.assertEquals(DEFAULT_TASK_COUNT, (int) 
supervisor.getIoConfig().getTaskCount());
 
     // When
     supervisor.maybeScaleDuringTaskRollover();
@@ -203,6 +203,7 @@ public class 
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
     return CostBasedAutoScalerConfig.builder()
                                     .taskCountMax(100)
                                     .taskCountMin(1)
+                                    .taskCountStart(DEFAULT_TASK_COUNT)
                                     .enableTaskAutoScaler(true)
                                     .lagWeight(0.25)
                                     .idleWeight(0.75)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
new file mode 100644
index 00000000000..4147a50163d
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerMockTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.joda.time.Duration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class CostBasedAutoScalerMockTest
+{
+  private static final String SUPERVISOR_ID = "test-supervisor";
+  private static final String STREAM_NAME = "test-stream";
+  private static final int PARTITION_COUNT = 100;
+  private static final long TASK_DURATION_SECONDS = 3600;
+  private static final double AVG_PROCESSING_RATE = 1000.0;
+
+  private SupervisorSpec mockSpec;
+  private SeekableStreamSupervisor mockSupervisor;
+  private ServiceEmitter mockEmitter;
+  private SeekableStreamSupervisorIOConfig mockIoConfig;
+  private CostBasedAutoScalerConfig config;
+
+  @Before
+  public void setUp()
+  {
+    mockSpec = Mockito.mock(SupervisorSpec.class);
+    mockSupervisor = Mockito.mock(SeekableStreamSupervisor.class);
+    mockEmitter = Mockito.mock(ServiceEmitter.class);
+    mockIoConfig = Mockito.mock(SeekableStreamSupervisorIOConfig.class);
+
+    when(mockSpec.getId()).thenReturn(SUPERVISOR_ID);
+    when(mockSpec.isSuspended()).thenReturn(false);
+    when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig);
+    when(mockIoConfig.getStream()).thenReturn(STREAM_NAME);
+    
when(mockIoConfig.getTaskDuration()).thenReturn(Duration.standardSeconds(TASK_DURATION_SECONDS));
+
+    config = CostBasedAutoScalerConfig.builder()
+                                      .taskCountMax(100)
+                                      .taskCountMin(1)
+                                      .enableTaskAutoScaler(true)
+                                      .lagWeight(0.6)
+                                      .idleWeight(0.4)
+                                      .build();
+  }
+
+  @Test
+  public void testScaleUpWhenOptimalGreaterThanCurrent()
+  {
+    // Setup: current = 10, optimal should be higher due to high lag and low 
idle
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 10;
+    int expectedOptimalCount = 17; // Higher than current
+
+    CostMetrics metrics = createMetrics(5000.0, currentTaskCount, 
PARTITION_COUNT, 0.1);
+
+    
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 5000.0, 0.1);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should return optimal count when it's greater than current 
(scale-up)",
+        expectedOptimalCount,
+        result
+    );
+  }
+
+  @Test
+  public void testNoOpWhenOptimalEqualsCurrent()
+  {
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 25;
+    int optimalCount = 25; // Same as current
+
+    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 100.0, 0.5);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals("Should return -1 when it equals current (no change 
needed)", -1, result);
+  }
+
+  @Test
+  public void testScaleDownBlockedReturnsMinusOne()
+  {
+    // Scale-down is blocked in computeTaskCountForScaleAction
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 50;
+    int optimalCount = 30; // Lower than current (scale-down scenario)
+
+    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.9);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should return -1 when optimal is less than current (scale-down 
blocked)",
+        -1,
+        result
+    );
+  }
+
+  @Test
+  public void testReturnsMinusOneWhenMetricsCollectionFails()
+  {
+    // When the supervisor is suspended, collectMetrics returns null which 
causes
+    // computeOptimalTaskCount to return -1
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 10;
+
+    // Mock computeOptimalTaskCount to return -1 (simulating null metrics 
scenario)
+    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 100.0, 0.5);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should return -1 when computeOptimalTaskCount returns -1 (e.g., due 
to invalid metrics)",
+        -1,
+        result
+    );
+  }
+
+  @Test
+  public void testReturnsMinusOneWhenLagStatsNull()
+  {
+    // When lag stats are null, computeOptimalTaskCount returns -1
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 10;
+
+    // Mock computeOptimalTaskCount to return -1 (simulating null lag stats 
scenario)
+    doReturn(-1).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 100.0, 0.5);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should return -1 when computeOptimalTaskCount returns -1 (e.g., due 
to null lag stats)",
+        -1,
+        result
+    );
+  }
+
+  @Test
+  public void testScaleUpFromMinimumTasks()
+  {
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 1;
+    int expectedOptimalCount = 5;
+
+    
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10000.0, 0.0);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should allow scale-up from the minimum task count",
+        expectedOptimalCount,
+        result
+    );
+  }
+
+  @Test
+  public void testScaleUpToMaximumTasks()
+  {
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 90;
+    int expectedOptimalCount = 100; // Maximum allowed
+
+    
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 50000.0, 0.0);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should allow scale-up to maximum task count",
+        expectedOptimalCount,
+        result
+    );
+  }
+
+  @Test
+  public void testBoundaryConditionOptimalEqualsCurrentPlusOne()
+  {
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 25;
+    int expectedOptimalCount = 26; // Just one more than current
+
+    
doReturn(expectedOptimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 1000.0, 0.2);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should allow scale-up by exactly one task",
+        expectedOptimalCount,
+        result
+    );
+  }
+
+  @Test
+  public void testBoundaryConditionOptimalEqualsCurrentMinusOne()
+  {
+    CostBasedAutoScaler autoScaler = spy(new 
CostBasedAutoScaler(mockSupervisor, config, mockSpec, mockEmitter));
+
+    int currentTaskCount = 25;
+    int optimalCount = 24; // Just one less than current
+
+    doReturn(optimalCount).when(autoScaler).computeOptimalTaskCount(any());
+    setupMocksForMetricsCollection(currentTaskCount, 10.0, 0.8);
+
+    int result = autoScaler.computeTaskCountForScaleAction();
+
+    Assert.assertEquals(
+        "Should block scale-down even by one task",
+        -1,
+        result
+    );
+  }
+
+  private void setupMocksForMetricsCollection(int taskCount, double avgLag, 
double pollIdleRatio)
+  {
+    when(mockSupervisor.computeLagStats()).thenReturn(new LagStats(0, (long) 
avgLag * 2, (long) avgLag));
+    when(mockIoConfig.getTaskCount()).thenReturn(taskCount);
+    when(mockSupervisor.getPartitionCount()).thenReturn(PARTITION_COUNT);
+    when(mockSupervisor.getStats()).thenReturn(Collections.emptyMap());
+  }
+
+  private CostMetrics createMetrics(
+      double avgPartitionLag,
+      int currentTaskCount,
+      int partitionCount,
+      double pollIdleRatio
+  )
+  {
+    return new CostMetrics(
+        avgPartitionLag,
+        currentTaskCount,
+        partitionCount,
+        pollIdleRatio,
+        TASK_DURATION_SECONDS,
+        AVG_PROCESSING_RATE
+    );
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to