cadonna commented on a change in pull request #11712: URL: https://github.com/apache/kafka/pull/11712#discussion_r797611602
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
Are you proposing to commit tasks each time an exception occurs, irrespective of whether it is time to commit? Wouldn't it be simpler to commit when the exception happens, rather than when processing restarts after the exception?
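For readers following the hunk above, here is a minimal standalone sketch of the move-to-tail step it introduces. It is not the Kafka Streams code: plain strings of the form `<topologyName>_<partition>` stand in for `Task` objects, and the class name `MoveToTailSketch` and the helper `topologyOf` are hypothetical names used only for illustration. It shows that entries matching the topology are removed through the iterator and re-appended in their original relative order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

class MoveToTailSketch {

    // Hypothetical stand-in for task.id().topologyName():
    // assumes ids look like "<topologyName>_<partition>".
    static String topologyOf(final String taskId) {
        return taskId.substring(0, taskId.indexOf('_'));
    }

    // Moves every entry belonging to topologyName to the end of the list,
    // keeping the relative order of both the moved and the remaining entries.
    static void moveToTailFor(final List<String> ordered, final String topologyName) {
        final List<String> toMove = new ArrayList<>();
        final Iterator<String> iterator = ordered.iterator();
        while (iterator.hasNext()) {
            final String taskId = iterator.next();
            if (topologyOf(taskId).equals(topologyName)) {
                // Removing via the iterator avoids ConcurrentModificationException.
                iterator.remove();
                toMove.add(taskId);
            }
        }
        ordered.addAll(toMove);
    }

    public static void main(final String[] args) {
        final List<String> ordered =
            new LinkedList<>(Arrays.asList("A_0", "B_0", "A_1", "C_0"));
        moveToTailFor(ordered, "A");
        System.out.println(ordered); // prints [B_0, C_0, A_0, A_1]
    }
}
```

The sketch only covers the reordering. Whether to commit at the time of the exception or when processing resumes, which is the question raised in the review comment, is outside its scope.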