guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r717027686

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -189,14 +189,13 @@ void handleCorruption(final Set<TaskId> corruptedTasks) {
         // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
         try {
-            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()

Review comment:
       No logical change here, just renaming.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -735,7 +735,6 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        consumer.commitSync(eq(emptyMap()));

Review comment:
       The mock expectations in these unit tests are changed because originally we would send an empty offsets map to the consumer; that commit no longer happens.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1084,26 +1083,27 @@ int commit(final Collection<Task> tasksToCommit) {
      *                               or if the task producer got fenced (EOS)
      * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
      * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
-     * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
+     * @param consumedOffsetsAndMetadata an empty map that will be filled in with the prepared offsets
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
-    private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
-                                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
+    private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
+                                                            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         if (rebalanceInProgress) {
             return -1;
         }
         int committed = 0;
         for (final Task task : tasksToCommit) {
+            // we need to call commitNeeded first since we need to update committable offsets
             if (task.commitNeeded()) {
                 final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
-                if (task.isActive()) {

Review comment:
       We do not need to check `task.isActive()`, since for standby tasks `prepareCommit()` would always return an empty offset map. Instead, we should just make sure that the returned map is not empty before adding it to `consumedOffsetsAndMetadata` (for active tasks, it may also be empty depending on their state).
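
As a rough illustration of the last review comment (replacing the `task.isActive()` check with a non-empty check on the prepared offsets), here is a minimal, self-contained sketch. It does not use the real Kafka Streams classes: `Task`, `StubTask`, and `collectCommittableOffsets` are hypothetical stand-ins, and a `Map<String, Long>` stands in for `Map<TopicPartition, OffsetAndMetadata>`. It is meant to show the shape of the check only, not the actual `TaskManager` code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Kafka Streams types; this is not the real API.
class CommittableOffsetsSketch {

    /** Simplified task exposing only the two calls the review comment relies on. */
    interface Task {
        boolean commitNeeded();

        // Stand-in for Map<TopicPartition, OffsetAndMetadata>: partition name -> offset.
        Map<String, Long> prepareCommit();
    }

    /** Fixed-behaviour task used for the demonstration. */
    static final class StubTask implements Task {
        private final boolean commitNeeded;
        private final Map<String, Long> offsets;

        StubTask(final boolean commitNeeded, final Map<String, Long> offsets) {
            this.commitNeeded = commitNeeded;
            this.offsets = offsets;
        }

        @Override
        public boolean commitNeeded() {
            return commitNeeded;
        }

        @Override
        public Map<String, Long> prepareCommit() {
            return offsets;
        }
    }

    /** Collects prepared offsets per task, skipping tasks whose prepared map is empty. */
    static Map<Task, Map<String, Long>> collectCommittableOffsets(final List<Task> tasksToCommit) {
        final Map<Task, Map<String, Long>> consumedOffsetsAndMetadata = new LinkedHashMap<>();
        for (final Task task : tasksToCommit) {
            // commitNeeded() is checked first, as in the diff above
            if (task.commitNeeded()) {
                final Map<String, Long> offsetAndMetadata = task.prepareCommit();
                // No isActive() check: a standby-like task returns an empty map
                // and is filtered out by the same non-empty test as an active
                // task that has nothing to commit.
                if (!offsetAndMetadata.isEmpty()) {
                    consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
                }
            }
        }
        return consumedOffsetsAndMetadata;
    }

    public static void main(final String[] args) {
        final Task active = new StubTask(true, Collections.singletonMap("topic-0", 42L));
        final Task standbyLike = new StubTask(true, Collections.<String, Long>emptyMap());
        final Task idle = new StubTask(false, Collections.singletonMap("topic-1", 7L));

        final Map<Task, Map<String, Long>> result =
            collectCommittableOffsets(Arrays.asList(active, standbyLike, idle));
        System.out.println("tasks with committable offsets: " + result.size());
    }
}
```

In this sketch, only the first task ends up in the result: the second is dropped by the non-empty check and the third never reaches `prepareCommit()` because `commitNeeded()` is false.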