ableegoldman commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r800029186
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }

+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:

> So we need to take the tasks that we know will fail and process all the other tasks without them

There's definitely an implicit assumption here that the exception is (a) deterministic, and (b) correlated directly with some aspect of that specific task (e.g. a de/serialization exception, an NPE from input with a null field, or failed authorization on its topics) -- and not a system error that happened to hit during that task's processing (e.g. `RocksDBException: too many open files`, out of memory, etc.).

I'm not saying we need to account for this in the first pass. I do think it's reasonable to assume that reprocessing the failed task will result in the same error, since that's definitely true for what I suspect are the large majority of errors: de/serialization or invalid-timestamp errors, NPEs, and so on. But it's worth keeping in mind, especially once we roll this out and can gather actual data on how reasonable these assumptions are.

On that note -- I should make sure to add some kind of logging that will let us count how often a failed task repeats the same error, or any error at all. (Could even be a metric eventually?)
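To illustrate the kind of logging/metric hook I mean, here's a minimal sketch. The class name `TaskErrorTracker`, its methods, and the use of a plain `String` task id are all hypothetical -- nothing here is an existing Kafka Streams API -- it just shows how we could count whether a retried task hits the same error class again:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: remember the last error class seen per task so we can
// log (or eventually report as a metric) how often a retried task repeats it.
public class TaskErrorTracker {
    private final Map<String, Class<? extends Throwable>> lastErrorByTask = new HashMap<>();
    private final Map<String, Integer> repeatedErrorCount = new HashMap<>();

    // Records a failure; returns true if the task repeated its previous error class.
    public boolean recordFailure(final String taskId, final Throwable error) {
        final Class<? extends Throwable> previous = lastErrorByTask.put(taskId, error.getClass());
        final boolean repeated = error.getClass().equals(previous);
        if (repeated) {
            repeatedErrorCount.merge(taskId, 1, Integer::sum);
        }
        return repeated;
    }

    public int repeatedErrors(final String taskId) {
        return repeatedErrorCount.getOrDefault(taskId, 0);
    }
}
```

Whether we key on the exception class, the message, or something coarser is exactly the kind of question the real data should answer.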
In the mid-to-far future we should have some kind of finer-grained error classification that we can lean on as a heuristic for whether to retry the task immediately, back off for a while, or even restart the runtime for fatal system errors (e.g. OOM).

Anyways, I'll file some tickets for all of this in the V2+ milestone. I just wanted to get us thinking about this sort of thing, so we have some vision of the future optimized error-handling mechanism to inform how we lay the groundwork now.
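As a rough sketch of what that finer-grained classification could look like -- all names here (`ErrorClassifier`, `ErrorAction`, the specific exception-to-action mapping) are illustrative assumptions, not anything that exists in Kafka Streams today:

```java
// Hypothetical sketch: map an exception to a coarse retry decision, along the
// lines of the heuristic discussed above.
public class ErrorClassifier {
    public enum ErrorAction { RETRY_IMMEDIATELY, BACKOFF_AND_RETRY, SHUTDOWN_RUNTIME }

    public static ErrorAction classify(final Throwable error) {
        // Fatal system errors: the process state can't be trusted, so restart the runtime.
        if (error instanceof OutOfMemoryError || error instanceof StackOverflowError) {
            return ErrorAction.SHUTDOWN_RUNTIME;
        }
        // Transient environment issues (e.g. "too many open files"): back off and retry,
        // since retrying immediately would likely hit the same resource pressure.
        if (error instanceof java.io.IOException) {
            return ErrorAction.BACKOFF_AND_RETRY;
        }
        // Everything else is assumed task-specific and deterministic (NPE from a null
        // field, de/serialization failure): a retry will likely fail the same way, but
        // the classification lets us route the task out of the processing order.
        return ErrorAction.RETRY_IMMEDIATELY;
    }
}
```

The real version would need a much richer taxonomy (and probably pluggability), but even a coarse split like this would be enough to drive the move-to-tail logic in `Tasks.moveActiveTasksToTailFor`.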