guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1147064683

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +379,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         maybeThrowTaskExceptions(taskCloseExceptions);

-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Set<Task> activeTasksNeedCommit;
+        // Find new active tasks which need commit.
+        if (newActiveTasks == null || newActiveTasks.isEmpty()) {
+            activeTasksNeedCommit = new HashSet<>();
+        } else {
+            activeTasksNeedCommit = newActiveTasks.stream().filter(Task::commitNeeded).collect(Collectors.toSet());

Review Comment:
   Newly created tasks should not have the `commitNeeded` flag set. I think @ableegoldman's comment was that the conditions are `(a) EOS is used (since this is a txn-based issue to begin with) and (b) if we have newly-added active tasks (as discussed previously)`, but maybe also `(c) there is already a txn ongoing`.

   Note that condition (c) differs between EOS-v1 and EOS-v2. That is why I was originally thinking of waiting until we only have EOS-v2, but after some thought I'm leaning towards narrowing the scope of the fix to EOS-v2, since it is the commonly used config now and fixing EOS-v1 may incur unnecessary complexity for little value. For EOS-v2, we just need to check `threadProducer.hasInflighTxn`.


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -298,6 +299,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }

+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {

Review Comment:
   Could we reuse `commitTasksAndMaybeUpdateCommittableOffsets` instead, to avoid code duplication?

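As a rough illustration of conditions (a) to (c) from the first review comment, narrowed to EOS-v2 as suggested there, the check could be sketched as below. This is only a sketch under stated assumptions: `CommitBeforeNewTasksSketch`, `ProcessingMode`, and `shouldCommitBeforeNewTasks` are hypothetical names invented for this example and are not part of `TaskManager` or any Kafka Streams API, and the `txnInFlight` flag stands in for whatever the thread producer actually exposes.

```java
import java.util.List;

// Hypothetical sketch: none of these names exist in Kafka Streams.
final class CommitBeforeNewTasksSketch {

    // Stand-in for the configured processing guarantee.
    enum ProcessingMode { AT_LEAST_ONCE, EOS_V1, EOS_V2 }

    // Returns true only when all three conditions from the review hold.
    static boolean shouldCommitBeforeNewTasks(final ProcessingMode mode,
                                              final List<String> newActiveTaskIds,
                                              final boolean txnInFlight) {
        // (a) EOS is used; the fix is scoped to EOS-v2 only.
        final boolean eosV2 = mode == ProcessingMode.EOS_V2;
        // (b) the assignment added at least one new active task.
        final boolean hasNewActiveTasks = !newActiveTaskIds.isEmpty();
        // (c) a transaction is already ongoing (assumed to come from the thread producer).
        return eosV2 && hasNewActiveTasks && txnInFlight;
    }

    public static void main(final String[] args) {
        // All three conditions hold, so a commit would be triggered.
        System.out.println(shouldCommitBeforeNewTasks(ProcessingMode.EOS_V2, List.of("0_1"), true));
        // No transaction in flight, so there is nothing to commit.
        System.out.println(shouldCommitBeforeNewTasks(ProcessingMode.EOS_V2, List.of("0_1"), false));
        // EOS-v1 is deliberately out of scope in this sketch.
        System.out.println(shouldCommitBeforeNewTasks(ProcessingMode.EOS_V1, List.of("0_1"), true));
    }
}
```

Note that the sketch never consults a per-task `commitNeeded` flag, in line with the observation that newly created tasks would not have it set.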