lucasbru commented on code in PR #14281: URL: https://github.com/apache/kafka/pull/14281#discussion_r1340120008
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java: ########## @@ -168,9 +192,10 @@ private StreamTask unassignCurrentTask() { if (currentTask == null) throw new IllegalStateException("Does not own any task while being ask to unassign from task manager"); - // flush the task before giving it back to task manager - // TODO: we can add a separate function in StreamTask to just flush and not return offsets - currentTask.prepareCommit(); + // flush the task before giving it back to task manager, if we are not handing it back because of an error. + if (!taskManager.hasUncaughtException(currentTask.id())) { + currentTask.flush(); Review Comment: Yeah, this is basically a design choice that Guozhang did, and I'm unsure about his precise intentions. I kept it like this for now, but I'll definitely consider leaving this work in the polling task. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org