showuon commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889496255


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,10 +514,62 @@ public void testCoordinatorNotAvailableWithUserAssignedType() {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.coordinatorUnknown());
 
-        // should find an available node in next find coordinator request
+        // should not try to find coordinator since we are in manual assignment
+        // hence the prepared response should not be returned
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
+        assertTrue(coordinator.coordinatorUnknown());
+    }
+
+    @Test
+    public void testAutoCommitAsyncWithUserAssignedType() {
+        try (ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
+            subscriptions.assignFromUser(Collections.singleton(t1p));
+            // set timeout to 0 because we expect no requests sent
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertFalse(client.hasInFlightRequests());
+
+            // elapse auto commit interval and set committable position
+            time.sleep(autoCommitIntervalMs);
+            subscriptions.seekUnvalidated(t1p, new SubscriptionState.FetchPosition(100L));
+
+            // should try to find coordinator since we are auto committing
+            coordinator.poll(time.timer(0));
+            assertTrue(coordinator.coordinatorUnknown());
+            assertTrue(client.hasInFlightRequests());
+
+            client.respond(groupCoordinatorResponse(node, Errors.NONE));
+            coordinator.poll(time.timer(0));
+            assertFalse(coordinator.coordinatorUnknown());
+            // after we've discovered the coordinator we should send
+            // out the commit request immediately
+            assertTrue(client.hasInFlightRequests());
+        }
+    }
+
+    @Test
+    public void testCommitAsyncWithUserAssignedType() {
+        subscriptions.assignFromUser(Collections.singleton(t1p));
+        // set timeout to 0 because we expect no requests sent
+        coordinator.poll(time.timer(0));
+        assertTrue(coordinator.coordinatorUnknown());
+        assertFalse(client.hasInFlightRequests());
+
+        // should try to find coordinator since we are commit async
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), (offsets, exception) -> {
+            throw new AssertionError("Commit should not get responses");

Review Comment:
   nit: use `fail` instead, and we might need to log the callback parameters 
for troubleshooting. 
   ```
   fail("Commit should not get responses, but got offsets:" + offsets + ", and exception:" + exception);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to