ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r460194062
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
for (final Task task : tasksToClose) {
try {
- if (task.isActive()) {
- // Active tasks are revoked and suspended/committed during #handleRevocation
- if (!task.state().equals(State.SUSPENDED)) {
- log.error("Active task {} should be suspended prior to attempting to close but was in {}",
- task.id(), task.state());
- throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
- }
- } else {
- task.suspend();
- task.prepareCommit();
- task.postCommit();
+ // Always try to first suspend and commit the task before closing it;
+ // some tasks may already be suspended which should be a no-op.
+ //
+ // Also since active tasks should already be suspended / committed and
+ // standby tasks should have no offsets to commit, we should expect nothing to commit
+ task.suspend();
+
+ final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+
+ if (!offsets.isEmpty()) {
+ log.error("Task {} should has been committed prior to attempting to close, but it reports non-empty offsets {} to commit",
Review comment:
I thought we catch and save the first exception thrown by a rebalance
listener callback, and then rethrow it after all rebalance callbacks have
been invoked? In this case that would mean `handleAssignment` would still get
called, and then we would throw an IllegalStateException and bail on the rest
of `handleAssignment` for no reason.

The IllegalStateException itself is not the problem, since only the first
exception (the TaskMigrated) would ultimately be thrown up to `poll`. But we
should still go through the rest of `handleAssignment` in order to properly
clean up the active tasks and manage the standbys (since we don't need to
close standbys in case of TaskMigrated).
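
To make the concern concrete, here is a minimal sketch of the "save the first exception, keep invoking the remaining callbacks, rethrow at the end" pattern I have in mind. It is not the actual consumer or Streams code: `FirstExceptionSketch`, `invokeAll`, and the `Runnable` callbacks are hypothetical names for illustration, and `IllegalArgumentException` is only a stand-in for the TaskMigrated exception.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstExceptionSketch {

    /**
     * Runs every callback in order. Only the first failure is remembered;
     * later failures are swallowed, and the first one is rethrown once all
     * callbacks have been invoked.
     */
    public static void invokeAll(final List<Runnable> callbacks) {
        RuntimeException firstException = null;
        for (final Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (final RuntimeException e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    public static void main(final String[] args) {
        final List<String> invoked = new ArrayList<>();

        // Hypothetical stand-in for a revocation callback that fails first.
        final Runnable onRevoked = () -> {
            invoked.add("onRevoked");
            throw new IllegalArgumentException("stand-in for TaskMigrated");
        };
        // Hypothetical stand-in for the assignment callback, which still runs
        // and then fails with the IllegalStateException from the diff above.
        final Runnable onAssigned = () -> {
            invoked.add("onAssigned");
            throw new IllegalStateException("non-empty offsets to commit");
        };

        final List<Runnable> callbacks = new ArrayList<>();
        callbacks.add(onRevoked);
        callbacks.add(onAssigned);

        try {
            invokeAll(callbacks);
        } catch (final RuntimeException e) {
            // Both callbacks were invoked, but only the first exception surfaces.
            System.out.println("invoked=" + invoked + ", thrown=" + e.getMessage());
        }
    }
}
```

Under these assumptions the second callback is still invoked after the first one fails, so anything it throws partway through only cuts its own cleanup short; the caller never sees that second exception.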