mjsax commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r441047360
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -474,26 +468,24 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String, @Override public void closeAndRecycleState() { - suspend(); - prepareCommit(); - writeCheckpointIfNeed(); - switch (state()) { - case CREATED: case SUSPENDED: stateMgr.recycle(); recordCollector.close(); break; - case RESTORING: // we should have transitioned to `SUSPENDED` already - case RUNNING: // we should have transitioned to `SUSPENDED` already + case CREATED: + case RESTORING: + case RUNNING: case CLOSED: throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + id); default: throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + id); } + // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing, + // because otherwise we loose the partition-time information partitionGroup.clear(); Review comment: As we always suspend a task before closing (even for unclean closing), I think we can actually remove this call? (We only needed it before, because SUSPEND could be skipped. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org