This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 923086dba24 KAFKA-19171: Kafka Streams crashes with
UnsupportedOperationException (#19507)
923086dba24 is described below
commit 923086dba246297c916ee43331459ca2ba6a595f
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu May 15 21:37:04 2025 -0700
KAFKA-19171: Kafka Streams crashes with UnsupportedOperationException
(#19507)
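This PR fixes a regression introduced by KAFKA-17203. `closeTaskClean(...)`
may modify the collections it is given, so callers must pass mutable
collections instead of `Collections.emptySet()` / `Collections.emptyMap()`,
which throw `UnsupportedOperationException` on modification.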
Reviewers: Chia-Ping Tsai <[email protected]>, Bruno Cadonna
<[email protected]>, Lucas Brutschy <[email protected]>
---
.../apache/kafka/streams/processor/internals/TaskManager.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 7b360a7606d..8e51c4215ed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -513,8 +513,14 @@ public class TaskManager {
private void handleTasksPendingInitialization() {
// All tasks pending initialization are not part of the usual
bookkeeping
+
+ final Set<Task> tasksToCloseDirty = new HashSet<>();
+
for (final Task task : tasks.drainPendingTasksToInit()) {
- closeTaskClean(task, Collections.emptySet(),
Collections.emptyMap());
+ closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
+ }
+ for (final Task task : tasksToCloseDirty) {
+ closeTaskDirty(task, false);
}
}
@@ -1245,7 +1251,6 @@ public class TaskManager {
private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
if (stateUpdater != null) {
final Map<TaskId,
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new
LinkedHashMap<>();
- final Map<TaskId, RuntimeException> failedTasksDuringCleanClose =
new HashMap<>();
final Set<Task> tasksToCloseClean = new
HashSet<>(tasks.drainPendingActiveTasksToInit());
final Set<Task> tasksToCloseDirty = new HashSet<>();
for (final Task restoringTask : stateUpdater.tasks()) {
@@ -1256,7 +1261,7 @@ public class TaskManager {
addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
for (final Task task : tasksToCloseClean) {
- closeTaskClean(task, tasksToCloseDirty,
failedTasksDuringCleanClose);
+ closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
}
for (final Task task : tasksToCloseDirty) {
closeTaskDirty(task, false);