jolshan commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1433329835


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -2414,6 +2424,67 @@ public void testReplayWithTombstone() {
         assertNull(context.offsetMetadataManager.offset("foo", "bar", 0));
     }
 
+    @Test
+    public void testReplayTransactionEndMarkerWithCommit() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Add pending transactional commit for producer id 5.
+        verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        // Add pending transactional commit for producer id 6.
+        verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata(
+            200L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        // Replaying an end marker with an unknown producer id should not fail.
+        context.replayEndTransactionMarker(1L, TransactionResult.COMMIT);
+
+        // Replaying an end marker to commit transaction of producer id 5.
+        context.replayEndTransactionMarker(5L, TransactionResult.COMMIT);
+
+        // The pending offset is removed...
+        assertNull(context.offsetMetadataManager.pendingTransactionalOffset(
+            5L,
+            "foo",
+            "bar",
+            0
+        ));
+
+        // ... and added to the main offset storage.
+        assertEquals(new OffsetAndMetadata(
+            100L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ), context.offsetMetadataManager.offset(
+            "foo",
+            "bar",
+            0
+        ));
+
+        // Replaying an end marker to abort transaction of producer id 6.
+        context.replayEndTransactionMarker(6L, TransactionResult.ABORT);
+
+        // The pending offset is removed.
+        assertNull(context.offsetMetadataManager.pendingTransactionalOffset(

Review Comment:
   Should we also check that the aborted offset (producer id 6) was not added to the main offset storage?
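
   In the test itself this would presumably be one more `assertNull` on the `offset("foo", "bar", 1)` accessor already used earlier in the diff. To make the intended invariant concrete, here is a minimal, self-contained sketch. `ToyOffsetStore` and all of its internals are hypothetical stand-ins written for illustration, not Kafka's `OffsetMetadataManager`; only the method names `offset`, `pendingTransactionalOffset`, and `replayEndTransactionMarker` mirror the ones visible in the diff, and the boolean `commit` flag replaces `TransactionResult`.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model for illustration only; NOT Kafka's OffsetMetadataManager.
class ToyOffsetStore {
    // Main offset storage, keyed by "group/topic/partition".
    private final Map<String, Long> offsets = new HashMap<>();
    // Pending transactional offsets, keyed by producer id, then by the same key.
    private final Map<Long, Map<String, Long>> pending = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Record a transactional offset commit that is not yet visible.
    void replayTransactionalCommit(long producerId, String group, String topic,
                                   int partition, long offset) {
        pending.computeIfAbsent(producerId, id -> new HashMap<>())
               .put(key(group, topic, partition), offset);
    }

    // Always drop the pending offsets; copy them to main storage only on commit.
    // An unknown producer id is a no-op.
    void replayEndTransactionMarker(long producerId, boolean commit) {
        Map<String, Long> removed = pending.remove(producerId);
        if (removed != null && commit) {
            offsets.putAll(removed);
        }
    }

    Long offset(String group, String topic, int partition) {
        return offsets.get(key(group, topic, partition));
    }

    Long pendingTransactionalOffset(long producerId, String group, String topic,
                                    int partition) {
        Map<String, Long> byKey = pending.get(producerId);
        return byKey == null ? null : byKey.get(key(group, topic, partition));
    }

    public static void main(String[] args) {
        ToyOffsetStore store = new ToyOffsetStore();
        store.replayTransactionalCommit(6L, "foo", "bar", 1, 200L);

        // Abort the transaction of producer id 6.
        store.replayEndTransactionMarker(6L, false);

        // The pending offset is removed...
        if (store.pendingTransactionalOffset(6L, "foo", "bar", 1) != null) {
            throw new AssertionError("pending offset should be removed on abort");
        }
        // ... and, the check asked about here, nothing reached main storage.
        if (store.offset("foo", "bar", 1) != null) {
            throw new AssertionError("aborted offset must not be in main storage");
        }
        System.out.println("abort left main offset storage untouched");
    }
}
```

   Without the second check, an implementation that moved pending offsets to main storage on every end marker would still pass the abort half of the test.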


