hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r969035562


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -2283,6 +2307,37 @@ public void testRestOffsetsAuthorizationFailure() {
         assertEquals(5, subscriptions.position(tp0).offset);
     }
 
+    @Test
+    public void testPendingRevacationPartitionFetching() {

Review Comment:
   nit: `Revocation` is misspelled as `Revacation` in the method name
   
   I did not find the name very clear. It looks like the main difference 
between this and `testFetchingPendingPartitions` is that this method tests that 
the pending state gets reset after reassignment? Perhaps the name should 
reflect that?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -272,6 +272,30 @@ public void testFetchNormal() {
         }
     }
 
+    @Test
+    public void testFetchingPendingPartitions() {
+        buildFetcher();
+
+        assignFromUser(singleton(tp0));
+        subscriptions.seek(tp0, 0);
+
+        // normal fetch
+        assertEquals(1, fetcher.sendFetches());
+        client.prepareResponse(fullFetchResponse(tidp0, this.records, Errors.NONE, 100L, 0));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        assertEquals(4L, subscriptions.position(tp0).offset); // this is the next fetching position
+
+        // mark partition unfetchable
+        subscriptions.markPendingRevocation(singleton(tp0));

Review Comment:
   Another scenario is that a fetch is already in flight when we mark the 
partition as pending revocation. Can we test that as well?
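
   To make the in-flight scenario concrete, here is a rough, self-contained sketch of the ordering I have in mind. `ToyFetcher` and all of its methods are hypothetical stand-ins, not the real `Fetcher` or `SubscriptionState`, and the sketch assumes the intended behavior is that records from a fetch sent before the mark are not handed to the application afterwards. A real test would use the existing `FetcherTest` helpers instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InflightFetchSketch {

    /** Hypothetical single-partition fetcher model; NOT Kafka's Fetcher. */
    static final class ToyFetcher {
        private boolean pendingRevocation;
        private boolean inflight;
        private List<String> completed = new ArrayList<>();

        /** Sends a fetch only if the partition is fetchable and none is in flight. */
        int sendFetches() {
            if (pendingRevocation || inflight) {
                return 0;
            }
            inflight = true;
            return 1;
        }

        void markPendingRevocation() {
            pendingRevocation = true;
        }

        /** Simulates the broker response for the in-flight fetch arriving. */
        void completeFetch(List<String> records) {
            if (!inflight) {
                throw new IllegalStateException("no fetch in flight");
            }
            inflight = false;
            completed = new ArrayList<>(records);
        }

        /** Assumed behavior: buffered records are dropped while revocation is pending. */
        List<String> fetchedRecords() {
            List<String> out = pendingRevocation ? Collections.<String>emptyList() : completed;
            completed = new ArrayList<>();
            return out;
        }
    }

    public static void main(String[] args) {
        ToyFetcher fetcher = new ToyFetcher();
        int sent = fetcher.sendFetches();                  // fetch goes out first
        fetcher.markPendingRevocation();                   // mark while it is in flight
        fetcher.completeFetch(Arrays.asList("r0", "r1"));  // response arrives afterwards
        System.out.println("requests sent: " + sent);
        System.out.println("records returned: " + fetcher.fetchedRecords().size());
        System.out.println("new requests sent: " + fetcher.sendFetches());
    }
}
```

   The point is the ordering: send, mark, then complete the response, and only then check both what is returned and whether a new fetch is sent.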



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java:
##########
@@ -256,6 +256,15 @@ public void partitionPause() {
         assertTrue(state.isFetchable(tp0));
     }
 
+    @Test
+    public void testMarkingPartitionPending() {
+        state.assignFromUser(singleton(tp0));
+        state.seek(tp0, 100);
+        assertTrue(state.isFetchable(tp0));
+        state.markPendingRevocation(singleton(tp0));
+        assertFalse(state.isFetchable(tp0));

Review Comment:
   Perhaps we can also assert `isPaused` is false?
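
   The reason for the extra assertion, as a minimal sketch: if pending revocation and pause are tracked as separate flags, a partition can be unfetchable without being paused. `ToySubscriptionState` below is a hypothetical toy model keyed by partition name. It is not the real `SubscriptionState`, and the fetchability rule it uses is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class PendingRevocationSketch {

    /** Hypothetical per-partition flags; NOT Kafka's internal state class. */
    static final class ToyPartitionState {
        boolean paused;
        boolean pendingRevocation;
        Long position; // null until seek() is called
    }

    /** Hypothetical stand-in for SubscriptionState. */
    static final class ToySubscriptionState {
        private final Map<String, ToyPartitionState> assigned = new HashMap<>();

        void assign(String partition) {
            assigned.put(partition, new ToyPartitionState());
        }

        void seek(String partition, long offset) {
            state(partition).position = offset;
        }

        void pause(String partition) {
            state(partition).paused = true;
        }

        void markPendingRevocation(String partition) {
            state(partition).pendingRevocation = true;
        }

        boolean isPaused(String partition) {
            return state(partition).paused;
        }

        /** Assumed rule: fetchable only if not paused, not pending revocation, and positioned. */
        boolean isFetchable(String partition) {
            ToyPartitionState s = state(partition);
            return !s.paused && !s.pendingRevocation && s.position != null;
        }

        private ToyPartitionState state(String partition) {
            ToyPartitionState s = assigned.get(partition);
            if (s == null) {
                throw new IllegalStateException("No current assignment for " + partition);
            }
            return s;
        }
    }

    public static void main(String[] args) {
        ToySubscriptionState state = new ToySubscriptionState();
        state.assign("tp0");
        state.seek("tp0", 100L);
        System.out.println("fetchable after seek: " + state.isFetchable("tp0"));
        state.markPendingRevocation("tp0");
        System.out.println("fetchable after mark: " + state.isFetchable("tp0"));
        System.out.println("paused after mark: " + state.isPaused("tp0"));
    }
}
```

   In the actual test this would just be one more line after the existing `assertFalse`, checking `isPaused` for `tp0`.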


