dajac commented on a change in pull request #10965: URL: https://github.com/apache/kafka/pull/10965#discussion_r663739212
########## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ########## @@ -1532,9 +1525,10 @@ String coordinatorKey() { } @Override - public void handleResponse(AbstractResponse response) { + public void handleResponse(AbstractResponse response, short requestVersion) { FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response; CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType()); + boolean batchFindCoordinator = requestVersion >= FindCoordinatorRequest.MIN_BATCHED_VERSION; Review comment: The above suggestion would also us to avoid having to pass the `requestVersion` down here. ########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ########## @@ -815,29 +813,22 @@ public void handle(SyncGroupResponse syncResponse, */ private RequestFuture<Void> sendFindCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator); + log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequestData data = new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()); - if (batchFindCoordinator) { - data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId)); - } else { - data.setKey(this.rebalanceConfig.groupId); - } + data.setKey(this.rebalanceConfig.groupId); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data); return client.send(node, requestBuilder) - .compose(new FindCoordinatorResponseHandler(batchFindCoordinator)); + .compose(new FindCoordinatorResponseHandler()); } private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> { - private boolean batch; - FindCoordinatorResponseHandler(boolean batch) { - this.batch = batch; - } @Override public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { log.debug("Received FindCoordinator response {}", resp); + boolean batch = resp.requestHeader().apiVersion() >= FindCoordinatorRequest.MIN_BATCHED_VERSION; Review comment: How about adding a `coordinators` method to `FindCoordinatorResponse` which would either return the list of coordinators (`data.coordinators()`) if not empty or would return a list containing a `Coordinator` created from the top level information. That would remove all the `batch` checks below. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { } } + @Test + public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception { + final List<String> groupIds = asList("group1", "group2"); + ApiVersion findCoordinatorV3 = new ApiVersion() + .setApiKey(ApiKeys.FIND_COORDINATOR.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + ApiVersion describeGroups = new ApiVersion() + .setApiKey(ApiKeys.DESCRIBE_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion()); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions( + NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); + + // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched Review comment: nit: `dummy` -> `Dummy`? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception { } } + @Test + public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception { + final List<String> groupIds = asList("group1", "group2"); + ApiVersion findCoordinatorV3 = new ApiVersion() + .setApiKey(ApiKeys.FIND_COORDINATOR.id) + .setMinVersion((short) 0) + .setMaxVersion((short) 3); + ApiVersion describeGroups = new ApiVersion() + .setApiKey(ApiKeys.DESCRIBE_GROUPS.id) + .setMinVersion((short) 0) + .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion()); + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions( + NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups))); + + // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched + env.kafkaClient().prepareResponse(null); + //Retriable FindCoordinatorResponse errors should be retried Review comment: nit: Should we add a space before `Retriable`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org