This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d836a8f6bde fix: Fix unhealthy supervisor when a priority-assigned
task dies before discovery (#19405)
d836a8f6bde is described below
commit d836a8f6bdec2a51997acddd16ebab7a2c61588d
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon May 4 23:24:24 2026 -0400
fix: Fix unhealthy supervisor when a priority-assigned task dies before
discovery (#19405)
Since #19040, when ioConfig.serverPriorityToReplicas is set, a supervisor
can get stuck throwing a
DruidException from computeUnassignedServerPriorities, which prevents any new
priority-assigned task replicas from being created until the old tasks
roll over. The
exception is as follows:
Found unassignedServerPriorities[[]] of size[0] < total replicas[1] for
taskGroupId[0].
Task server priorities[[1, 0]] have already been assigned to tasks[[foo]].
The supervisor can remain in this unhealthy state, unable to create
additional task replicas, until the tasks eventually roll over. This can happen
when a task spuriously fails after it is created but before it is discovered in
the runInternal() loop.
This patch removes the additional eager write to group.taskIdToServerPriority
in createTasksForGroup, leaving discoverTasks and
removeTask
as the sole mutators. group.taskIdToServerPriority should now stay in sync
with group.tasks.
The added unit tests fail on master without the fix.
---
.../kafka/supervisor/KafkaSupervisorTest.java | 78 ++++++++++++++
.../supervisor/SeekableStreamSupervisor.java | 8 --
.../SeekableStreamSupervisorStateTest.java | 116 +++++++++++++++++++++
3 files changed, 194 insertions(+), 8 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index b5e00bcaab4..ddd4e6053a8 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1726,6 +1726,84 @@ public class KafkaSupervisorTest extends EasyMockSupport
Assert.assertEquals(Integer.valueOf(1), retriedTask.getServerPriority());
}
+ /**
+ * Regression test: {@link SeekableStreamSupervisor.TaskGroup#tasks} and
{@link SeekableStreamSupervisor.TaskGroup#taskIdToServerPriority}
+ * must not go out of sync when a newly-submitted task dies before the next
supervisor run observes it. Otherwise, the orphan priority entry
+ * makes {@link SeekableStreamSupervisor#computeUnassignedServerPriorities}
throw on the replacement attempt.
+ */
+ @Test
+ public void testReplacementSubmittedWhenPriorityTaskDiesBeforeDiscovery()
throws Exception
+ {
+ // replicas=2, taskCount=1, priorities {0:1, 1:1}
+ supervisor = getTestableSupervisor(null, 2, 1, true, true, "PT1H", null,
null, false, kafkaHost, null, Map.of(0, 1, 1, 1));
+ addSomeEvents(1);
+
+ Capture<Task> captured = Capture.newInstance(CaptureType.ALL);
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.emptyList()).anyTimes();
+
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
+ EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+ .anyTimes();
+ EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
+ new KafkaDataSourceMetadata(null)
+ ).anyTimes();
+ // Run 1 submits the initial 2 replicas. (Run 2's 1 replacement is
programmed after the reset below.)
+
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
+
+ // discoverTasks() on run 2 calls verifyAndMergeCheckpoints ->
taskClient.getCheckpointsAsync on the survivor.
+ EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(new TreeMap<>()))
+ .anyTimes();
+
+ taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class),
EasyMock.anyObject(Executor.class));
+ replayAll();
+
+ supervisor.start();
+ supervisor.runInternal();
+ verifyAll();
+
+ final List<Task> run1Tasks = captured.getValues();
+ Assert.assertEquals(2, run1Tasks.size());
+ final KafkaIndexTask orphan = (KafkaIndexTask) run1Tasks.get(0);
+ final KafkaIndexTask survivor = (KafkaIndexTask) run1Tasks.get(1);
+ // Sanity check: the two replicas must hold the two configured priorities.
+ Assert.assertEquals(
+ Set.of(0, 1),
+ Set.of(orphan.getServerPriority(), survivor.getServerPriority())
+ );
+
+ // Run 2: the first replica died before discoverTasks() could observe it,
so activeTaskMap contains only
+ // the survivor. Prior to the fix, createTasksForGroup had already written
the orphan's priority into
+ // taskIdToServerPriority during run 1, so run 2's
computeUnassignedServerPriorities would see both
+ // priorities as "assigned" and throw.
+ EasyMock.reset(taskStorage);
+ EasyMock.reset(taskQueue);
+ EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE))
+ .andReturn(toMap(List.of(survivor)))
+ .anyTimes();
+ EasyMock.expect(taskStorage.getStatus(survivor.getId()))
+ .andReturn(Optional.of(TaskStatus.running(survivor.getId())))
+ .anyTimes();
+
EasyMock.expect(taskStorage.getTask(survivor.getId())).andReturn(Optional.of(survivor)).anyTimes();
+ final Capture<Task> replacementCapture = Capture.newInstance();
+
EasyMock.expect(taskQueue.add(EasyMock.capture(replacementCapture))).andReturn(true);
+ EasyMock.replay(taskStorage);
+ EasyMock.replay(taskQueue);
+
+ // Must not throw.
+ supervisor.runInternal();
+ verifyAll();
+
+ // The replacement task should take the orphan's priority, not duplicate
the survivor's or block.
+ final KafkaIndexTask replacement = (KafkaIndexTask)
replacementCapture.getValue();
+ Assert.assertEquals(orphan.getServerPriority(),
replacement.getServerPriority());
+ }
+
@Test
public void testRequeueAdoptedTaskWhenFailed() throws Exception
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f857cfaa436..395b5da90ce 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -4417,14 +4417,6 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
for (SeekableStreamIndexTask indexTask : taskList) {
- final String taskId = indexTask.getId();
- final Integer serverPriority = indexTask.getServerPriority();
-
- if (serverPriority != null) {
- log.info("Adding serverPriority[%d] for task[%s] and groupId[%s]",
serverPriority, taskId, groupId);
- group.taskIdToServerPriority.put(taskId, serverPriority);
- }
-
Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
try {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 5f6986551a7..cb2f2b451c1 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -3596,6 +3596,122 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
verifyAll();
}
+ /**
+ * Regression test: {@link SeekableStreamSupervisor.TaskGroup#tasks} and
{@link SeekableStreamSupervisor.TaskGroup#taskIdToServerPriority}
+ * must not go out of sync when a newly-submitted task dies before the next
supervisor run observes it. Otherwise, the orphan priority entry
+ * makes {@link SeekableStreamSupervisor#computeUnassignedServerPriorities}
throw on the replacement attempt.
+ */
+ @Test
+ public void testReplacementSubmittedWhenPriorityTaskDiesBeforeDiscovery()
+ {
+ // replicas=2, taskCount=1, priorities {0:1, 1:1}
+ final SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(1, Map.of(0, 1, 1, 1));
+
+ Assert.assertEquals(2, (int) ioConfig.getReplicas());
+
+ EasyMock.reset(spec);
+ EasyMock.expect(spec.getId()).andReturn(SUPERVISOR_ID).anyTimes();
+
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+ EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+ EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
+ EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+
+
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+
+ // Run 1 starts with no active tasks in the overlord. After runInternal()
we reset & replay the mock
+ // below to simulate run 2, where one replica is missing from
activeTaskMap.
+ EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE))
+ .andReturn(Map.of())
+ .anyTimes();
+
+ EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.anyString(),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(new TreeMap<>()))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStatusAsync(EasyMock.anyString()))
+
.andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
+ .anyTimes();
+ EasyMock.expect(indexTaskClient.getStartTimeAsync(EasyMock.anyString()))
+ .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+ .anyTimes();
+
EasyMock.expect(indexTaskClient.getCurrentOffsetsAsync(EasyMock.anyString(),
EasyMock.anyBoolean()))
+ .andReturn(Futures.immediateFuture(ImmutableMap.of(SHARD_ID, "5")))
+ .anyTimes();
+
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();
+
+ final Capture<Task> submittedTasks = Capture.newInstance(CaptureType.ALL);
+
EasyMock.expect(taskQueue.add(EasyMock.capture(submittedTasks))).andReturn(true).anyTimes();
+
+ replayAll();
+
+ // Custom supervisor that honors serverPrioritiesToAssign when creating
tasks so run 1 produces
+ // two replicas carrying priorities
+ final TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor()
+ {
+ @Override
+ protected List<SeekableStreamIndexTask<String, String, ByteEntity>>
createIndexTasks(
+ int replicas,
+ String baseSequenceName,
+ ObjectMapper sortingMapper,
+ TreeMap<Integer, Map<String, String>> sequenceOffsets,
+ SeekableStreamIndexTaskIOConfig taskIoConfig,
+ SeekableStreamIndexTaskTuningConfig taskTuningConfig,
+ RowIngestionMetersFactory rowIngestionMetersFactory,
+ @Nullable List<Integer> serverPrioritiesToAssign
+ )
+ {
+ final List<SeekableStreamIndexTask<String, String, ByteEntity>> tasks
= new ArrayList<>();
+ for (int i = 0; i < replicas; i++) {
+ final Integer priority = serverPrioritiesToAssign == null ? null :
serverPrioritiesToAssign.get(i);
+ tasks.add(createTestTask(
+ baseSequenceName + "_replica_" + i + "_p" + priority,
+ "0",
+ priority,
+ taskIoConfig,
+ recordSupplier
+ ));
+ }
+ return tasks;
+ }
+ };
+
+ supervisor.start();
+ supervisor.runInternal();
+
+ // Run 1 should have submitted 2 replicas.
+ final List<Task> run1Tasks = submittedTasks.getValues();
+ Assert.assertEquals(2, run1Tasks.size());
+ final TestSeekableStreamIndexTask orphan = (TestSeekableStreamIndexTask)
run1Tasks.get(0);
+ final TestSeekableStreamIndexTask survivor = (TestSeekableStreamIndexTask)
run1Tasks.get(1);
+ Assert.assertEquals(
+ Set.of(0, 1),
+ Set.of(orphan.getServerPriority(), survivor.getServerPriority())
+ );
+
+ // Run 2: only the survivor is still active. The orphan died before
discoverTasks() could observe it,
+ // so with the old eager-write bug it would linger in
taskIdToServerPriority and block replacement.
+ EasyMock.reset(taskQueue, taskStorage);
+ EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE))
+ .andReturn(Map.of(survivor.getId(), survivor))
+ .anyTimes();
+ EasyMock.expect(taskStorage.getStatus(survivor.getId()))
+ .andReturn(Optional.of(TaskStatus.running(survivor.getId())))
+ .anyTimes();
+
EasyMock.expect(taskStorage.getTask(survivor.getId())).andReturn(Optional.of(survivor)).anyTimes();
+ final Capture<Task> replacementCapture = Capture.newInstance();
+
EasyMock.expect(taskQueue.add(EasyMock.capture(replacementCapture))).andReturn(true).once();
+ EasyMock.replay(taskQueue, taskStorage);
+
+ // Must not throw.
+ supervisor.runInternal();
+
+ // Replacement task should carry the orphan's missing priority, not
duplicate the survivor's.
+ final TestSeekableStreamIndexTask replacement =
(TestSeekableStreamIndexTask) replacementCapture.getValue();
+ Assert.assertEquals(orphan.getServerPriority(),
replacement.getServerPriority());
+ }
+
@Test
public void testDynamicAllocationScaleUpAllowedWhenCooldownElapsed()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]