AnatolyPopov commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1619366626

##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -254,6 +259,61 @@ public void testCanProcessRecord() throws InterruptedException {
         assertEquals(3, handler.metadataCounter);
     }
+    @Test
+    public void testCanReprocessSkippedRecords() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+        final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3));
+        assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
+        assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0));
+
+        final int metadataPartition = partitioner.metadataPartition(tpId0);
+        final int anotherMetadataPartition = partitioner.metadataPartition(tpId3);
+
+        // Mocking the consumer to be able to wait for the second reassignment
+        doAnswer(invocation -> {

Review Comment:
   Totally agree about the mock-of-a-mock approach, I don't like it either. But hopefully it's good enough for now and it helps to focus on changes related specifically to this bug in this PR.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org