This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 9c94183c247 MINOR: rename TaskRegistry methods to better reflect their
purpose. (#21448) (#21501)
9c94183c247 is described below
commit 9c94183c24735df86b11d7b3ededd4cfbf50abd1
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Feb 18 14:58:26 2026 -0800
MINOR: rename TaskRegistry methods to better reflect their purpose.
(#21448) (#21501)
Cherry-pick of https://github.com/apache/kafka/pull/21448
Changed the name of method that work only with initialized tasks(not
pending) to better reflect their purpose.
Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy
<[email protected]>
Conflicts:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
---
.../streams/processor/internals/TaskExecutor.java | 6 +-
.../streams/processor/internals/TaskManager.java | 62 +++----
.../kafka/streams/processor/internals/Tasks.java | 25 +--
.../streams/processor/internals/TasksRegistry.java | 22 +--
.../internals/tasks/DefaultTaskManager.java | 16 +-
.../processor/internals/TaskExecutorTest.java | 4 +-
.../processor/internals/TaskManagerTest.java | 194 ++++++++++-----------
.../streams/processor/internals/TasksTest.java | 52 +++---
.../internals/tasks/DefaultTaskManagerTest.java | 74 ++++----
9 files changed, 228 insertions(+), 227 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 91deab0dd9d..8294bf407d0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -68,7 +68,7 @@ public class TaskExecutor {
int totalProcessed = 0;
Task lastProcessed = null;
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
final long now = time.milliseconds();
try {
if (executionMetadata.canProcessTask(task, now)) {
@@ -233,7 +233,7 @@ public class TaskExecutor {
private void updateTaskCommitMetadata(final Map<TopicPartition,
OffsetAndMetadata> allOffsets) {
if (!allOffsets.isEmpty()) {
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (task instanceof StreamTask) {
for (final TopicPartition topicPartition :
task.inputPartitions()) {
if (allOffsets.containsKey(topicPartition)) {
@@ -261,7 +261,7 @@ public class TaskExecutor {
int punctuate() {
int punctuated = 0;
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
try {
if (executionMetadata.canPunctuateTask(task)) {
if (task.maybePunctuateStreamTime()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index ecf23470dd9..6486e9e6e99 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -203,7 +203,7 @@ public class TaskManager {
mainConsumer.pause(mainConsumer.assignment());
} else {
// All tasks that are owned by the task manager are ready and do
not need to be paused
- final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedTasks()
+ final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedInitializedTasks()
.stream()
.flatMap(task -> task.inputPartitions().stream())
.collect(Collectors.toSet());
@@ -221,7 +221,7 @@ public class TaskManager {
* @throws TaskMigratedException
*/
boolean handleCorruption(final Set<TaskId> corruptedTasks) {
- final Set<TaskId> activeTasks = new HashSet<>(tasks.activeTaskIds());
+ final Set<TaskId> activeTasks = new
HashSet<>(tasks.activeInitializedTaskIds());
// We need to stop all processing, since we need to commit
non-corrupted tasks as well.
maybeLockTasks(activeTasks);
@@ -230,7 +230,7 @@ public class TaskManager {
final Set<Task> corruptedStandbyTasks = new HashSet<>();
for (final TaskId taskId : corruptedTasks) {
- final Task task = tasks.task(taskId);
+ final Task task = tasks.initializedTask(taskId);
if (task.isActive()) {
corruptedActiveTasks.add(task);
} else {
@@ -244,7 +244,7 @@ public class TaskManager {
// We need to commit before closing the corrupted active tasks since
this will force the ongoing txn to abort
try {
- final Collection<Task> tasksToCommit = tasks.allTasksPerId()
+ final Collection<Task> tasksToCommit =
tasks.allInitializedTasksPerId()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING)
@@ -254,10 +254,10 @@ public class TaskManager {
} catch (final TaskCorruptedException e) {
log.info("Some additional tasks were found corrupted while trying
to commit, these will be added to the " +
"tasks to clean and revive: {}", e.corruptedTasks());
- corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+
corruptedActiveTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
} catch (final TimeoutException e) {
log.info("Hit TimeoutException when committing all non-corrupted
tasks, these will be closed and revived");
- final Collection<Task> uncorruptedTasks = new
HashSet<>(tasks.activeTasks());
+ final Collection<Task> uncorruptedTasks = new
HashSet<>(tasks.activeInitializedTasks());
uncorruptedTasks.removeAll(corruptedActiveTasks);
// Those tasks which just timed out can just be closed dirty
without marking changelogs as corrupted
closeDirtyAndRevive(uncorruptedTasks, false);
@@ -361,7 +361,7 @@ public class TaskManager {
final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
final Set<TaskId> tasksToLock =
- tasks.allTaskIds().stream()
+ tasks.allInitializedTaskIds().stream()
.filter(x -> activeTasksToCreate.containsKey(x) ||
standbyTasksToCreate.containsKey(x))
.collect(Collectors.toSet());
@@ -464,7 +464,7 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allInitializedTasks()) {
final TaskId taskId = task.id();
if (activeTasksToCreate.containsKey(taskId)) {
if (task.isActive()) {
@@ -539,7 +539,7 @@ public class TaskManager {
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
final Map<Task,
Set<TopicPartition>> tasksToRecycle,
final Set<Task>
tasksToCloseClean) {
- for (final Task task : tasks.allNonFailedTasks()) {
+ for (final Task task : tasks.allNonFailedInitializedTasks()) {
if (!task.isActive()) {
throw new IllegalStateException("Standby tasks should only be
managed by the state updater, " +
"but standby task " + task.id() + " is managed by the
stream thread");
@@ -739,7 +739,7 @@ public class TaskManager {
while (iter.hasNext()) {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
- final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
+ final boolean taskIsOwned =
tasks.allInitializedTaskIds().contains(taskId)
|| (stateUpdater != null &&
stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
if (taskId.topologyName() != null && !taskIsOwned &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
log.info("Cannot create the assigned task {} since it's
topology name cannot be recognized, will put it " +
@@ -868,7 +868,7 @@ public class TaskManager {
changelogReader.enforceRestoreActive();
final List<Task> activeTasks = new LinkedList<>();
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allInitializedTasks()) {
try {
task.initializeIfNeeded();
task.clearTaskTimeout();
@@ -1160,7 +1160,7 @@ public class TaskManager {
e.corruptedTasks());
// If we hit a TaskCorruptedException it must be EOS, just handle
the cleanup for those corrupted tasks right here
- dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+ dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
closeDirtyAndRevive(dirtyTasks, true);
} catch (final TimeoutException e) {
log.warn("Timed out while trying to commit all tasks during
revocation, these will be cleaned and revived");
@@ -1276,8 +1276,8 @@ public class TaskManager {
}
private void closeRunningTasksDirty() {
- final Set<Task> allTask = tasks.allTasks();
- final Set<TaskId> allTaskIds = tasks.allTaskIds();
+ final Set<Task> allTask = tasks.allInitializedTasks();
+ final Set<TaskId> allTaskIds = tasks.allInitializedTaskIds();
maybeLockTasks(allTaskIds);
for (final Task task : allTask) {
// Even though we've apparently dropped out of the group, we can
continue safely to maintain our
@@ -1499,10 +1499,10 @@ public class TaskManager {
// TODO: change type to `StreamTask`
final Set<Task> activeTasks = new
TreeSet<>(Comparator.comparing(Task::id));
- activeTasks.addAll(tasks.activeTasks());
+ activeTasks.addAll(tasks.activeInitializedTasks());
// TODO: change type to `StandbyTask`
final Set<Task> standbyTasks = new
TreeSet<>(Comparator.comparing(Task::id));
- standbyTasks.addAll(tasks.standbyTasks());
+ standbyTasks.addAll(tasks.standbyInitializedTasks());
final Set<Task> pendingActiveTasks =
tasks.drainPendingActiveTasksToInit();
activeTasks.addAll(pendingActiveTasks);
@@ -1759,11 +1759,11 @@ public class TaskManager {
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
if (stateUpdater != null) {
final Map<TaskId, Task> ret =
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
- ret.putAll(tasks.allTasksPerId());
+ ret.putAll(tasks.allInitializedTasksPerId());
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
x -> x)));
return ret;
} else {
- return tasks.allTasksPerId();
+ return tasks.allInitializedTasksPerId();
}
}
@@ -1777,7 +1777,7 @@ public class TaskManager {
Map<TaskId, Task> allOwnedTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
- return tasks.allTasksPerId();
+ return tasks.allInitializedTasksPerId();
}
Set<Task> readOnlyAllTasks() {
@@ -1785,15 +1785,15 @@ public class TaskManager {
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
if (stateUpdater != null) {
final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
- ret.addAll(tasks.allTasks());
+ ret.addAll(tasks.allInitializedTasks());
return Collections.unmodifiableSet(ret);
} else {
- return Collections.unmodifiableSet(tasks.allTasks());
+ return Collections.unmodifiableSet(tasks.allInitializedTasks());
}
}
Map<TaskId, Task> notPausedTasks() {
- return Collections.unmodifiableMap(tasks.allTasks()
+ return Collections.unmodifiableMap(tasks.allInitializedTasks()
.stream()
.filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
.collect(Collectors.toMap(Task::id, v -> v)));
@@ -1822,7 +1822,7 @@ public class TaskManager {
}
private Stream<Task> activeRunningTaskStream() {
- return tasks.allTasks().stream().filter(Task::isActive);
+ return tasks.allInitializedTasks().stream().filter(Task::isActive);
}
Map<TaskId, Task> standbyTaskMap() {
@@ -1834,7 +1834,7 @@ public class TaskManager {
}
private Stream<Task> standbyTaskStream() {
- final Stream<Task> standbyTasksInTaskRegistry =
tasks.allTasks().stream().filter(t -> !t.isActive());
+ final Stream<Task> standbyTasksInTaskRegistry =
tasks.allInitializedTasks().stream().filter(t -> !t.isActive());
if (stateUpdater != null) {
return Stream.concat(
stateUpdater.standbyTasks().stream(),
@@ -1846,7 +1846,7 @@ public class TaskManager {
}
// For testing only.
int commitAll() {
- return commit(tasks.allTasks());
+ return commit(tasks.allInitializedTasks());
}
/**
@@ -1854,7 +1854,7 @@ public class TaskManager {
* the corresponding record queues have capacity (again).
*/
public void resumePollingForPartitionsWithAvailableSpace() {
- for (final Task t: tasks.activeTasks()) {
+ for (final Task t: tasks.activeInitializedTasks()) {
t.resumePollingForPartitionsWithAvailableSpace();
}
}
@@ -1863,7 +1863,7 @@ public class TaskManager {
* Fetches up-to-date lag information from the consumer.
*/
public void updateLags() {
- for (final Task t: tasks.activeTasks()) {
+ for (final Task t: tasks.activeInitializedTasks()) {
t.updateLags();
}
}
@@ -1913,7 +1913,7 @@ public class TaskManager {
}
private Task getActiveTask(final TopicPartition partition) {
- final Task activeTask = tasks.activeTasksForInputPartition(partition);
+ final Task activeTask =
tasks.activeInitializedTasksForInputPartition(partition);
if (activeTask == null) {
log.error("Unable to locate active task for received-record
partition {}. Current tasks: {}",
@@ -2017,7 +2017,7 @@ public class TaskManager {
}
public void updateTaskEndMetadata(final TopicPartition topicPartition,
final Long offset) {
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (task instanceof StreamTask) {
if (task.inputPartitions().contains(topicPartition)) {
((StreamTask) task).updateEndOffsets(topicPartition,
offset);
@@ -2048,7 +2048,7 @@ public class TaskManager {
try {
final Set<Task> activeTasksToRemove = new HashSet<>();
final Set<Task> standbyTasksToRemove = new HashSet<>();
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allInitializedTasks()) {
if
(!currentNamedTopologies.contains(task.id().topologyName())) {
if (task.isActive()) {
activeTasksToRemove.add(task);
@@ -2137,7 +2137,7 @@ public class TaskManager {
stringBuilder.append("TaskManager\n");
stringBuilder.append(indent).append("\tMetadataState:\n");
stringBuilder.append(indent).append("\tTasks:\n");
- for (final Task task : tasks.allTasks()) {
+ for (final Task task : tasks.allInitializedTasks()) {
stringBuilder.append(indent)
.append("\t\t")
.append(task.id())
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 82bd2a9103c..fc191dec164 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -291,6 +291,7 @@ class Tasks implements TasksRegistry {
@Override
public synchronized void clear() {
pendingTasksToInit.clear();
+ pendingTasksToClose.clear();
pendingActiveTasksToCreate.clear();
pendingStandbyTasksToCreate.clear();
activeTasksPerId.clear();
@@ -301,7 +302,7 @@ class Tasks implements TasksRegistry {
// TODO: change return type to `StreamTask`
@Override
- public Task activeTasksForInputPartition(final TopicPartition partition) {
+ public Task activeInitializedTasksForInputPartition(final TopicPartition
partition) {
return activeTasksPerPartition.get(partition);
}
@@ -316,7 +317,7 @@ class Tasks implements TasksRegistry {
}
@Override
- public Task task(final TaskId taskId) {
+ public Task initializedTask(final TaskId taskId) {
final Task task = getTask(taskId);
if (task != null)
@@ -326,26 +327,26 @@ class Tasks implements TasksRegistry {
}
@Override
- public Collection<Task> tasks(final Collection<TaskId> taskIds) {
+ public Collection<Task> initializedTasks(final Collection<TaskId> taskIds)
{
final Set<Task> tasks = new HashSet<>();
for (final TaskId taskId : taskIds) {
- tasks.add(task(taskId));
+ tasks.add(initializedTask(taskId));
}
return tasks;
}
@Override
- public synchronized Collection<TaskId> activeTaskIds() {
+ public synchronized Collection<TaskId> activeInitializedTaskIds() {
return Collections.unmodifiableCollection(activeTasksPerId.keySet());
}
@Override
- public synchronized Collection<Task> activeTasks() {
+ public synchronized Collection<Task> activeInitializedTasks() {
return Collections.unmodifiableCollection(activeTasksPerId.values());
}
@Override
- public synchronized Collection<Task> standbyTasks() {
+ public synchronized Collection<Task> standbyInitializedTasks() {
return Collections.unmodifiableCollection(standbyTasksPerId.values());
}
@@ -354,12 +355,12 @@ class Tasks implements TasksRegistry {
* and the returned task could be modified by other threads concurrently
*/
@Override
- public synchronized Set<Task> allTasks() {
+ public synchronized Set<Task> allInitializedTasks() {
return union(HashSet::new, new HashSet<>(activeTasksPerId.values()),
new HashSet<>(standbyTasksPerId.values()));
}
@Override
- public synchronized Set<Task> allNonFailedTasks() {
+ public synchronized Set<Task> allNonFailedInitializedTasks() {
final Set<Task> nonFailedActiveTasks =
activeTasksPerId.values().stream()
.filter(task -> !failedTaskIds.contains(task.id()))
.collect(Collectors.toSet());
@@ -370,12 +371,12 @@ class Tasks implements TasksRegistry {
}
@Override
- public synchronized Set<TaskId> allTaskIds() {
+ public synchronized Set<TaskId> allInitializedTaskIds() {
return union(HashSet::new, activeTasksPerId.keySet(),
standbyTasksPerId.keySet());
}
@Override
- public synchronized Map<TaskId, Task> allTasksPerId() {
+ public synchronized Map<TaskId, Task> allInitializedTasksPerId() {
final Map<TaskId, Task> ret = new HashMap<>();
ret.putAll(activeTasksPerId);
ret.putAll(standbyTasksPerId);
@@ -383,7 +384,7 @@ class Tasks implements TasksRegistry {
}
@Override
- public boolean contains(final TaskId taskId) {
+ public boolean containsInitialized(final TaskId taskId) {
return getTask(taskId) != null;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 9e13d0f8c52..a6e660fabab 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -71,25 +71,25 @@ public interface TasksRegistry {
void clear();
- Task activeTasksForInputPartition(final TopicPartition partition);
+ Task activeInitializedTasksForInputPartition(final TopicPartition
partition);
- Task task(final TaskId taskId);
+ Task initializedTask(final TaskId taskId);
- Collection<Task> tasks(final Collection<TaskId> taskIds);
+ Collection<Task> initializedTasks(final Collection<TaskId> taskIds);
- Collection<TaskId> activeTaskIds();
+ Collection<TaskId> activeInitializedTaskIds();
- Collection<Task> activeTasks();
+ Collection<Task> activeInitializedTasks();
- Collection<Task> standbyTasks();
+ Collection<Task> standbyInitializedTasks();
- Set<Task> allTasks();
+ Set<Task> allInitializedTasks();
- Set<Task> allNonFailedTasks();
+ Set<Task> allNonFailedInitializedTasks();
- Map<TaskId, Task> allTasksPerId();
+ Map<TaskId, Task> allInitializedTasksPerId();
- Set<TaskId> allTaskIds();
+ Set<TaskId> allInitializedTaskIds();
- boolean contains(final TaskId taskId);
+ boolean containsInitialized(final TaskId taskId);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 51ec80d05ac..2259f7768c2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -102,7 +102,7 @@ public final class DefaultTaskManager implements
TaskManager {
}
// the most naive scheduling algorithm for now: give the next
unlocked, unassigned, and processable task
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds()) &&
@@ -126,7 +126,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown)
throws InterruptedException {
final boolean interrupted = returnWithTasksLocked(() -> {
- for (final Task task : tasks.activeTasks()) {
+ for (final Task task : tasks.activeInitializedTasks()) {
if (!assignedTasks.containsKey(task.id()) &&
!lockedTasks.contains(task.id()) &&
canProgress((StreamTask) task, time.milliseconds()) &&
@@ -200,7 +200,7 @@ public final class DefaultTaskManager implements
TaskManager {
final Set<TaskId> remainingTaskIds = new
ConcurrentSkipListSet<>(taskIds);
for (final TaskId taskId : taskIds) {
- final Task task = tasks.task(taskId);
+ final Task task = tasks.initializedTask(taskId);
if (task == null) {
throw new IllegalArgumentException("Trying to lock task "
+ taskId + " but it's not owned");
@@ -243,7 +243,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public KafkaFuture<Void> lockAllTasks() {
return returnWithTasksLocked(() ->
-
lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))
+
lockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet()))
);
}
@@ -263,7 +263,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public void unlockAllTasks() {
- executeWithTasksLocked(() ->
unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
+ executeWithTasksLocked(() ->
unlockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet())));
}
@Override
@@ -290,11 +290,11 @@ public final class DefaultTaskManager implements
TaskManager {
throw new IllegalArgumentException("The task to remove is not
locked yet by the task manager");
}
- if (!tasks.contains(taskId)) {
+ if (!tasks.containsInitialized(taskId)) {
throw new IllegalArgumentException("The task to remove is not
owned by the task manager");
}
- tasks.removeTask(tasks.task(taskId));
+ tasks.removeTask(tasks.initializedTask(taskId));
});
log.info("Removed task {} from the task manager", taskId);
@@ -302,7 +302,7 @@ public final class DefaultTaskManager implements
TaskManager {
@Override
public Set<ReadOnlyTask> getTasks() {
- return returnWithTasksLocked(() ->
tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
+ return returnWithTasksLocked(() ->
tasks.activeInitializedTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
}
@Override
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 8d9ef70c5e3..568998931c9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -38,7 +38,7 @@ public class TaskExecutorTest {
final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager,
metadata, new LogContext());
taskExecutor.punctuate();
- verify(tasks).activeTasks();
+ verify(tasks).activeInitializedTasks();
}
@Test
@@ -59,4 +59,4 @@ public class TaskExecutorTest {
verify(producer).commitTransaction(Collections.emptyMap(),
groupMetadata);
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fae4afbdbac..579b5e1fc6a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -271,8 +271,8 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
- when(tasks.task(taskId00)).thenReturn(activeTask1);
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
+ when(tasks.initializedTask(taskId00)).thenReturn(activeTask1);
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -306,7 +306,7 @@ public class TaskManagerTest {
public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -329,7 +329,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(activeTask1,
activeTask2));
final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
@@ -365,7 +365,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1,
activeTask2));
taskManager.resumePollingForPartitionsWithAvailableSpace();
@@ -383,7 +383,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1,
activeTask2));
taskManager.updateLags();
@@ -662,7 +662,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask));
taskManager.handleAssignment(
mkMap(mkEntry(reassignedActiveTask.id(),
reassignedActiveTask.inputPartitions())),
@@ -701,7 +701,7 @@ public class TaskManagerTest {
assertEquals(taskException, exception.getCause());
verify(tasks).addFailedTask(failedActiveTaskToRecycle);
verify(tasks, never()).addTask(failedActiveTaskToRecycle);
- verify(tasks).allNonFailedTasks();
+ verify(tasks).allNonFailedInitializedTasks();
verify(standbyTaskCreator,
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle,
taskId03Partitions);
}
@@ -731,7 +731,7 @@ public class TaskManagerTest {
assertEquals(taskException, exception.getCause());
verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
- verify(tasks).allNonFailedTasks();
+ verify(tasks).allNonFailedInitializedTasks();
verify(activeTaskCreator,
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle,
taskId03Partitions, consumer);
}
@@ -761,7 +761,7 @@ public class TaskManagerTest {
assertEquals(taskException, exception.getCause());
verify(tasks).addFailedTask(failedActiveTaskToReassign);
verify(tasks, never()).addTask(failedActiveTaskToReassign);
- verify(tasks).allNonFailedTasks();
+ verify(tasks).allNonFailedInitializedTasks();
verify(tasks,
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign,
taskId00Partitions);
}
@@ -775,7 +775,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask1));
when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
when(stateUpdater.remove(reassignedActiveTask2.id()))
.thenReturn(CompletableFuture.completedFuture(new
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -880,7 +880,7 @@ public class TaskManagerTest {
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
runningActiveTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
runningActiveTask)));
when(tasks.pendingTasksToInit()).thenReturn(Set.of(activeTaskToInit));
assertEquals(
taskManager.allTasks(),
@@ -903,7 +903,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
activeTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03,
activeTask)));
assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03,
activeTask)));
}
@@ -955,7 +955,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions)
.inState(State.CREATED).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToRecycle));
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
.thenReturn(standbyTask);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -975,7 +975,7 @@ public class TaskManagerTest {
.inState(State.RUNNING)
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
final IllegalStateException illegalStateException = assertThrows(
@@ -998,7 +998,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
@@ -1016,7 +1016,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToClose));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1036,7 +1036,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId02Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions,
newInputPartitions)).thenReturn(true);
taskManager.handleAssignment(
@@ -1056,7 +1056,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1074,7 +1074,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId03Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToResume.id(),
activeTaskToResume.inputPartitions())),
@@ -1096,7 +1096,7 @@ public class TaskManagerTest {
final Set<TopicPartition> newInputPartitions = taskId03Partitions;
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
final IllegalStateException illegalStateException = assertThrows(
IllegalStateException.class,
@@ -1121,7 +1121,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
taskManager.handleAssignment(
mkMap(mkEntry(activeTaskToCreate.id(),
activeTaskToCreate.inputPartitions())),
@@ -1802,7 +1802,7 @@ public class TaskManagerTest {
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(statefulTask0));
final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
when(consumer.assignment()).thenReturn(assigned);
@@ -1824,9 +1824,9 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask,
restoringStatefulTask));
-
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
+
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(runningStatefulTask));
expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
makeTaskFolders(
@@ -1858,7 +1858,7 @@ public class TaskManagerTest {
when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
assertThat(
taskManager.taskOffsetSums(),
@@ -1944,7 +1944,7 @@ public class TaskManagerTest {
.thenReturn(mkMap(mkEntry(t1p2changelog,
changelogOffsetOfRestoringStandbyTask)));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask,
restoringStatefulTask));
assertThat(
@@ -1969,7 +1969,7 @@ public class TaskManagerTest {
));
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
assertThat(
@@ -2041,7 +2041,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
task)));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -2101,7 +2101,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -2122,7 +2122,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("KABOOM!")).when(task00).closeClean();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -2153,8 +2153,8 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
- when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
final ArrayList<TaskDirectory> taskFolders = new ArrayList<>(2);
taskFolders.add(new
TaskDirectory(testFolder.resolve(taskId00.toString()).toFile(), null));
@@ -2215,8 +2215,8 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
- when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
+ when(tasks.initializedTask(taskId03)).thenReturn(corruptedActiveTask);
+ when(tasks.initializedTask(taskId02)).thenReturn(corruptedStandbyTask);
taskManager.handleCorruption(Set.of(corruptedActiveTask.id(),
corruptedStandbyTask.id()));
@@ -2241,9 +2241,9 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(task00);
- when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+ when(tasks.initializedTask(taskId00)).thenReturn(task00);
+
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00,
task00));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
when(task00.prepareCommit(false)).thenReturn(emptyMap());
doNothing().when(task00).postCommit(anyBoolean());
@@ -2274,9 +2274,9 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(task00);
- when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+ when(tasks.initializedTask(taskId00)).thenReturn(task00);
+
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00,
task00));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
when(task00.prepareCommit(false)).thenReturn(emptyMap());
when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
@@ -2309,12 +2309,12 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedTask);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedTask);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedTask),
mkEntry(taskId01, nonCorruptedTask)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
when(nonCorruptedTask.commitNeeded()).thenReturn(true);
when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
@@ -2349,8 +2349,8 @@ public class TaskManagerTest {
.withInputPartitions(taskId02Partitions)
.inState(State.RUNNING).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02,
corruptedTask)));
- when(tasks.task(taskId02)).thenReturn(corruptedTask);
+
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId02,
corruptedTask)));
+ when(tasks.initializedTask(taskId02)).thenReturn(corruptedTask);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
when(consumer.assignment()).thenReturn(intersection(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
@@ -2374,12 +2374,12 @@ public class TaskManagerTest {
.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedStandby);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedStandby);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedStandby),
mkEntry(taskId01, runningNonCorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId01));
when(runningNonCorruptedActive.commitNeeded()).thenReturn(true);
when(runningNonCorruptedActive.prepareCommit(true))
@@ -2420,12 +2420,12 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedActive);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedActive),
mkEntry(taskId01, uncorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
when(uncorruptedActive.commitNeeded()).thenReturn(true);
when(uncorruptedActive.prepareCommit(true)).thenReturn(emptyMap());
@@ -2466,12 +2466,12 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedActive);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedActive),
mkEntry(taskId01, uncorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2481,7 +2481,7 @@ public class TaskManagerTest {
// mock uncorrupted task to indicate that it needs commit and will
return offsets
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
-
when(tasks.tasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
+
when(tasks.initializedTasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
when(uncorruptedActive.commitNeeded()).thenReturn(true);
when(uncorruptedActive.prepareCommit(true)).thenReturn(offsets);
when(uncorruptedActive.prepareCommit(false)).thenReturn(emptyMap());
@@ -2546,13 +2546,13 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.task(taskId00)).thenReturn(corruptedActive);
- when(tasks.allTasksPerId()).thenReturn(mkMap(
+ when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+ when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
mkEntry(taskId00, corruptedActive),
mkEntry(taskId01, uncorruptedActive)
));
- when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
- when(tasks.activeTasks()).thenReturn(Set.of(corruptedActive,
uncorruptedActive));
+ when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00,
taskId01));
+
when(tasks.activeInitializedTasks()).thenReturn(Set.of(corruptedActive,
uncorruptedActive));
// we need to mock uncorrupted task to indicate that it needs commit
and will return offsets
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -2626,7 +2626,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
when(consumer.assignment()).thenReturn(union(HashSet::new,
taskId00Partitions, taskId01Partitions, taskId02Partitions));
@@ -2713,8 +2713,8 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
- when(tasks.tasks(Set.of(taskId00,
taskId01))).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+ when(tasks.initializedTasks(Set.of(taskId00,
taskId01))).thenReturn(Set.of(revokedActiveTask,
unrevokedActiveTaskWithCommit));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2827,7 +2827,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
when(tasks.hasPendingTasksToInit()).thenReturn(false);
@@ -2863,7 +2863,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final Set<TopicPartition> newPartitionsSet = Set.of(t1p1);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
when(tasks.hasPendingTasksToInit()).thenReturn(false);
when(tasks.updateActiveTaskInputPartitions(task00,
newPartitionsSet)).thenReturn(true);
@@ -2999,7 +2999,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final Map<TopicPartition, OffsetAndMetadata> offsets =
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
@@ -3044,7 +3044,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task10));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02, task10));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -3136,7 +3136,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(offsets01);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02, task03));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3187,7 +3187,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
when(task00.commitNeeded()).thenReturn(false);
when(task01.commitNeeded()).thenReturn(true); // only task01 needs
commit
@@ -3218,7 +3218,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
final Map<TaskId, Set<TopicPartition>> assignmentActive =
singletonMap(taskId00, taskId00Partitions);
@@ -3252,7 +3252,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+ when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
when(stateUpdater.tasks()).thenReturn(Set.of(task01));
// mock to remove standby task from state updater
@@ -3320,7 +3320,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("KABOOM!")).when(task00).suspend();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3360,7 +3360,7 @@ public class TaskManagerTest {
doThrow(new RuntimeException("oops"))
.when(task02).suspend();
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
final RuntimeException exception = assertThrows(
RuntimeException.class,
@@ -3397,7 +3397,7 @@ public class TaskManagerTest {
doThrow(new
RuntimeException("whatever")).when(activeTaskCreator).close();
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final RuntimeException exception = assertThrows(
RuntimeException.class,
@@ -3489,7 +3489,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
final RuntimeException thrown = assertThrows(RuntimeException.class,
() -> taskManager.handleRevocation(union(HashSet::new,
taskId01Partitions, taskId02Partitions)));
@@ -3580,7 +3580,7 @@ public class TaskManagerTest {
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00)).thenReturn(Set.of());
when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00));
- when(tasks.standbyTasks()).thenReturn(Set.of(standbyTask00));
+
when(tasks.standbyInitializedTasks()).thenReturn(Set.of(standbyTask00));
final CompletableFuture<StateUpdater.RemovedTaskResult>
futureForStandbyTask = new CompletableFuture<>();
when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
@@ -3800,7 +3800,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(emptyMap());
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3896,7 +3896,7 @@ public class TaskManagerTest {
when(task00.prepareCommit(true)).thenReturn(emptyMap());
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3924,7 +3924,7 @@ public class TaskManagerTest {
when(task01.commitNeeded()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3953,7 +3953,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -3979,7 +3979,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01, task02));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01, task02));
final StreamsProducer producer = mock(StreamsProducer.class);
when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -4025,7 +4025,7 @@ public class TaskManagerTest {
when(task00.prepareCommit(true)).thenThrow(new
RuntimeException("opsh."));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4048,7 +4048,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenThrow(new
RuntimeException("opsh."));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4080,7 +4080,7 @@ public class TaskManagerTest {
.thenReturn(singletonMap(t1p1, 17L));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4110,7 +4110,7 @@ public class TaskManagerTest {
.thenReturn(singletonMap(t1p1, 17L));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4138,7 +4138,7 @@ public class TaskManagerTest {
when(task00.purgeableOffsets()).thenReturn(singletonMap(t1p1, 5L));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4195,7 +4195,7 @@ public class TaskManagerTest {
expectedCommittedOffsets.putAll(offsets1);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02,
task03));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01,
task02, task03));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4255,7 +4255,7 @@ public class TaskManagerTest {
.thenReturn(false); // no more records
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00,
task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4300,7 +4300,7 @@ public class TaskManagerTest {
.thenReturn(false);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01,
task02));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4327,7 +4327,7 @@ public class TaskManagerTest {
.thenThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4344,7 +4344,7 @@ public class TaskManagerTest {
when(task00.process(anyLong())).thenThrow(new
RuntimeException("oops"));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4365,7 +4365,7 @@ public class TaskManagerTest {
.thenThrow(new TaskMigratedException("migrated", new
RuntimeException("cause")));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4382,7 +4382,7 @@ public class TaskManagerTest {
when(task00.maybePunctuateStreamTime()).thenThrow(new
KafkaException("oops"));
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4400,7 +4400,7 @@ public class TaskManagerTest {
when(task00.maybePunctuateSystemTime()).thenReturn(true);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.activeTasks()).thenReturn(Set.of(task00));
+ when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4436,7 +4436,7 @@ public class TaskManagerTest {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
- when(tasks.allTasks()).thenReturn(Set.of(task00));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(TaskManager.class)) {
appender.setClassLogger(TaskManager.class, Level.DEBUG);
@@ -4644,7 +4644,7 @@ public class TaskManagerTest {
when(task01.prepareCommit(true)).thenReturn(offsets);
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -4875,7 +4875,7 @@ public class TaskManagerTest {
.build();
final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+ when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
final TaskManager taskManager =
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 0887c982873..bb2865d223c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -75,25 +75,25 @@ public class TasksTest {
tasks.addActiveTasks(Set.of(statefulTask, statelessTask));
tasks.addStandbyTasks(Collections.singletonList(standbyTask));
- assertEquals(statefulTask, tasks.task(statefulTask.id()));
- assertEquals(statelessTask, tasks.task(statelessTask.id()));
- assertEquals(standbyTask, tasks.task(standbyTask.id()));
-
- assertEquals(Set.of(statefulTask, statelessTask), new
HashSet<>(tasks.activeTasks()));
- assertEquals(Set.of(standbyTask), new HashSet<>(tasks.standbyTasks()));
- assertEquals(Set.of(statefulTask, statelessTask, standbyTask),
tasks.allTasks());
- assertEquals(Set.of(statefulTask, standbyTask),
tasks.tasks(Set.of(statefulTask.id(), standbyTask.id())));
- assertEquals(Set.of(statefulTask.id(), statelessTask.id(),
standbyTask.id()), tasks.allTaskIds());
+ assertEquals(statefulTask, tasks.initializedTask(statefulTask.id()));
+ assertEquals(statelessTask, tasks.initializedTask(statelessTask.id()));
+ assertEquals(standbyTask, tasks.initializedTask(standbyTask.id()));
+
+ assertEquals(Set.of(statefulTask, statelessTask), new
HashSet<>(tasks.activeInitializedTasks()));
+ assertEquals(Set.of(standbyTask), new
HashSet<>(tasks.standbyInitializedTasks()));
+ assertEquals(Set.of(statefulTask, statelessTask, standbyTask),
tasks.allInitializedTasks());
+ assertEquals(Set.of(statefulTask, standbyTask),
tasks.initializedTasks(Set.of(statefulTask.id(), standbyTask.id())));
+ assertEquals(Set.of(statefulTask.id(), statelessTask.id(),
standbyTask.id()), tasks.allInitializedTaskIds());
assertEquals(
mkMap(
mkEntry(statefulTask.id(), statefulTask),
mkEntry(statelessTask.id(), statelessTask),
mkEntry(standbyTask.id(), standbyTask)
),
- tasks.allTasksPerId());
- assertTrue(tasks.contains(statefulTask.id()));
- assertTrue(tasks.contains(statelessTask.id()));
- assertTrue(tasks.contains(statefulTask.id()));
+ tasks.allInitializedTasksPerId());
+ assertTrue(tasks.containsInitialized(statefulTask.id()));
+ assertTrue(tasks.containsInitialized(statelessTask.id()));
+ assertTrue(tasks.containsInitialized(statefulTask.id()));
}
@Test
@@ -192,12 +192,12 @@ public class TasksTest {
tasks.addFailedTask(activeTask1);
- assertEquals(activeTask1, tasks.task(TASK_0_0));
- assertEquals(activeTask2, tasks.task(TASK_0_1));
- assertTrue(tasks.allTasks().contains(activeTask1));
- assertTrue(tasks.allTasks().contains(activeTask2));
- assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
- assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+ assertEquals(activeTask1, tasks.initializedTask(TASK_0_0));
+ assertEquals(activeTask2, tasks.initializedTask(TASK_0_1));
+ assertTrue(tasks.allInitializedTasks().contains(activeTask1));
+ assertTrue(tasks.allInitializedTasks().contains(activeTask2));
+
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask2));
}
@Test
@@ -207,11 +207,11 @@ public class TasksTest {
tasks.addFailedTask(activeTask1);
tasks.removeTask(activeTask1);
- assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
- assertFalse(tasks.allTasks().contains(activeTask1));
+
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+ assertFalse(tasks.allInitializedTasks().contains(activeTask1));
tasks.addTask(activeTask1);
- assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
}
@Test
@@ -221,11 +221,11 @@ public class TasksTest {
tasks.addFailedTask(activeTask1);
tasks.clear();
- assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
- assertFalse(tasks.allTasks().contains(activeTask1));
+
assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+ assertFalse(tasks.allInitializedTasks().contains(activeTask1));
tasks.addTask(activeTask1);
- assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+ assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
}
@Test
@@ -260,6 +260,6 @@ public class TasksTest {
tasks.removeTask(activeTask1);
assertFalse(tasks.pendingTasksToInit().contains(activeTask1));
- assertFalse(tasks.allTasks().contains(activeTask1));
+ assertFalse(tasks.allInitializedTasks().contains(activeTask1));
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 0261ac99720..879f62f0b08 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -69,8 +69,8 @@ public class DefaultTaskManagerTest {
@BeforeEach
public void setUp() {
when(task.isProcessable(anyLong())).thenReturn(true);
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
- when(tasks.task(taskId)).thenReturn(task);
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+ when(tasks.initializedTask(taskId)).thenReturn(task);
}
@Test
@@ -94,14 +94,14 @@ public class DefaultTaskManagerTest {
taskManager.add(Collections.singleton(task));
verify(tasks).addTask(task);
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
assertEquals(1, taskManager.getTasks().size());
}
@Test
public void shouldAssignTaskThatCanBeProcessed() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -145,7 +145,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
awaitingThread.interrupt();
@@ -160,7 +160,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.signalTaskExecutors();
@@ -180,7 +180,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.unassignTask(task, taskExecutor);
@@ -195,7 +195,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.add(Collections.singleton(task));
@@ -212,7 +212,7 @@ public class DefaultTaskManagerTest {
final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
final Thread awaitingThread = new Thread(awaitingRunnable);
awaitingThread.start();
- verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+ verify(tasks,
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
taskManager.unlockAllTasks();
@@ -225,7 +225,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
when(task.canPunctuateSystemTime()).thenReturn(true);
@@ -236,7 +236,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
when(task.canPunctuateStreamTime()).thenReturn(true);
@@ -247,7 +247,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
ensureTaskMakesProgress();
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(new StreamsException("Exception"),
taskId);
@@ -259,7 +259,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(false);
when(task.canPunctuateStreamTime()).thenReturn(true);
when(task.canPunctuateSystemTime()).thenReturn(true);
@@ -270,7 +270,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignTasksForProcessingIfProcessingDisabled() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(false);
when(task.isProcessable(anyLong())).thenReturn(true);
@@ -280,7 +280,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldUnassignTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -292,7 +292,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotUnassignNotOwnedTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -319,16 +319,16 @@ public class DefaultTaskManagerTest {
@Test
public void shouldRemoveTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
taskManager.lockTasks(Collections.singleton(task.id()));
taskManager.remove(task.id());
verify(tasks).removeTask(task);
reset(tasks);
- when(tasks.activeTasks()).thenReturn(Collections.emptySet());
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.emptySet());
assertEquals(0, taskManager.getTasks().size());
}
@@ -336,10 +336,10 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignLockedTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
@@ -349,10 +349,10 @@ public class DefaultTaskManagerTest {
@Test
public void shouldLockAnEmptySetOfTasks() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
@@ -365,10 +365,10 @@ public class DefaultTaskManagerTest {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
when(taskExecutor.unassign()).thenReturn(future);
assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -386,10 +386,10 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotAssignAnyLockedTask() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
assertTrue(taskManager.lockAllTasks().isDone());
@@ -407,7 +407,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldNotSetUncaughtExceptionsTwice() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(exception, task.id());
@@ -419,7 +419,7 @@ public class DefaultTaskManagerTest {
@Test
public void shouldReturnAndClearExceptionsOnDrainExceptions() {
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);
taskManager.assignNextTask(taskExecutor);
taskManager.setUncaughtException(exception, task.id());
@@ -433,9 +433,9 @@ public class DefaultTaskManagerTest {
final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
taskManager.add(Collections.singleton(task));
- when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
- when(tasks.task(task.id())).thenReturn(task);
- when(tasks.contains(task.id())).thenReturn(true);
+
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+ when(tasks.initializedTask(task.id())).thenReturn(task);
+ when(tasks.containsInitialized(task.id())).thenReturn(true);
when(taskExecutor.unassign()).thenReturn(future);
when(taskExecutionMetadata.canProcessTask(eq(task),
anyLong())).thenReturn(true);