AnatolyPopov commented on code in PR #15165:
URL: https://github.com/apache/kafka/pull/15165#discussion_r1619366626


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTaskTest.java:
##########
@@ -254,6 +259,61 @@ public void testCanProcessRecord() throws InterruptedException {
         assertEquals(3, handler.metadataCounter);
     }
 
+    @Test
+    public void testCanReprocessSkippedRecords() throws InterruptedException {
+        final Uuid topicId = Uuid.fromString("Bp9TDduJRGa9Q5rlvCJOxg");
+        final TopicIdPartition tpId0 = new TopicIdPartition(topicId, new TopicPartition("sample", 0));
+        final TopicIdPartition tpId1 = new TopicIdPartition(topicId, new TopicPartition("sample", 1));
+        final TopicIdPartition tpId3 = new TopicIdPartition(topicId, new TopicPartition("sample", 3));
+        assertEquals(partitioner.metadataPartition(tpId0), partitioner.metadataPartition(tpId1));
+        assertNotEquals(partitioner.metadataPartition(tpId3), partitioner.metadataPartition(tpId0));
+
+        final int metadataPartition = partitioner.metadataPartition(tpId0);
+        final int anotherMetadataPartition = partitioner.metadataPartition(tpId3);
+
+        // Mocking the consumer to be able to wait for the second reassignment
+        doAnswer(invocation -> {

Review Comment:
   Totally agree about the mock-of-a-mock approach; I don't like it either. But 
hopefully it's good enough for now, and it helps keep this PR focused on the 
changes related specifically to this bug.



