ableegoldman commented on a change in pull request #11712:
URL: https://github.com/apache/kafka/pull/11712#discussion_r800029186



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##########
@@ -270,6 +278,23 @@ Task task(final TaskId taskId) {
         return readOnlyActiveTasks;
     }
 
+    List<Task> orderedActiveTasks() {
+        return Collections.unmodifiableList(orderedActiveTasks);
+    }
+
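+    // Moves all active tasks belonging to the given named topology to the tail
+    // of the processing order, so the other topologies' tasks are processed first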
+    void moveActiveTasksToTailFor(final String topologyName) {
+        final List<Task> tasksToMove = new LinkedList<>();
+        final Iterator<Task> iterator = orderedActiveTasks.iterator();
+        while (iterator.hasNext()) {
+            final Task task = iterator.next();
+            if (task.id().topologyName().equals(topologyName)) {
+                iterator.remove();
+                tasksToMove.add(task);
+            }
+        }
+        orderedActiveTasks.addAll(tasksToMove);

Review comment:
       > So we need to take the tasks that we know will fail and process all the other tasks without them
   
   There's definitely an implicit assumption here that the exception is (a) deterministic, and (b) correlated directly with some aspect of that specific task (e.g. a de/serialization exception, an NPE from input with a null field, authorization failing on its topics), and not a system error that just happened to hit during that task's processing (e.g. RocksDBException: too many open files, out of memory, etc.)
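   
   Just to make that distinction concrete, here's a rough sketch of what such a coarse classification could look like. This is purely hypothetical and not part of this PR: the exception types below are real Kafka/RocksDB/JDK classes, but the `ErrorKind` class and the mapping itself are assumptions made for illustration only:
   
   ```java
   import org.apache.kafka.common.errors.AuthorizationException;
   import org.apache.kafka.common.errors.SerializationException;
   import org.rocksdb.RocksDBException;
   
   // Hypothetical sketch only -- illustrates a coarse split between errors tied
   // to a specific task's input/config and system errors that just happened to
   // surface while that task was being processed
   final class ErrorKind {
       enum Kind { TASK_SPECIFIC, SYSTEM }
   
       static Kind classify(final Throwable error) {
           if (error instanceof SerializationException
                   || error instanceof NullPointerException
                   || error instanceof AuthorizationException) {
               // likely deterministic and correlated with this task's data or topics
               return Kind.TASK_SPECIFIC;
           }
           if (error instanceof RocksDBException || error instanceof OutOfMemoryError) {
               // environmental; retrying the same task may well succeed later
               return Kind.SYSTEM;
           }
           // default to the common case discussed below
           return Kind.TASK_SPECIFIC;
       }
   }
   ```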
   
   Not saying we need to account for this in the first pass; I do think it's reasonable to assume that reprocessing the failed task will result in the same error, since that's definitely true for what I suspect is the large majority of errors: de/serialization or invalid-timestamp errors, NPEs, etc. But it's worth keeping in mind, especially once we roll this out and can get actual data on how reasonable these assumptions are.
   
   On that note, I should make sure to add some kind of logging that will let us count how often a failed task repeats the same error, or any error at all. (Could even be a metric eventually?) In the mid-to-far future we should have some finer-grained error classification implemented that we could lean on as a heuristic for whether to retry the task again immediately, back off for a while, or even restart the runtime for fatal system errors (e.g. OOM). See the sketch below for the logging piece.
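   
   Something as simple as the following could cover the bookkeeping: remember the last error class per task so a retry can be logged as hitting the same error, a different one, or succeeding. `TaskErrorTracker` and its methods are hypothetical names I'm making up here, not existing code:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import org.apache.kafka.streams.processor.TaskId;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   // Hypothetical sketch only -- counts how often a failed task repeats the
   // same error on retry, which we could later expose as a metric
   final class TaskErrorTracker {
       private static final Logger log = LoggerFactory.getLogger(TaskErrorTracker.class);
   
       private final Map<TaskId, Class<? extends Throwable>> lastError = new HashMap<>();
   
       void recordFailure(final TaskId taskId, final Throwable error) {
           final Class<? extends Throwable> previous = lastError.put(taskId, error.getClass());
           if (previous == error.getClass()) {
               log.info("Task {} repeated the same error on retry: {}", taskId, previous.getName());
           } else if (previous != null) {
               log.info("Task {} failed with a different error: was {}, now {}",
                        taskId, previous.getName(), error.getClass().getName());
           }
       }
   
       void recordSuccess(final TaskId taskId) {
           if (lastError.remove(taskId) != null) {
               log.info("Task {} processed successfully after a previous failure", taskId);
           }
       }
   }
   ```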
   
   Anyway, I'll file some tickets for all of this in the V2+ milestone; I just wanted to get us thinking about this sort of thing so we have some vision of the future optimized error-handling mechanism to inform how we lay the groundwork now.



