ableegoldman commented on a change in pull request #8856: URL: https://github.com/apache/kafka/pull/8856#discussion_r441049817
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -409,32 +407,28 @@ public void resume() {
         return committableOffsets;
     }
+    /**
+     * This should only be called if the attempted commit succeeded for this task
+     */
     @Override
     public void postCommit() {
         commitRequested = false;
         commitNeeded = false;
         switch (state()) {
             case RESTORING:
-                writeCheckpointIfNeed();
+            case SUSPENDED:
+                maybeWriteCheckpoint();
                 break;
             case RUNNING:
-                if (!eosEnabled) { // if RUNNING, checkpoint only for non-eos
-                    writeCheckpointIfNeed();
+                if (!eosEnabled) {
+                    maybeWriteCheckpoint();
                 }
                 break;
-            case SUSPENDED:
-                writeCheckpointIfNeed();
-                // we cannot `clear()` the `PartitionGroup` in `suspend()` already, but only after committing,
-                // because otherwise we loose the partition-time information
-                partitionGroup.clear();

Review comment:
I see. So we should only clear it here, and not in `close`.

Just curious, why do we "forget" the current offset? I mean, haven't we just committed the current offset before suspending (and if that failed, we would close all tasks right away)? Maybe I'm misunderstanding what you mean.
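
For anyone following the thread without the full file open, here is a minimal standalone sketch of the post-commit switch as it reads on the new side of the hunk. It is not the real `StreamTask`: `PostCommitSketch`, its `State` enum, and the `checkpointsWritten` counter are hypothetical stand-ins, `maybeWriteCheckpoint()` only counts calls, and the `default` branch is an assumption because the hunk is cut off before the remaining cases.

```java
// Hypothetical model of the switch in postCommit(); not the actual Kafka Streams class.
class PostCommitSketch {
    // Assumed set of task states; only RESTORING, RUNNING and SUSPENDED appear in the hunk.
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private final State state;
    private final boolean eosEnabled;
    private boolean commitRequested = true;
    private boolean commitNeeded = true;
    private int checkpointsWritten = 0;

    PostCommitSketch(State state, boolean eosEnabled) {
        this.state = state;
        this.eosEnabled = eosEnabled;
    }

    // Mirrors the new side of the diff: only meant to run after a successful commit.
    void postCommit() {
        commitRequested = false;
        commitNeeded = false;
        switch (state) {
            case RESTORING:
            case SUSPENDED:
                // RESTORING falls through and shares the SUSPENDED branch
                maybeWriteCheckpoint();
                break;
            case RUNNING:
                // a running task checkpoints only when EOS is disabled
                if (!eosEnabled) {
                    maybeWriteCheckpoint();
                }
                break;
            default:
                // Assumption: the hunk does not show how other states are handled
                throw new IllegalStateException("Unexpected state " + state);
        }
    }

    // Stand-in for the real checkpoint write: just counts invocations.
    private void maybeWriteCheckpoint() {
        checkpointsWritten++;
    }

    int checkpointsWritten() { return checkpointsWritten; }
    boolean commitNeeded() { return commitNeeded; }
    boolean commitRequested() { return commitRequested; }

    public static void main(String[] args) {
        State[] shown = { State.RESTORING, State.RUNNING, State.SUSPENDED };
        for (State s : shown) {
            for (boolean eos : new boolean[] { false, true }) {
                PostCommitSketch task = new PostCommitSketch(s, eos);
                task.postCommit();
                System.out.println(s + " eos=" + eos + " checkpoints=" + task.checkpointsWritten());
            }
        }
    }
}
```

Note that the sketch has no counterpart to the removed `partitionGroup.clear()` call; where that clearing should happen is exactly what the review comment above is asking about.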