This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7035f02a61a MINOR: Use wildcard generics instead of defensive copies in TaskManager (#21528)
7035f02a61a is described below
commit 7035f02a61a2d6c1c8182c4509b8ccadc723fa70
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Feb 23 11:15:30 2026 -0500
MINOR: Use wildcard generics instead of defensive copies in TaskManager (#21528)
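Use a bounded-wildcard `Collection<? extends Task>` return type instead of
creating `ArrayList` copies, which were only needed to work around the
invariance of Java generics (a collection of a `Task` subtype is not a
`Collection<Task>`).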
Reviewers: PoAn Yang <[email protected]>, Matthias Sax
<[email protected]>
---
.../kafka/streams/processor/internals/TaskManager.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f94060d45e0..3888e4384ba 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -318,7 +318,7 @@ public class TaskManager {
}
}
- private Collection<Task> assignActiveTaskFromStartupState(final
Map<TaskId, Set<TopicPartition>> tasksToAssign) {
+ private Collection<? extends Task> assignActiveTaskFromStartupState(final
Map<TaskId, Set<TopicPartition>> tasksToAssign) {
if (stateDirectory.hasStartupTasks()) {
final Map<TaskId, Set<TopicPartition>> assignedTasks = new
HashMap<>(tasksToAssign.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
tasksToAssign.entrySet()) {
@@ -327,13 +327,13 @@ public class TaskManager {
assignedTasks.put(taskId, entry.getValue());
}
}
- return new ArrayList<>(activeTaskCreator.createTasks(mainConsumer,
assignedTasks));
+ return activeTaskCreator.createTasks(mainConsumer, assignedTasks);
} else {
return Collections.emptySet();
}
}
- private Collection<Task> assignStartupTasks(final Map<TaskId,
Set<TopicPartition>> tasksToAssign) {
+ private Collection<? extends Task> assignStartupTasks(final Map<TaskId,
Set<TopicPartition>> tasksToAssign) {
if (stateDirectory.hasStartupTasks()) {
final Map<TaskId, Set<TopicPartition>> assignedTasks = new
HashMap<>(tasksToAssign.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
tasksToAssign.entrySet()) {
@@ -343,7 +343,7 @@ public class TaskManager {
assignedTasks.put(taskId, inputPartitions);
}
}
- return new
ArrayList<>(standbyTaskCreator.createTasks(assignedTasks));
+ return standbyTaskCreator.createTasks(assignedTasks);
} else {
return Collections.emptySet();
}
@@ -512,11 +512,11 @@ public class TaskManager {
private void handleExistingStateForTasks(final Map<TaskId,
Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate) {
- final Collection<Task> activeTasks =
assignActiveTaskFromStartupState(activeTasksToCreate);
+ final Collection<? extends Task> activeTasks =
assignActiveTaskFromStartupState(activeTasksToCreate);
for (final Task activeTask : activeTasks) {
activeTasksToCreate.remove(activeTask.id());
}
- final Collection<Task> standbyTasks =
assignStartupTasks(standbyTasksToCreate);
+ final Collection<? extends Task> standbyTasks =
assignStartupTasks(standbyTasksToCreate);
for (final Task standbyTask : standbyTasks) {
standbyTasksToCreate.remove(standbyTask.id());
}