This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new aec5c4a4b65 Fix duplicate actions in auto-scaler history (#18715)
aec5c4a4b65 is described below
commit aec5c4a4b658a010d4fc9b3c50046ebc9750dea6
Author: Andreas Maechler <[email protected]>
AuthorDate: Fri Nov 7 22:30:40 2025 -0700
Fix duplicate actions in auto-scaler history (#18715)
Changes:
- Do not clear `pendingCompletionTaskGroups` in `clearAllocationInfo`
- Add unit test
---
.../kafka/supervisor/KafkaSupervisorTest.java | 216 +++++++++++++++++++++
.../supervisor/SeekableStreamSupervisor.java | 10 +-
2 files changed, 224 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index f3f01cc5071..f6a026e6aa7 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -71,6 +71,7 @@ import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningCon
import
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -122,6 +123,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -134,6 +136,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
@@ -5047,6 +5050,201 @@ public class KafkaSupervisorTest extends EasyMockSupport
EasyMock.replay(differentTaskType);
}
+ @Test
+ public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublishing() throws Exception
+ {
+ final TaskLocation location1 = TaskLocation.create("testHost1", 1234, -1);
+ final TaskLocation location2 = TaskLocation.create("testHost2", 1235, -1);
+ final TaskLocation location3 = TaskLocation.create("testHost3", 1236, -1);
+ final DateTime startTime = DateTimes.nowUtc();
+
+ // Create supervisor with 3 task groups
+ supervisor = getTestableSupervisor(1, 3, true, "PT1H", null, null);
+ final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
+ addSomeEvents(100);
+
+ // Manually create 3 tasks (one for each partition/task group)
+ Task task1 = createKafkaIndexTask(
+ "task1",
+ DATASOURCE,
+ 0,
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, 0L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 0, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ tuningConfig
+ );
+
+ Task task2 = createKafkaIndexTask(
+ "task2",
+ DATASOURCE,
+ 1,
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 1, 0L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 1, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ tuningConfig
+ );
+
+ Task task3 = createKafkaIndexTask(
+ "task3",
+ DATASOURCE,
+ 2,
+ new SeekableStreamStartSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 2, 0L),
+ ImmutableSet.of()
+ ),
+ new SeekableStreamEndSequenceNumbers<>(
+ topic,
+ singlePartitionMap(topic, 2, Long.MAX_VALUE)
+ ),
+ null,
+ null,
+ tuningConfig
+ );
+
+ Collection workItems = new ArrayList<>();
+ workItems.add(new TestTaskRunnerWorkItem(task1, null, location1));
+ workItems.add(new TestTaskRunnerWorkItem(task2, null, location2));
+ workItems.add(new TestTaskRunnerWorkItem(task3, null, location3));
+
+ // Setup mocks for initial discovery of tasks
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(toMap(task1,
task2, task3)).anyTimes();
+
EasyMock.expect(taskStorage.getStatus(task1.getId())).andReturn(Optional.of(TaskStatus.running(task1.getId()))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus(task2.getId())).andReturn(Optional.of(TaskStatus.running(task2.getId()))).anyTimes();
+
EasyMock.expect(taskStorage.getStatus(task3.getId())).andReturn(Optional.of(TaskStatus.running(task3.getId()))).anyTimes();
+
EasyMock.expect(taskStorage.getTask(task1.getId())).andReturn(Optional.of(task1)).anyTimes();
+
EasyMock.expect(taskStorage.getTask(task2.getId())).andReturn(Optional.of(task2)).anyTimes();
+
EasyMock.expect(taskStorage.getTask(task3.getId())).andReturn(Optional.of(task3)).anyTimes();
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
KafkaDataSourceMetadata(null)).anyTimes();
+
EasyMock.expect(taskClient.getStatusAsync(task1.getId())).andReturn(Futures.immediateFuture(Status.READING));
+
EasyMock.expect(taskClient.getStatusAsync(task2.getId())).andReturn(Futures.immediateFuture(Status.READING));
+
EasyMock.expect(taskClient.getStatusAsync(task3.getId())).andReturn(Futures.immediateFuture(Status.READING));
+
EasyMock.expect(taskClient.getStartTimeAsync(task1.getId())).andReturn(Futures.immediateFuture(startTime));
+
EasyMock.expect(taskClient.getStartTimeAsync(task2.getId())).andReturn(Futures.immediateFuture(startTime));
+
EasyMock.expect(taskClient.getStartTimeAsync(task3.getId())).andReturn(Futures.immediateFuture(startTime));
+
+ TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints1 = new
TreeMap<>();
+ checkpoints1.put(0, singlePartitionMap(topic, 0, 0L));
+ TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints2 = new
TreeMap<>();
+ checkpoints2.put(0, singlePartitionMap(topic, 1, 0L));
+ TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints3 = new
TreeMap<>();
+ checkpoints3.put(0, singlePartitionMap(topic, 2, 0L));
+
+ EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task1.getId()),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints1))
+ .times(1);
+ EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task2.getId()),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints2))
+ .times(1);
+ EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.eq(task3.getId()),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(checkpoints3))
+ .times(1);
+
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
+ replayAll();
+
+ // Start supervisor and discover the 3 existing tasks
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ // Verify we have 3 actively reading task groups after discovery
+ Assert.assertEquals(
+ "Should have 3 actively reading task groups after discovery",
+ 3,
+ supervisor.getActivelyReadingTaskGroupsCount()
+ );
+
+ // Reset and setup mocks for gracefulShutdownInternal
+ EasyMock.reset(taskRunner, taskClient, taskQueue);
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(task1.getId())).andReturn(location1).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(task2.getId())).andReturn(location2).anyTimes();
+
EasyMock.expect(taskRunner.getTaskLocation(task3.getId())).andReturn(location3).anyTimes();
+
EasyMock.expect(taskClient.pauseAsync(task1.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic,
0, 100L)));
+
EasyMock.expect(taskClient.pauseAsync(task2.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic,
1, 100L)));
+
EasyMock.expect(taskClient.pauseAsync(task3.getId())).andReturn(Futures.immediateFuture(singlePartitionMap(topic,
2, 100L)));
+ EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task1.getId()),
EasyMock.anyObject(),
EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
+ EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task2.getId()),
EasyMock.anyObject(),
EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
+ EasyMock.expect(taskClient.setEndOffsetsAsync(EasyMock.eq(task3.getId()),
EasyMock.anyObject(),
EasyMock.eq(true))).andReturn(Futures.immediateFuture(true));
+
+ EasyMock.replay(taskRunner, taskClient, taskQueue);
+
+ // Simulate autoscaler scale-down by calling gracefulShutdownInternal()
+ // This should move tasks from activelyReadingTaskGroups to
pendingCompletionTaskGroups
+ supervisor.gracefulShutdownInternal();
+
+ verifyAll();
+
+ // After gracefulShutdownInternal, tasks should be moved to
pendingCompletionTaskGroups
+ Assert.assertEquals(
+ "activelyReadingTaskGroups should be empty after
gracefulShutdownInternal",
+ 0,
+ supervisor.getActivelyReadingTaskGroupsCount()
+ );
+
+ // Verify pendingCompletionTaskGroups is NOT empty (tasks were moved there)
+ boolean hasPendingTasks = false;
+ for (int groupId = 0; groupId < 3; groupId++) {
+ if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
+ hasPendingTasks = true;
+ break;
+ }
+ }
+ Assert.assertTrue(
+ "pendingCompletionTaskGroups should contain task groups after
gracefulShutdownInternal",
+ hasPendingTasks
+ );
+
+ // Now call clearAllocationInfo() - this is where the bug was
+ // The bug was that this method cleared pendingCompletionTaskGroups
+ supervisor.testClearAllocationInfo();
+
+ // THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT
empty after clearAllocationInfo
+ // This is the fix - clearAllocationInfo should preserve
pendingCompletionTaskGroups
+ boolean stillHasPendingTasks = false;
+ for (int groupId = 0; groupId < 3; groupId++) {
+ if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
+ stillHasPendingTasks = true;
+ break;
+ }
+ }
+ Assert.assertTrue(
+ "pendingCompletionTaskGroups should be preserved after
clearAllocationInfo() " +
+ "to prevent autoscaler from creating duplicate history entries",
+ stillHasPendingTasks
+ );
+
+ // Verify activelyReadingTaskGroups is still empty
+ Assert.assertEquals(
+ "activelyReadingTaskGroups should remain empty after
clearAllocationInfo",
+ 0,
+ supervisor.getActivelyReadingTaskGroupsCount()
+ );
+ }
+
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
// create topic manually
@@ -5753,6 +5951,24 @@ public class KafkaSupervisorTest extends EasyMockSupport
{
return stateManager;
}
+
+ public int getActivelyReadingTaskGroupsCount()
+ {
+ return getActiveTaskGroupsCount();
+ }
+
+ public int getPendingCompletionTaskGroupsCount(int groupId)
+ {
+ CopyOnWriteArrayList<?> groups = getPendingCompletionTaskGroups(groupId);
+ return groups != null ? groups.size() : 0;
+ }
+
+ public void testClearAllocationInfo() throws Exception
+ {
+ Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
+ method.setAccessible(true);
+ method.invoke(this);
+ }
}
private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends
TestableKafkaSupervisor
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 0faa0b84134..ef65d6d22ca 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -608,13 +608,19 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
+ /**
+  * Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs.
+  * <p>
+  * Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these
+  * tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks
+  * complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to
+  * {@link #activelyReadingTaskGroups}.
+  */
private void clearAllocationInfo()
{
activelyReadingTaskGroups.clear();
partitionGroups.clear();
partitionOffsets.clear();
-
- pendingCompletionTaskGroups.clear();
partitionIds.clear();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]