This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 9c94183c247 MINOR: rename TaskRegistry methods to better reflect their 
purpose. (#21448) (#21501)
9c94183c247 is described below

commit 9c94183c24735df86b11d7b3ededd4cfbf50abd1
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Feb 18 14:58:26 2026 -0800

    MINOR: rename TaskRegistry methods to better reflect their purpose. 
(#21448) (#21501)
    
    Cherry-pick of https://github.com/apache/kafka/pull/21448
    
    Changed the names of methods that work only with initialized tasks (not
    pending) to better reflect their purpose.
    
    Reviewers: Matthias J. Sax <[email protected]>, Lucas Brutschy
     <[email protected]>
    
    Conflicts:
    
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
    
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
---
 .../streams/processor/internals/TaskExecutor.java  |   6 +-
 .../streams/processor/internals/TaskManager.java   |  62 +++----
 .../kafka/streams/processor/internals/Tasks.java   |  25 +--
 .../streams/processor/internals/TasksRegistry.java |  22 +--
 .../internals/tasks/DefaultTaskManager.java        |  16 +-
 .../processor/internals/TaskExecutorTest.java      |   4 +-
 .../processor/internals/TaskManagerTest.java       | 194 ++++++++++-----------
 .../streams/processor/internals/TasksTest.java     |  52 +++---
 .../internals/tasks/DefaultTaskManagerTest.java    |  74 ++++----
 9 files changed, 228 insertions(+), 227 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 91deab0dd9d..8294bf407d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -68,7 +68,7 @@ public class TaskExecutor {
         int totalProcessed = 0;
         Task lastProcessed = null;
 
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.activeInitializedTasks()) {
             final long now = time.milliseconds();
             try {
                 if (executionMetadata.canProcessTask(task, now)) {
@@ -233,7 +233,7 @@ public class TaskExecutor {
 
     private void updateTaskCommitMetadata(final Map<TopicPartition, 
OffsetAndMetadata> allOffsets) {
         if (!allOffsets.isEmpty()) {
-            for (final Task task : tasks.activeTasks()) {
+            for (final Task task : tasks.activeInitializedTasks()) {
                 if (task instanceof StreamTask) {
                     for (final TopicPartition topicPartition : 
task.inputPartitions()) {
                         if (allOffsets.containsKey(topicPartition)) {
@@ -261,7 +261,7 @@ public class TaskExecutor {
     int punctuate() {
         int punctuated = 0;
 
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.activeInitializedTasks()) {
             try {
                 if (executionMetadata.canPunctuateTask(task)) {
                     if (task.maybePunctuateStreamTime()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index ecf23470dd9..6486e9e6e99 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -203,7 +203,7 @@ public class TaskManager {
             mainConsumer.pause(mainConsumer.assignment());
         } else {
             // All tasks that are owned by the task manager are ready and do 
not need to be paused
-            final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedTasks()
+            final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedInitializedTasks()
                 .stream()
                 .flatMap(task -> task.inputPartitions().stream())
                 .collect(Collectors.toSet());
@@ -221,7 +221,7 @@ public class TaskManager {
      * @throws TaskMigratedException
      */
     boolean handleCorruption(final Set<TaskId> corruptedTasks) {
-        final Set<TaskId> activeTasks = new HashSet<>(tasks.activeTaskIds());
+        final Set<TaskId> activeTasks = new 
HashSet<>(tasks.activeInitializedTaskIds());
 
         // We need to stop all processing, since we need to commit 
non-corrupted tasks as well.
         maybeLockTasks(activeTasks);
@@ -230,7 +230,7 @@ public class TaskManager {
         final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
         for (final TaskId taskId : corruptedTasks) {
-            final Task task = tasks.task(taskId);
+            final Task task = tasks.initializedTask(taskId);
             if (task.isActive()) {
                 corruptedActiveTasks.add(task);
             } else {
@@ -244,7 +244,7 @@ public class TaskManager {
 
         // We need to commit before closing the corrupted active tasks since 
this will force the ongoing txn to abort
         try {
-            final Collection<Task> tasksToCommit = tasks.allTasksPerId()
+            final Collection<Task> tasksToCommit = 
tasks.allInitializedTasksPerId()
                 .values()
                 .stream()
                 .filter(t -> t.state() == Task.State.RUNNING)
@@ -254,10 +254,10 @@ public class TaskManager {
         } catch (final TaskCorruptedException e) {
             log.info("Some additional tasks were found corrupted while trying 
to commit, these will be added to the " +
                          "tasks to clean and revive: {}", e.corruptedTasks());
-            corruptedActiveTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            
corruptedActiveTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
         } catch (final TimeoutException e) {
             log.info("Hit TimeoutException when committing all non-corrupted 
tasks, these will be closed and revived");
-            final Collection<Task> uncorruptedTasks = new 
HashSet<>(tasks.activeTasks());
+            final Collection<Task> uncorruptedTasks = new 
HashSet<>(tasks.activeInitializedTasks());
             uncorruptedTasks.removeAll(corruptedActiveTasks);
             // Those tasks which just timed out can just be closed dirty 
without marking changelogs as corrupted
             closeDirtyAndRevive(uncorruptedTasks, false);
@@ -361,7 +361,7 @@ public class TaskManager {
         final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
 
         final Set<TaskId> tasksToLock =
-            tasks.allTaskIds().stream()
+            tasks.allInitializedTaskIds().stream()
                 .filter(x -> activeTasksToCreate.containsKey(x) || 
standbyTasksToCreate.containsKey(x))
                 .collect(Collectors.toSet());
 
@@ -464,7 +464,7 @@ public class TaskManager {
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> 
tasksToCloseClean) {
-        for (final Task task : tasks.allTasks()) {
+        for (final Task task : tasks.allInitializedTasks()) {
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
                 if (task.isActive()) {
@@ -539,7 +539,7 @@ public class TaskManager {
                                                 final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
                                                 final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
                                                 final Set<Task> 
tasksToCloseClean) {
-        for (final Task task : tasks.allNonFailedTasks()) {
+        for (final Task task : tasks.allNonFailedInitializedTasks()) {
             if (!task.isActive()) {
                 throw new IllegalStateException("Standby tasks should only be 
managed by the state updater, " +
                     "but standby task " + task.id() + " is managed by the 
stream thread");
@@ -739,7 +739,7 @@ public class TaskManager {
         while (iter.hasNext()) {
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
-            final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
+            final boolean taskIsOwned = 
tasks.allInitializedTaskIds().contains(taskId)
                 || (stateUpdater != null && 
stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
             if (taskId.topologyName() != null && !taskIsOwned && 
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
                 log.info("Cannot create the assigned task {} since it's 
topology name cannot be recognized, will put it " +
@@ -868,7 +868,7 @@ public class TaskManager {
         changelogReader.enforceRestoreActive();
 
         final List<Task> activeTasks = new LinkedList<>();
-        for (final Task task : tasks.allTasks()) {
+        for (final Task task : tasks.allInitializedTasks()) {
             try {
                 task.initializeIfNeeded();
                 task.clearTaskTimeout();
@@ -1160,7 +1160,7 @@ public class TaskManager {
                      e.corruptedTasks());
 
             // If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
-            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            dirtyTasks.addAll(tasks.initializedTasks(e.corruptedTasks()));
             closeDirtyAndRevive(dirtyTasks, true);
         } catch (final TimeoutException e) {
             log.warn("Timed out while trying to commit all tasks during 
revocation, these will be cleaned and revived");
@@ -1276,8 +1276,8 @@ public class TaskManager {
     }
 
     private void closeRunningTasksDirty() {
-        final Set<Task> allTask = tasks.allTasks();
-        final Set<TaskId> allTaskIds = tasks.allTaskIds();
+        final Set<Task> allTask = tasks.allInitializedTasks();
+        final Set<TaskId> allTaskIds = tasks.allInitializedTaskIds();
         maybeLockTasks(allTaskIds);
         for (final Task task : allTask) {
             // Even though we've apparently dropped out of the group, we can 
continue safely to maintain our
@@ -1499,10 +1499,10 @@ public class TaskManager {
 
         // TODO: change type to `StreamTask`
         final Set<Task> activeTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
-        activeTasks.addAll(tasks.activeTasks());
+        activeTasks.addAll(tasks.activeInitializedTasks());
         // TODO: change type to `StandbyTask`
         final Set<Task> standbyTasks = new 
TreeSet<>(Comparator.comparing(Task::id));
-        standbyTasks.addAll(tasks.standbyTasks());
+        standbyTasks.addAll(tasks.standbyInitializedTasks());
 
         final Set<Task> pendingActiveTasks = 
tasks.drainPendingActiveTasksToInit();
         activeTasks.addAll(pendingActiveTasks);
@@ -1759,11 +1759,11 @@ public class TaskManager {
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
         if (stateUpdater != null) {
             final Map<TaskId, Task> ret = 
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
-            ret.putAll(tasks.allTasksPerId());
+            ret.putAll(tasks.allInitializedTasksPerId());
             
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));
             return ret;
         } else {
-            return tasks.allTasksPerId();
+            return tasks.allInitializedTasksPerId();
         }
     }
 
@@ -1777,7 +1777,7 @@ public class TaskManager {
     Map<TaskId, Task> allOwnedTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
-        return tasks.allTasksPerId();
+        return tasks.allInitializedTasksPerId();
     }
 
     Set<Task> readOnlyAllTasks() {
@@ -1785,15 +1785,15 @@ public class TaskManager {
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
         if (stateUpdater != null) {
             final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
-            ret.addAll(tasks.allTasks());
+            ret.addAll(tasks.allInitializedTasks());
             return Collections.unmodifiableSet(ret);
         } else {
-            return Collections.unmodifiableSet(tasks.allTasks());
+            return Collections.unmodifiableSet(tasks.allInitializedTasks());
         }
     }
 
     Map<TaskId, Task> notPausedTasks() {
-        return Collections.unmodifiableMap(tasks.allTasks()
+        return Collections.unmodifiableMap(tasks.allInitializedTasks()
             .stream()
             .filter(t -> !topologyMetadata.isPaused(t.id().topologyName()))
             .collect(Collectors.toMap(Task::id, v -> v)));
@@ -1822,7 +1822,7 @@ public class TaskManager {
     }
 
     private Stream<Task> activeRunningTaskStream() {
-        return tasks.allTasks().stream().filter(Task::isActive);
+        return tasks.allInitializedTasks().stream().filter(Task::isActive);
     }
 
     Map<TaskId, Task> standbyTaskMap() {
@@ -1834,7 +1834,7 @@ public class TaskManager {
     }
 
     private Stream<Task> standbyTaskStream() {
-        final Stream<Task> standbyTasksInTaskRegistry = 
tasks.allTasks().stream().filter(t -> !t.isActive());
+        final Stream<Task> standbyTasksInTaskRegistry = 
tasks.allInitializedTasks().stream().filter(t -> !t.isActive());
         if (stateUpdater != null) {
             return Stream.concat(
                 stateUpdater.standbyTasks().stream(),
@@ -1846,7 +1846,7 @@ public class TaskManager {
     }
     // For testing only.
     int commitAll() {
-        return commit(tasks.allTasks());
+        return commit(tasks.allInitializedTasks());
     }
 
     /**
@@ -1854,7 +1854,7 @@ public class TaskManager {
      * the corresponding record queues have capacity (again).
      */
     public void resumePollingForPartitionsWithAvailableSpace() {
-        for (final Task t: tasks.activeTasks()) {
+        for (final Task t: tasks.activeInitializedTasks()) {
             t.resumePollingForPartitionsWithAvailableSpace();
         }
     }
@@ -1863,7 +1863,7 @@ public class TaskManager {
      * Fetches up-to-date lag information from the consumer.
      */
     public void updateLags() {
-        for (final Task t: tasks.activeTasks()) {
+        for (final Task t: tasks.activeInitializedTasks()) {
             t.updateLags();
         }
     }
@@ -1913,7 +1913,7 @@ public class TaskManager {
     }
 
     private Task getActiveTask(final TopicPartition partition) {
-        final Task activeTask = tasks.activeTasksForInputPartition(partition);
+        final Task activeTask = 
tasks.activeInitializedTasksForInputPartition(partition);
 
         if (activeTask == null) {
             log.error("Unable to locate active task for received-record 
partition {}. Current tasks: {}",
@@ -2017,7 +2017,7 @@ public class TaskManager {
     }
 
     public void updateTaskEndMetadata(final TopicPartition topicPartition, 
final Long offset) {
-        for (final Task task : tasks.activeTasks()) {
+        for (final Task task : tasks.activeInitializedTasks()) {
             if (task instanceof StreamTask) {
                 if (task.inputPartitions().contains(topicPartition)) {
                     ((StreamTask) task).updateEndOffsets(topicPartition, 
offset);
@@ -2048,7 +2048,7 @@ public class TaskManager {
         try {
             final Set<Task> activeTasksToRemove = new HashSet<>();
             final Set<Task> standbyTasksToRemove = new HashSet<>();
-            for (final Task task : tasks.allTasks()) {
+            for (final Task task : tasks.allInitializedTasks()) {
                 if 
(!currentNamedTopologies.contains(task.id().topologyName())) {
                     if (task.isActive()) {
                         activeTasksToRemove.add(task);
@@ -2137,7 +2137,7 @@ public class TaskManager {
         stringBuilder.append("TaskManager\n");
         stringBuilder.append(indent).append("\tMetadataState:\n");
         stringBuilder.append(indent).append("\tTasks:\n");
-        for (final Task task : tasks.allTasks()) {
+        for (final Task task : tasks.allInitializedTasks()) {
             stringBuilder.append(indent)
                          .append("\t\t")
                          .append(task.id())
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 82bd2a9103c..fc191dec164 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -291,6 +291,7 @@ class Tasks implements TasksRegistry {
     @Override
     public synchronized void clear() {
         pendingTasksToInit.clear();
+        pendingTasksToClose.clear();
         pendingActiveTasksToCreate.clear();
         pendingStandbyTasksToCreate.clear();
         activeTasksPerId.clear();
@@ -301,7 +302,7 @@ class Tasks implements TasksRegistry {
 
     // TODO: change return type to `StreamTask`
     @Override
-    public Task activeTasksForInputPartition(final TopicPartition partition) {
+    public Task activeInitializedTasksForInputPartition(final TopicPartition 
partition) {
         return activeTasksPerPartition.get(partition);
     }
 
@@ -316,7 +317,7 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public Task task(final TaskId taskId) {
+    public Task initializedTask(final TaskId taskId) {
         final Task task = getTask(taskId);
 
         if (task != null)
@@ -326,26 +327,26 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public Collection<Task> tasks(final Collection<TaskId> taskIds) {
+    public Collection<Task> initializedTasks(final Collection<TaskId> taskIds) 
{
         final Set<Task> tasks = new HashSet<>();
         for (final TaskId taskId : taskIds) {
-            tasks.add(task(taskId));
+            tasks.add(initializedTask(taskId));
         }
         return tasks;
     }
 
     @Override
-    public synchronized Collection<TaskId> activeTaskIds() {
+    public synchronized Collection<TaskId> activeInitializedTaskIds() {
         return Collections.unmodifiableCollection(activeTasksPerId.keySet());
     }
 
     @Override
-    public synchronized Collection<Task> activeTasks() {
+    public synchronized Collection<Task> activeInitializedTasks() {
         return Collections.unmodifiableCollection(activeTasksPerId.values());
     }
 
     @Override
-    public synchronized Collection<Task> standbyTasks() {
+    public synchronized Collection<Task> standbyInitializedTasks() {
         return Collections.unmodifiableCollection(standbyTasksPerId.values());
     }
 
@@ -354,12 +355,12 @@ class Tasks implements TasksRegistry {
      * and the returned task could be modified by other threads concurrently
      */
     @Override
-    public synchronized Set<Task> allTasks() {
+    public synchronized Set<Task> allInitializedTasks() {
         return union(HashSet::new, new HashSet<>(activeTasksPerId.values()), 
new HashSet<>(standbyTasksPerId.values()));
     }
 
     @Override
-    public synchronized Set<Task> allNonFailedTasks() {
+    public synchronized Set<Task> allNonFailedInitializedTasks() {
         final Set<Task> nonFailedActiveTasks = 
activeTasksPerId.values().stream()
             .filter(task -> !failedTaskIds.contains(task.id()))
             .collect(Collectors.toSet());
@@ -370,12 +371,12 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public synchronized Set<TaskId> allTaskIds() {
+    public synchronized Set<TaskId> allInitializedTaskIds() {
         return union(HashSet::new, activeTasksPerId.keySet(), 
standbyTasksPerId.keySet());
     }
 
     @Override
-    public synchronized Map<TaskId, Task> allTasksPerId() {
+    public synchronized Map<TaskId, Task> allInitializedTasksPerId() {
         final Map<TaskId, Task> ret = new HashMap<>();
         ret.putAll(activeTasksPerId);
         ret.putAll(standbyTasksPerId);
@@ -383,7 +384,7 @@ class Tasks implements TasksRegistry {
     }
 
     @Override
-    public boolean contains(final TaskId taskId) {
+    public boolean containsInitialized(final TaskId taskId) {
         return getTask(taskId) != null;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 9e13d0f8c52..a6e660fabab 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -71,25 +71,25 @@ public interface TasksRegistry {
 
     void clear();
 
-    Task activeTasksForInputPartition(final TopicPartition partition);
+    Task activeInitializedTasksForInputPartition(final TopicPartition 
partition);
 
-    Task task(final TaskId taskId);
+    Task initializedTask(final TaskId taskId);
 
-    Collection<Task> tasks(final Collection<TaskId> taskIds);
+    Collection<Task> initializedTasks(final Collection<TaskId> taskIds);
 
-    Collection<TaskId> activeTaskIds();
+    Collection<TaskId> activeInitializedTaskIds();
 
-    Collection<Task> activeTasks();
+    Collection<Task> activeInitializedTasks();
 
-    Collection<Task> standbyTasks();
+    Collection<Task> standbyInitializedTasks();
 
-    Set<Task> allTasks();
+    Set<Task> allInitializedTasks();
 
-    Set<Task> allNonFailedTasks();
+    Set<Task> allNonFailedInitializedTasks();
 
-    Map<TaskId, Task> allTasksPerId();
+    Map<TaskId, Task> allInitializedTasksPerId();
 
-    Set<TaskId> allTaskIds();
+    Set<TaskId> allInitializedTaskIds();
 
-    boolean contains(final TaskId taskId);
+    boolean containsInitialized(final TaskId taskId);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
index 51ec80d05ac..2259f7768c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java
@@ -102,7 +102,7 @@ public final class DefaultTaskManager implements 
TaskManager {
             }
 
             // the most naive scheduling algorithm for now: give the next 
unlocked, unassigned, and  processable task
-            for (final Task task : tasks.activeTasks()) {
+            for (final Task task : tasks.activeInitializedTasks()) {
                 if (!assignedTasks.containsKey(task.id()) &&
                     !lockedTasks.contains(task.id()) &&
                     canProgress((StreamTask) task, time.milliseconds()) &&
@@ -126,7 +126,7 @@ public final class DefaultTaskManager implements 
TaskManager {
     @Override
     public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown) 
throws InterruptedException {
         final boolean interrupted = returnWithTasksLocked(() -> {
-            for (final Task task : tasks.activeTasks()) {
+            for (final Task task : tasks.activeInitializedTasks()) {
                 if (!assignedTasks.containsKey(task.id()) &&
                     !lockedTasks.contains(task.id()) &&
                     canProgress((StreamTask) task, time.milliseconds()) &&
@@ -200,7 +200,7 @@ public final class DefaultTaskManager implements 
TaskManager {
             final Set<TaskId> remainingTaskIds = new 
ConcurrentSkipListSet<>(taskIds);
 
             for (final TaskId taskId : taskIds) {
-                final Task task = tasks.task(taskId);
+                final Task task = tasks.initializedTask(taskId);
 
                 if (task == null) {
                     throw new IllegalArgumentException("Trying to lock task " 
+ taskId + " but it's not owned");
@@ -243,7 +243,7 @@ public final class DefaultTaskManager implements 
TaskManager {
     @Override
     public KafkaFuture<Void> lockAllTasks() {
         return returnWithTasksLocked(() ->
-            
lockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet()))
+            
lockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet()))
         );
     }
 
@@ -263,7 +263,7 @@ public final class DefaultTaskManager implements 
TaskManager {
 
     @Override
     public void unlockAllTasks() {
-        executeWithTasksLocked(() -> 
unlockTasks(tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet())));
+        executeWithTasksLocked(() -> 
unlockTasks(tasks.activeInitializedTasks().stream().map(Task::id).collect(Collectors.toSet())));
     }
 
     @Override
@@ -290,11 +290,11 @@ public final class DefaultTaskManager implements 
TaskManager {
                 throw new IllegalArgumentException("The task to remove is not 
locked yet by the task manager");
             }
 
-            if (!tasks.contains(taskId)) {
+            if (!tasks.containsInitialized(taskId)) {
                 throw new IllegalArgumentException("The task to remove is not 
owned by the task manager");
             }
 
-            tasks.removeTask(tasks.task(taskId));
+            tasks.removeTask(tasks.initializedTask(taskId));
         });
 
         log.info("Removed task {} from the task manager", taskId);
@@ -302,7 +302,7 @@ public final class DefaultTaskManager implements 
TaskManager {
 
     @Override
     public Set<ReadOnlyTask> getTasks() {
-        return returnWithTasksLocked(() -> 
tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
+        return returnWithTasksLocked(() -> 
tasks.activeInitializedTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
     }
 
     @Override
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 8d9ef70c5e3..568998931c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -38,7 +38,7 @@ public class TaskExecutorTest {
         final TaskExecutor taskExecutor = new TaskExecutor(tasks, taskManager, 
metadata, new LogContext());
 
         taskExecutor.punctuate();
-        verify(tasks).activeTasks();
+        verify(tasks).activeInitializedTasks();
     }
 
     @Test
@@ -59,4 +59,4 @@ public class TaskExecutorTest {
 
         verify(producer).commitTransaction(Collections.emptyMap(), 
groupMetadata);
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index fae4afbdbac..579b5e1fc6a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -271,8 +271,8 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
-        when(tasks.task(taskId00)).thenReturn(activeTask1);
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
+        when(tasks.initializedTask(taskId00)).thenReturn(activeTask1);
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -306,7 +306,7 @@ public class TaskManagerTest {
     public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -329,7 +329,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(activeTask1, 
activeTask2));
         final KafkaFuture<Void> mockFuture = KafkaFuture.completedFuture(null);
         when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture);
 
@@ -365,7 +365,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1, 
activeTask2));
 
         taskManager.resumePollingForPartitionsWithAvailableSpace();
 
@@ -383,7 +383,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(activeTask1, 
activeTask2));
 
         taskManager.updateLags();
 
@@ -662,7 +662,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(reassignedActiveTask.id(), 
reassignedActiveTask.inputPartitions())),
@@ -701,7 +701,7 @@ public class TaskManagerTest {
         assertEquals(taskException, exception.getCause());
         verify(tasks).addFailedTask(failedActiveTaskToRecycle);
         verify(tasks, never()).addTask(failedActiveTaskToRecycle);
-        verify(tasks).allNonFailedTasks();
+        verify(tasks).allNonFailedInitializedTasks();
         verify(standbyTaskCreator, 
never()).createStandbyTaskFromActive(failedActiveTaskToRecycle, 
taskId03Partitions);
     }
 
@@ -731,7 +731,7 @@ public class TaskManagerTest {
         assertEquals(taskException, exception.getCause());
         verify(tasks).addFailedTask(failedStandbyTaskToRecycle);
         verify(tasks, never()).addTask(failedStandbyTaskToRecycle);
-        verify(tasks).allNonFailedTasks();
+        verify(tasks).allNonFailedInitializedTasks();
         verify(activeTaskCreator, 
never()).createActiveTaskFromStandby(failedStandbyTaskToRecycle, 
taskId03Partitions, consumer);
     }
 
@@ -761,7 +761,7 @@ public class TaskManagerTest {
         assertEquals(taskException, exception.getCause());
         verify(tasks).addFailedTask(failedActiveTaskToReassign);
         verify(tasks, never()).addTask(failedActiveTaskToReassign);
-        verify(tasks).allNonFailedTasks();
+        verify(tasks).allNonFailedInitializedTasks();
         verify(tasks, 
never()).updateActiveTaskInputPartitions(failedActiveTaskToReassign, 
taskId00Partitions);
     }
 
@@ -775,7 +775,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(reassignedActiveTask1));
         when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2));
         when(stateUpdater.remove(reassignedActiveTask2.id()))
             .thenReturn(CompletableFuture.completedFuture(new 
StateUpdater.RemovedTaskResult(reassignedActiveTask2)));
@@ -880,7 +880,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater));
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
runningActiveTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
runningActiveTask)));
         when(tasks.pendingTasksToInit()).thenReturn(Set.of(activeTaskToInit));
         assertEquals(
             taskManager.allTasks(),
@@ -903,7 +903,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
         assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, 
activeTask)));
     }
 
@@ -955,7 +955,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions)
             .inState(State.CREATED).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToRecycle));
         
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
             .thenReturn(standbyTask);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
@@ -975,7 +975,7 @@ public class TaskManagerTest {
             .inState(State.RUNNING)
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToRecycle));
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         final IllegalStateException illegalStateException = assertThrows(
@@ -998,7 +998,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(Collections.emptyMap(), 
Collections.emptyMap());
 
@@ -1016,7 +1016,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToClose));
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1036,7 +1036,7 @@ public class TaskManagerTest {
         final Set<TopicPartition> newInputPartitions = taskId02Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions));
         
when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, 
newInputPartitions)).thenReturn(true);
 
         taskManager.handleAssignment(
@@ -1056,7 +1056,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
@@ -1074,7 +1074,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId03Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToResume));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToResume.id(), 
activeTaskToResume.inputPartitions())),
@@ -1096,7 +1096,7 @@ public class TaskManagerTest {
         final Set<TopicPartition> newInputPartitions = taskId03Partitions;
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions));
 
         final IllegalStateException illegalStateException = assertThrows(
             IllegalStateException.class,
@@ -1121,7 +1121,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(activeTaskToClose));
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
@@ -1802,7 +1802,7 @@ public class TaskManagerTest {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(statefulTask0));
         final Set<TopicPartition> assigned = Set.of(t1p0, t1p1);
         when(consumer.assignment()).thenReturn(assigned);
 
@@ -1824,9 +1824,9 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, 
restoringStatefulTask));
-        
when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask));
+        
when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(runningStatefulTask));
         expectLockObtainedFor(taskId00, taskId01, taskId02, taskId03);
         expectDirectoryNotEmpty(taskId00, taskId01, taskId02, taskId03);
         makeTaskFolders(
@@ -1858,7 +1858,7 @@ public class TaskManagerTest {
         
when(runningStatefulTask.changelogOffsets()).thenReturn(changelogOffsets);
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
 
         assertThat(
             taskManager.taskOffsetSums(),
@@ -1944,7 +1944,7 @@ public class TaskManagerTest {
             .thenReturn(mkMap(mkEntry(t1p2changelog, 
changelogOffsetOfRestoringStandbyTask)));
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, 
restoringStatefulTask));
 
         assertThat(
@@ -1969,7 +1969,7 @@ public class TaskManagerTest {
             ));
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
         assertThat(
@@ -2041,7 +2041,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, task)));
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
task)));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -2101,7 +2101,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -2122,7 +2122,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("KABOOM!")).when(task00).closeClean();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -2153,8 +2153,8 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
-        when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         final ArrayList<TaskDirectory> taskFolders = new ArrayList<>(2);
         taskFolders.add(new 
TaskDirectory(testFolder.resolve(taskId00.toString()).toFile(), null));
@@ -2215,8 +2215,8 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.task(taskId03)).thenReturn(corruptedActiveTask);
-        when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask);
+        when(tasks.initializedTask(taskId03)).thenReturn(corruptedActiveTask);
+        when(tasks.initializedTask(taskId02)).thenReturn(corruptedStandbyTask);
 
         taskManager.handleCorruption(Set.of(corruptedActiveTask.id(), 
corruptedStandbyTask.id()));
 
@@ -2241,9 +2241,9 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(task00);
-        when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+        when(tasks.initializedTask(taskId00)).thenReturn(task00);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00, 
task00));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
 
         when(task00.prepareCommit(false)).thenReturn(emptyMap());
         doNothing().when(task00).postCommit(anyBoolean());
@@ -2274,9 +2274,9 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(task00);
-        when(tasks.allTasksPerId()).thenReturn(singletonMap(taskId00, task00));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00));
+        when(tasks.initializedTask(taskId00)).thenReturn(task00);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(singletonMap(taskId00, 
task00));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00));
 
         when(task00.prepareCommit(false)).thenReturn(emptyMap());
         
when(task00.changelogPartitions()).thenReturn(taskId00ChangelogPartitions);
@@ -2309,12 +2309,12 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedTask);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedTask);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedTask),
             mkEntry(taskId01, nonCorruptedTask)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         when(nonCorruptedTask.commitNeeded()).thenReturn(true);
         when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap());
@@ -2349,8 +2349,8 @@ public class TaskManagerTest {
             .withInputPartitions(taskId02Partitions)
             .inState(State.RUNNING).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
-        when(tasks.task(taskId02)).thenReturn(corruptedTask);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, 
corruptedTask)));
+        when(tasks.initializedTask(taskId02)).thenReturn(corruptedTask);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(consumer.assignment()).thenReturn(intersection(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
@@ -2374,12 +2374,12 @@ public class TaskManagerTest {
             .withInputPartitions(taskId01Partitions).build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedStandby);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedStandby);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedStandby),
             mkEntry(taskId01, runningNonCorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId01));
 
         when(runningNonCorruptedActive.commitNeeded()).thenReturn(true);
         when(runningNonCorruptedActive.prepareCommit(true))
@@ -2420,12 +2420,12 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedActive);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedActive),
             mkEntry(taskId01, uncorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         when(uncorruptedActive.commitNeeded()).thenReturn(true);
         when(uncorruptedActive.prepareCommit(true)).thenReturn(emptyMap());
@@ -2466,12 +2466,12 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedActive);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedActive),
             mkEntry(taskId01, uncorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2481,7 +2481,7 @@ public class TaskManagerTest {
 
         // mock uncorrupted task to indicate that it needs commit and will 
return offsets
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
-        
when(tasks.tasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
+        
when(tasks.initializedTasks(singleton(taskId01))).thenReturn(Set.of(uncorruptedActive));
         when(uncorruptedActive.commitNeeded()).thenReturn(true);
         when(uncorruptedActive.prepareCommit(true)).thenReturn(offsets);
         when(uncorruptedActive.prepareCommit(false)).thenReturn(emptyMap());
@@ -2546,13 +2546,13 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.task(taskId00)).thenReturn(corruptedActive);
-        when(tasks.allTasksPerId()).thenReturn(mkMap(
+        when(tasks.initializedTask(taskId00)).thenReturn(corruptedActive);
+        when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(
             mkEntry(taskId00, corruptedActive),
             mkEntry(taskId01, uncorruptedActive)
         ));
-        when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01));
-        when(tasks.activeTasks()).thenReturn(Set.of(corruptedActive, 
uncorruptedActive));
+        when(tasks.activeInitializedTaskIds()).thenReturn(Set.of(taskId00, 
taskId01));
+        
when(tasks.activeInitializedTasks()).thenReturn(Set.of(corruptedActive, 
uncorruptedActive));
 
         // we need to mock uncorrupted task to indicate that it needs commit 
and will return offsets
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p1, new OffsetAndMetadata(0L, null));
@@ -2626,7 +2626,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
 
         when(consumer.assignment()).thenReturn(union(HashSet::new, 
taskId00Partitions, taskId01Partitions, taskId02Partitions));
 
@@ -2713,8 +2713,8 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
-        when(tasks.tasks(Set.of(taskId00, 
taskId01))).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit, unrevokedActiveTaskWithoutCommit));
+        when(tasks.initializedTasks(Set.of(taskId00, 
taskId01))).thenReturn(Set.of(revokedActiveTask, 
unrevokedActiveTaskWithCommit));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -2827,7 +2827,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
 
         when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
         when(tasks.hasPendingTasksToInit()).thenReturn(false);
@@ -2863,7 +2863,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final Set<TopicPartition> newPartitionsSet = Set.of(t1p1);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
         when(tasks.drainPendingTasksToInit()).thenReturn(emptySet());
         when(tasks.hasPendingTasksToInit()).thenReturn(false);
         when(tasks.updateActiveTaskInputPartitions(task00, 
newPartitionsSet)).thenReturn(true);
@@ -2999,7 +2999,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
 
@@ -3044,7 +3044,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
 
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task10));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02, task10));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -3136,7 +3136,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(offsets01);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, 
task03));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02, task03));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3187,7 +3187,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         when(task00.commitNeeded()).thenReturn(false);
         when(task01.commitNeeded()).thenReturn(true); // only task01 needs 
commit
@@ -3218,7 +3218,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
 
         final Map<TaskId, Set<TopicPartition>> assignmentActive = 
singletonMap(taskId00, taskId00Partitions);
@@ -3252,7 +3252,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allNonFailedTasks()).thenReturn(Set.of(task00));
+        when(tasks.allNonFailedInitializedTasks()).thenReturn(Set.of(task00));
         when(stateUpdater.tasks()).thenReturn(Set.of(task01));
 
         // mock to remove standby task from state updater
@@ -3320,7 +3320,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("KABOOM!")).when(task00).suspend();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3360,7 +3360,7 @@ public class TaskManagerTest {
         doThrow(new RuntimeException("oops"))
             .when(task02).suspend();
 
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3397,7 +3397,7 @@ public class TaskManagerTest {
 
         doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).close();
 
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final RuntimeException exception = assertThrows(
             RuntimeException.class,
@@ -3489,7 +3489,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, 
task02));
 
         final RuntimeException thrown = assertThrows(RuntimeException.class,
             () -> taskManager.handleRevocation(union(HashSet::new, 
taskId01Partitions, taskId02Partitions)));
@@ -3580,7 +3580,7 @@ public class TaskManagerTest {
 
         
when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask00)).thenReturn(Set.of());
         when(stateUpdater.standbyTasks()).thenReturn(Set.of(standbyTask00));
-        when(tasks.standbyTasks()).thenReturn(Set.of(standbyTask00));
+        
when(tasks.standbyInitializedTasks()).thenReturn(Set.of(standbyTask00));
 
         final CompletableFuture<StateUpdater.RemovedTaskResult> 
futureForStandbyTask = new CompletableFuture<>();
         when(stateUpdater.remove(taskId00)).thenReturn(futureForStandbyTask);
@@ -3800,7 +3800,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(emptyMap());
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3896,7 +3896,7 @@ public class TaskManagerTest {
         when(task00.prepareCommit(true)).thenReturn(emptyMap());
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3924,7 +3924,7 @@ public class TaskManagerTest {
         when(task01.commitNeeded()).thenReturn(true);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3953,7 +3953,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -3979,7 +3979,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01, task02));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01, task02));
 
         final StreamsProducer producer = mock(StreamsProducer.class);
         when(activeTaskCreator.streamsProducer()).thenReturn(producer);
@@ -4025,7 +4025,7 @@ public class TaskManagerTest {
         when(task00.prepareCommit(true)).thenThrow(new 
RuntimeException("opsh."));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4048,7 +4048,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenThrow(new 
RuntimeException("opsh."));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4080,7 +4080,7 @@ public class TaskManagerTest {
             .thenReturn(singletonMap(t1p1, 17L));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4110,7 +4110,7 @@ public class TaskManagerTest {
             .thenReturn(singletonMap(t1p1, 17L));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4138,7 +4138,7 @@ public class TaskManagerTest {
         when(task00.purgeableOffsets()).thenReturn(singletonMap(t1p1, 5L));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4195,7 +4195,7 @@ public class TaskManagerTest {
         expectedCommittedOffsets.putAll(offsets1);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01, task02, task03));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01, task02, task03));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4255,7 +4255,7 @@ public class TaskManagerTest {
             .thenReturn(false); // no more records
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4300,7 +4300,7 @@ public class TaskManagerTest {
             .thenReturn(false);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00, task01, task02));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00, task01, task02));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4327,7 +4327,7 @@ public class TaskManagerTest {
             .thenThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4344,7 +4344,7 @@ public class TaskManagerTest {
         when(task00.process(anyLong())).thenThrow(new 
RuntimeException("oops"));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4365,7 +4365,7 @@ public class TaskManagerTest {
             .thenThrow(new TaskMigratedException("migrated", new 
RuntimeException("cause")));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4382,7 +4382,7 @@ public class TaskManagerTest {
         when(task00.maybePunctuateStreamTime()).thenThrow(new 
KafkaException("oops"));
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4400,7 +4400,7 @@ public class TaskManagerTest {
         when(task00.maybePunctuateSystemTime()).thenReturn(true);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.activeTasks()).thenReturn(Set.of(task00));
+        when(tasks.activeInitializedTasks()).thenReturn(Set.of(task00));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4436,7 +4436,7 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
-        when(tasks.allTasks()).thenReturn(Set.of(task00));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00));
 
         try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(TaskManager.class)) {
             appender.setClassLogger(TaskManager.class, Level.DEBUG);
@@ -4644,7 +4644,7 @@ public class TaskManagerTest {
         when(task01.prepareCommit(true)).thenReturn(offsets);
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
@@ -4875,7 +4875,7 @@ public class TaskManagerTest {
             .build();
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(task00, task01));
+        when(tasks.allInitializedTasks()).thenReturn(Set.of(task00, task01));
 
         final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index 0887c982873..bb2865d223c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -75,25 +75,25 @@ public class TasksTest {
         tasks.addActiveTasks(Set.of(statefulTask, statelessTask));
         tasks.addStandbyTasks(Collections.singletonList(standbyTask));
 
-        assertEquals(statefulTask, tasks.task(statefulTask.id()));
-        assertEquals(statelessTask, tasks.task(statelessTask.id()));
-        assertEquals(standbyTask, tasks.task(standbyTask.id()));
-
-        assertEquals(Set.of(statefulTask, statelessTask), new HashSet<>(tasks.activeTasks()));
-        assertEquals(Set.of(standbyTask), new HashSet<>(tasks.standbyTasks()));
-        assertEquals(Set.of(statefulTask, statelessTask, standbyTask), tasks.allTasks());
-        assertEquals(Set.of(statefulTask, standbyTask), tasks.tasks(Set.of(statefulTask.id(), standbyTask.id())));
-        assertEquals(Set.of(statefulTask.id(), statelessTask.id(), standbyTask.id()), tasks.allTaskIds());
+        assertEquals(statefulTask, tasks.initializedTask(statefulTask.id()));
+        assertEquals(statelessTask, tasks.initializedTask(statelessTask.id()));
+        assertEquals(standbyTask, tasks.initializedTask(standbyTask.id()));
+
+        assertEquals(Set.of(statefulTask, statelessTask), new HashSet<>(tasks.activeInitializedTasks()));
+        assertEquals(Set.of(standbyTask), new HashSet<>(tasks.standbyInitializedTasks()));
+        assertEquals(Set.of(statefulTask, statelessTask, standbyTask), tasks.allInitializedTasks());
+        assertEquals(Set.of(statefulTask, standbyTask), tasks.initializedTasks(Set.of(statefulTask.id(), standbyTask.id())));
+        assertEquals(Set.of(statefulTask.id(), statelessTask.id(), standbyTask.id()), tasks.allInitializedTaskIds());
         assertEquals(
             mkMap(
                 mkEntry(statefulTask.id(), statefulTask),
                 mkEntry(statelessTask.id(), statelessTask),
                 mkEntry(standbyTask.id(), standbyTask)
             ),
-            tasks.allTasksPerId());
-        assertTrue(tasks.contains(statefulTask.id()));
-        assertTrue(tasks.contains(statelessTask.id()));
-        assertTrue(tasks.contains(statefulTask.id()));
+            tasks.allInitializedTasksPerId());
+        assertTrue(tasks.containsInitialized(statefulTask.id()));
+        assertTrue(tasks.containsInitialized(statelessTask.id()));
+        assertTrue(tasks.containsInitialized(statefulTask.id()));
     }
 
     @Test
@@ -192,12 +192,12 @@ public class TasksTest {
 
         tasks.addFailedTask(activeTask1);
 
-        assertEquals(activeTask1, tasks.task(TASK_0_0));
-        assertEquals(activeTask2, tasks.task(TASK_0_1));
-        assertTrue(tasks.allTasks().contains(activeTask1));
-        assertTrue(tasks.allTasks().contains(activeTask2));
-        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
-        assertTrue(tasks.allNonFailedTasks().contains(activeTask2));
+        assertEquals(activeTask1, tasks.initializedTask(TASK_0_0));
+        assertEquals(activeTask2, tasks.initializedTask(TASK_0_1));
+        assertTrue(tasks.allInitializedTasks().contains(activeTask1));
+        assertTrue(tasks.allInitializedTasks().contains(activeTask2));
+        assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask2));
     }
 
     @Test
@@ -207,11 +207,11 @@ public class TasksTest {
         tasks.addFailedTask(activeTask1);
 
         tasks.removeTask(activeTask1);
-        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
-        assertFalse(tasks.allTasks().contains(activeTask1));
+        assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+        assertFalse(tasks.allInitializedTasks().contains(activeTask1));
 
         tasks.addTask(activeTask1);
-        assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
     }
 
     @Test
@@ -221,11 +221,11 @@ public class TasksTest {
         tasks.addFailedTask(activeTask1);
 
         tasks.clear();
-        assertFalse(tasks.allNonFailedTasks().contains(activeTask1));
-        assertFalse(tasks.allTasks().contains(activeTask1));
+        assertFalse(tasks.allNonFailedInitializedTasks().contains(activeTask1));
+        assertFalse(tasks.allInitializedTasks().contains(activeTask1));
 
         tasks.addTask(activeTask1);
-        assertTrue(tasks.allNonFailedTasks().contains(activeTask1));
+        assertTrue(tasks.allNonFailedInitializedTasks().contains(activeTask1));
     }
 
     @Test
@@ -260,6 +260,6 @@ public class TasksTest {
 
         tasks.removeTask(activeTask1);
         assertFalse(tasks.pendingTasksToInit().contains(activeTask1));
-        assertFalse(tasks.allTasks().contains(activeTask1));
+        assertFalse(tasks.allInitializedTasks().contains(activeTask1));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
index 0261ac99720..879f62f0b08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java
@@ -69,8 +69,8 @@ public class DefaultTaskManagerTest {
     @BeforeEach
     public void setUp() {
         when(task.isProcessable(anyLong())).thenReturn(true);
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
-        when(tasks.task(taskId)).thenReturn(task);
+        when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.initializedTask(taskId)).thenReturn(task);
     }
 
     @Test
@@ -94,14 +94,14 @@ public class DefaultTaskManagerTest {
         taskManager.add(Collections.singleton(task));
 
         verify(tasks).addTask(task);
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         assertEquals(1, taskManager.getTasks().size());
     }
 
     @Test
     public void shouldAssignTaskThatCanBeProcessed() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -145,7 +145,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         awaitingThread.interrupt();
 
@@ -160,7 +160,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.signalTaskExecutors();
 
@@ -180,7 +180,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.unassignTask(task, taskExecutor);
 
@@ -195,7 +195,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.add(Collections.singleton(task));
 
@@ -212,7 +212,7 @@ public class DefaultTaskManagerTest {
         final AwaitingRunnable awaitingRunnable = new AwaitingRunnable();
         final Thread awaitingThread = new Thread(awaitingRunnable);
         awaitingThread.start();
-        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeTasks();
+        verify(tasks, 
timeout(VERIFICATION_TIMEOUT).atLeastOnce()).activeInitializedTasks();
 
         taskManager.unlockAllTasks();
 
@@ -225,7 +225,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldAssignTasksThatCanBeSystemTimePunctuated() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         when(task.canPunctuateSystemTime()).thenReturn(true);
 
@@ -236,7 +236,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
         when(task.canPunctuateStreamTime()).thenReturn(true);
 
@@ -247,7 +247,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         ensureTaskMakesProgress();
         taskManager.assignNextTask(taskExecutor);
         taskManager.setUncaughtException(new StreamsException("Exception"), 
taskId);
@@ -259,7 +259,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignTasksForPunctuationIfPunctuationDisabled() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         
when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(false);
         when(task.canPunctuateStreamTime()).thenReturn(true);
         when(task.canPunctuateSystemTime()).thenReturn(true);
@@ -270,7 +270,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignTasksForProcessingIfProcessingDisabled() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(false);
         when(task.isProcessable(anyLong())).thenReturn(true);
 
@@ -280,7 +280,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldUnassignTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -292,7 +292,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotUnassignNotOwnedTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -319,16 +319,16 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldRemoveTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         taskManager.lockTasks(Collections.singleton(task.id()));
         taskManager.remove(task.id());
 
         verify(tasks).removeTask(task);
         reset(tasks);
-        when(tasks.activeTasks()).thenReturn(Collections.emptySet());
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.emptySet());
 
         assertEquals(0, taskManager.getTasks().size());
     }
@@ -336,10 +336,10 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignLockedTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         
assertTrue(taskManager.lockTasks(Collections.singleton(task.id())).isDone());
 
@@ -349,10 +349,10 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldLockAnEmptySetOfTasks() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         assertTrue(taskManager.lockTasks(Collections.emptySet()).isDone());
 
@@ -365,10 +365,10 @@ public class DefaultTaskManagerTest {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
 
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
         when(taskExecutor.unassign()).thenReturn(future);
 
         assertEquals(task, taskManager.assignNextTask(taskExecutor));
@@ -386,10 +386,10 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotAssignAnyLockedTask() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
 
         assertTrue(taskManager.lockAllTasks().isDone());
 
@@ -407,7 +407,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldNotSetUncaughtExceptionsTwice() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         taskManager.assignNextTask(taskExecutor);
         taskManager.setUncaughtException(exception, task.id());
@@ -419,7 +419,7 @@ public class DefaultTaskManagerTest {
     @Test
     public void shouldReturnAndClearExceptionsOnDrainExceptions() {
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
         taskManager.assignNextTask(taskExecutor);
         taskManager.setUncaughtException(exception, task.id());
@@ -433,9 +433,9 @@ public class DefaultTaskManagerTest {
         final KafkaFutureImpl<StreamTask> future = new KafkaFutureImpl<>();
 
         taskManager.add(Collections.singleton(task));
-        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
-        when(tasks.task(task.id())).thenReturn(task);
-        when(tasks.contains(task.id())).thenReturn(true);
+        
when(tasks.activeInitializedTasks()).thenReturn(Collections.singleton(task));
+        when(tasks.initializedTask(task.id())).thenReturn(task);
+        when(tasks.containsInitialized(task.id())).thenReturn(true);
         when(taskExecutor.unassign()).thenReturn(future);
         when(taskExecutionMetadata.canProcessTask(eq(task), 
anyLong())).thenReturn(true);
 


Reply via email to