ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r439555399
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -239,54 +240,15 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, } } - if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { - try { - for (final Task task : additionalTasksForCommitting) { - final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.prepareCommit(); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); - } - } - - commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - - for (final Task task : additionalTasksForCommitting) { - task.postCommit(); - } - } catch (final RuntimeException e) { - log.error("Failed to batch commit tasks, " + - "will close all tasks involved in this commit as dirty by the end", e); - dirtyTasks.addAll(additionalTasksForCommitting); - dirtyTasks.addAll(tasksToClose); - - tasksToClose.clear(); - // Just add first taskId to re-throw by the end. - taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e); - } - } - - for (final Task task : tasksToClose) { - try { - completeTaskCloseClean(task); - cleanUpTaskProducer(task, taskCloseExceptions); - tasks.remove(task.id()); - } catch (final RuntimeException e) { - final String uncleanMessage = String.format("Failed to close task %s cleanly. Attempting to close remaining tasks before re-throwing:", task.id()); - log.error(uncleanMessage, e); - taskCloseExceptions.put(task.id(), e); - // We've already recorded the exception (which is the point of clean). - // Now, we should go ahead and complete the close because a half-closed task is no good to anyone. - dirtyTasks.add(task); - } - } - for (final Task oldTask : tasksToRecycle) { final Task newTask; try { if (oldTask.isActive()) { + // If active, the task should have already been suspended and committed during handleRevocation Review comment: Actually I forgot to update this. 
We should always call `suspend`, since some active tasks may not have been suspended during `handleRevocation`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org