jolshan commented on code in PR #14985: URL: https://github.com/apache/kafka/pull/14985#discussion_r1433329835
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java: ########## @@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() { assertNull(context.offsetMetadataManager.offset("foo", "bar", 0)); } + @Test + public void testReplayTransactionEndMarkerWithCommit() { + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); + + // Add pending transactional commit for producer id 5. + verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "small", + context.time.milliseconds(), + OptionalLong.empty() + )); + + // Add pending transactional commit for producer id 6. + verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata( + 200L, + OptionalInt.empty(), + "small", + context.time.milliseconds(), + OptionalLong.empty() + )); + + // Replaying an end marker with an unknown producer id should not fail. + context.replayEndTransactionMarker(1L, TransactionResult.COMMIT); + + // Replaying an end marker to commit transaction of producer id 5. + context.replayEndTransactionMarker(5L, TransactionResult.COMMIT); + + // The pending offset is removed... + assertNull(context.offsetMetadataManager.pendingTransactionalOffset( + 5L, + "foo", + "bar", + 0 + )); + + // ... and added to the main offset storage. + assertEquals(new OffsetAndMetadata( + 100L, + OptionalInt.empty(), + "small", + context.time.milliseconds(), + OptionalLong.empty() + ), context.offsetMetadataManager.offset( + "foo", + "bar", + 0 + )); + + // Replaying an end marker to abort transaction of producer id 6. + context.replayEndTransactionMarker(6L, TransactionResult.ABORT); + + // The pending offset is removed. + assertNull(context.offsetMetadataManager.pendingTransactionalOffset( Review Comment: Should we also check that the offset was not added to the main offset storage? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org