guozhangwang commented on code in PR #12279: URL: https://github.com/apache/kafka/pull/12279#discussion_r905553649
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -206,6 +206,47 @@ public void shouldThrowIfCommittingOnIllegalState() { assertThrows(IllegalStateException.class, task::prepareCommit); } + + @Test + public void shouldAlwaysCheckpointStateIfEnforced() { + stateManager.flush(); + EasyMock.expectLastCall().once(); + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes(); + EasyMock.replay(stateManager); + + task = createStandbyTask(); + + task.initializeIfNeeded(); + task.maybeCheckpoint(true); + + EasyMock.verify(stateManager); + } + + @Test + public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); + stateManager.flush(); + EasyMock.expectLastCall(); + stateManager.checkpoint(); + EasyMock.expectLastCall().once(); + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.singletonMap(partition, 50L)) + .andReturn(Collections.singletonMap(partition, 11000L)) + .andReturn(Collections.singletonMap(partition, 11000L)); + EasyMock.replay(stateManager); + + task = createStandbyTask(); + task.initializeIfNeeded(); + + task.maybeCheckpoint(false); // this should not checkpoint + task.maybeCheckpoint(false); // this should checkpoint + task.maybeCheckpoint(false); // this should not checkpoint Review Comment: Ack. I figured out a way to do this without relying on `reset()`: instead, we just check `offsetSnapshotSinceLastFlush`, which is only updated if the checkpoint is actually executed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org