showuon commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889496255
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -514,10 +514,62 @@ public void
testCoordinatorNotAvailableWithUserAssignedType() {
coordinator.poll(time.timer(0));
assertTrue(coordinator.coordinatorUnknown());
- // should find an available node in next find coordinator request
+ // should not try to find coordinator since we are in manual assignment
+ // hence the prepared response should not be returned
client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.poll(time.timer(Long.MAX_VALUE));
+ assertTrue(coordinator.coordinatorUnknown());
+ }
+
+ @Test
+ public void testAutoCommitAsyncWithUserAssignedType() {
+ try (ConsumerCoordinator coordinator =
buildCoordinator(rebalanceConfig, new Metrics(), assignors, true,
subscriptions)) {
+ subscriptions.assignFromUser(Collections.singleton(t1p));
+ // set timeout to 0 because we expect no requests sent
+ coordinator.poll(time.timer(0));
+ assertTrue(coordinator.coordinatorUnknown());
+ assertFalse(client.hasInFlightRequests());
+
+ // elapse auto commit interval and set committable position
+ time.sleep(autoCommitIntervalMs);
+ subscriptions.seekUnvalidated(t1p, new
SubscriptionState.FetchPosition(100L));
+
+ // should try to find coordinator since we are auto committing
+ coordinator.poll(time.timer(0));
+ assertTrue(coordinator.coordinatorUnknown());
+ assertTrue(client.hasInFlightRequests());
+
+ client.respond(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.poll(time.timer(0));
+ assertFalse(coordinator.coordinatorUnknown());
+ // after we've discovered the coordinator we should send
+ // out the commit request immediately
+ assertTrue(client.hasInFlightRequests());
+ }
+ }
+
+ @Test
+ public void testCommitAsyncWithUserAssignedType() {
+ subscriptions.assignFromUser(Collections.singleton(t1p));
+ // set timeout to 0 because we expect no requests sent
+ coordinator.poll(time.timer(0));
+ assertTrue(coordinator.coordinatorUnknown());
+ assertFalse(client.hasInFlightRequests());
+
+ // should try to find coordinator since we are commit async
+ coordinator.commitOffsetsAsync(singletonMap(t1p, new
OffsetAndMetadata(100L)), (offsets, exception) -> {
+ throw new AssertionError("Commit should not get responses");
Review Comment:
nit: use `fail` instead, and we might need to log the callback parameters
for troubleshooting.
```
fail("Commit should not get responses, but got offsets:" + offsets +", and
exception:" + exception)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]